Source code for hypernets.model.hyper_model

# -*- coding:utf-8 -*-
"""

"""
import time
import traceback
from collections import UserDict

from ..core.context import DefaultContext
from ..core.meta_learner import MetaLearner
from ..core.trial import Trial, TrialHistory, DiskTrialStore, DominateBasedTrialHistory
from ..discriminators import UnPromisingTrial
from ..dispatchers import get_dispatcher
from ..tabular import get_tool_box
from ..utils import logging, const, to_repr

logger = logging.get_logger(__name__)


class HyperModel:
    def __init__(self, searcher, dispatcher=None, callbacks=None, reward_metric=None, task=None,
                 discriminator=None):
        """
        :param searcher: a Searcher instance used to sample the search space and guide the search
        :param dispatcher: a Dispatcher instance used to run trials; obtained from `get_dispatcher` when None
        :param callbacks: optional list of callbacks invoked on search and trial events
        :param reward_metric: metric name (or list of metric names) used to extract the reward from evaluation scores
        :param task: task type (see `const.TASK_*`); inferred from the training data when None
        :param discriminator: optional discriminator used to early-stop unpromising trials
        """
        self.searcher = searcher
        self.dispatcher = dispatcher
        self.callbacks = callbacks if callbacks is not None else []
        self.reward_metric = reward_metric

        # multi-objective searchers track a Pareto-based history, others a single-direction one
        searcher_type = searcher.kind()
        if searcher_type == const.SEARCHER_MOO:
            objective_names = [_.name for _ in searcher.objectives]
            directions = [_.direction for _ in searcher.objectives]
            self.history = DominateBasedTrialHistory(directions=directions, objective_names=objective_names)
        else:
            self.history = TrialHistory(searcher.optimize_direction)

        self.task = task
        self.discriminator = discriminator
        if self.discriminator:
            self.discriminator.bind_history(self.history)

        self.context = DefaultContext()

    def _get_estimator(self, space_sample):
        raise NotImplementedError

    @property
    def reward_metrics(self):
        if isinstance(self.reward_metric, list):
            return self.reward_metric
        else:
            return [self.reward_metric]
    def load_estimator(self, model_file):
        raise NotImplementedError
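    # HyperModel is an abstract base class: `_get_estimator` and `load_estimator` (and
    # `export_trial_configuration` below) must be provided by a subclass. A minimal
    # sketch of what that might look like (`MyEstimator` is a hypothetical estimator
    # wrapper, not part of hypernets):
    #
    #     class MyHyperModel(HyperModel):
    #         def _get_estimator(self, space_sample):
    #             # build an estimator from the sampled search space
    #             return MyEstimator(space_sample, task=self.task)
    #
    #         def load_estimator(self, model_file):
    #             return MyEstimator.load(model_file)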
    def _run_trial(self, space_sample, trial_no, X, y, X_eval, y_eval, X_test=None, cv=False, num_folds=3,
                   model_file=None, **fit_kwargs):
        start_time = time.time()

        estimator = self._get_estimator(space_sample)
        if self.discriminator:
            estimator.set_discriminator(self.discriminator)

        for callback in self.callbacks:
            try:
                callback.on_build_estimator(self, space_sample, estimator, trial_no)
            except Exception as e:
                logger.warn(e)

        # merge user-supplied metrics with the reward metrics
        metrics = fit_kwargs.pop('metrics') if 'metrics' in fit_kwargs else None
        if metrics is not None:
            assert isinstance(metrics, (tuple, list)), 'metrics should be list or tuple'
            metrics = list(set(list(metrics)).union(set(self.reward_metrics)))
        else:
            metrics = self.reward_metrics

        succeeded = False
        scores = None
        oof = None
        oof_scores = None
        x_vals = None
        y_vals = None
        X_trains = None
        y_trains = None
        try:
            if cv:
                ret_data = estimator.fit_cross_validation(X, y, stratified=True, num_folds=num_folds,
                                                          shuffle=False, random_state=9527, metrics=metrics,
                                                          **fit_kwargs)
                scores, oof, oof_scores, X_trains, y_trains, x_vals, y_vals = ret_data
            else:
                estimator.fit(X, y, **fit_kwargs)
            succeeded = True
        except UnPromisingTrial as e:
            logger.info(f'{e}')
        except Exception:
            logger.error(f'run_trial failed! trial_no={trial_no}')
            track = traceback.format_exc()
            logger.error(track)

        if succeeded:
            if model_file is None or len(model_file) == 0:
                model_file = '%05d_%s.pkl' % (trial_no, space_sample.space_id)
            estimator.save(model_file)

            elapsed = time.time() - start_time  # note: does not include evaluation time

            trial = Trial(space_sample, trial_no, reward=None, elapsed=elapsed,
                          model_file=model_file, succeeded=succeeded)
            trial.context = self.context

            if self.searcher.kind() != const.SEARCHER_MOO:
                if scores is None:
                    scores = estimator.evaluate(X_eval, y_eval, metrics=metrics, **fit_kwargs)
                reward = self._get_reward(scores, self.reward_metrics)
            else:
                if cv:
                    assert x_vals is not None and y_vals is not None
                    reward = [fn.evaluate_cv(trial, estimator, X_trains, y_trains, x_vals, y_vals, X_test)
                              for fn in self.searcher.objectives]
                else:
                    reward = [fn.evaluate(trial, estimator, X_eval, y_eval, X, y, X_test)
                              for fn in self.searcher.objectives]

            trial.reward = reward
            trial.iteration_scores = estimator.get_iteration_scores()
            trial.memo['scores'] = scores
            if oof is not None and self._is_memory_enough(oof):
                trial.memo['oof'] = oof
            if oof_scores is not None:
                trial.memo['oof_scores'] = oof_scores

            # improved = self.history.append(trial)
            self.searcher.update_result(space_sample, reward)
        else:
            elapsed = time.time() - start_time
            if self.searcher.kind() == const.SEARCHER_MOO:
                nan_scores = [None] * len(self.searcher.objectives)
            else:
                nan_scores = 0
            trial = Trial(space_sample, trial_no, nan_scores, elapsed, succeeded=succeeded)
            # penalize the failed sample with the worst reward seen so far
            if self.history is not None:
                t = self.history.get_worst()
                if t is not None:
                    self.searcher.update_result(space_sample, t.reward)

        return trial

    @staticmethod
    def _is_memory_enough(oof):
        # keep OOF predictions in the trial memo only when most memory is still free
        tb = get_tool_box(oof)
        free = tb.memory_free() / tb.memory_total()
        return free > 0.618

    def _get_reward(self, value: dict, keys: list = None):
        def cast_float(value):
            try:
                fv = float(value)
                return fv
            except TypeError:
                return None

        if keys is None:
            keys = ['reward']

        if not isinstance(value, (dict, UserDict)):
            raise ValueError(f'[value] should be a dict but is {value}')

        rewards = []
        for key in keys:
            if callable(key) and hasattr(key, '__name__'):
                key_name = key.__name__
            else:
                key_name = key
            if key_name in value:
                reward = cast_float(value[key_name])
                if reward is not None:
                    rewards.append(reward)
            else:
                raise ValueError(f'[value] should be a numeric or a dict which has a key named "{key}" '
                                 f'whose value is a numeric.')
        return rewards
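    # Sketch of the expected `_get_reward` behavior (hypothetical score values):
    #
    #     model._get_reward({'auc': 0.93, 'logloss': 0.21}, ['auc'])   # -> [0.93]
    #     model._get_reward({'auc': 0.93}, ['accuracy'])               # raises ValueError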
    def get_best_trial(self):
        return self.history.get_best()
    @property
    def best_reward(self):
        best = self.get_best_trial()
        if best is not None:
            if isinstance(best, list):
                return [t.reward for t in best]
            else:
                return best.reward
        else:
            return None

    @property
    def best_trial_no(self):
        best = self.get_best_trial()
        if best is not None:
            if isinstance(best, list):
                return [t.trial_no for t in best]
            else:
                return best.trial_no
        else:
            return None
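    # Note: with a multi-objective searcher, `get_best_trial` may return the list of
    # non-dominated trials, so `best_reward` and `best_trial_no` return lists as well,
    # e.g. (hypothetical values) [[0.91, 3.2], [0.89, 2.7]] and [5, 12].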
    def get_top_trials(self, top_n):
        return self.history.get_top(top_n)
    def _before_search(self):
        pass

    def _after_search(self, last_trial_no):
        pass
    def search(self, X, y, X_eval, y_eval, X_test=None, cv=False, num_folds=3, max_trials=10, dataset_id=None,
               trial_store=None, **fit_kwargs):
        """
        :param X: Pandas or Dask DataFrame, feature data for training
        :param y: Pandas or Dask Series, target values for training
        :param X_eval: (Pandas or Dask DataFrame) or None, feature data for evaluation
        :param y_eval: (Pandas or Dask Series) or None, target values for evaluation
        :param X_test: (Pandas or Dask DataFrame) or None, feature data used to evaluate indicators like PSI
        :param cv: Optional, bool(default=False), if set to `True`, use cross-validation instead of the
            evaluation-set reward to guide the search process
        :param num_folds: Optional, int(default=3), number of cross-validation folds, only valid when cv is True
        :param max_trials: Optional, int(default=10), upper limit on the number of search trials; the search
            process stops when this number is exceeded
        :param dataset_id: Optional, str, identifier of the dataset; generated from X and y when None
        :param trial_store: Optional, TrialStore instance or str; a str is treated as a directory path
            for DiskTrialStore
        :param fit_kwargs: Optional, dict, parameters for the fit method of the model
        :return:
        """
        if self.task is None or self.task == const.TASK_AUTO:
            self.task, _ = self.infer_task_type(y)
        if self.task not in [const.TASK_BINARY, const.TASK_MULTICLASS, const.TASK_REGRESSION, const.TASK_MULTILABEL]:
            logger.warning(f'Unexpected task "{self.task}"')

        if dataset_id is None:
            dataset_id = self.generate_dataset_id(X, y)

        if isinstance(trial_store, str):
            trial_store = DiskTrialStore(trial_store)

        if self.searcher.use_meta_learner:
            self.searcher.set_meta_learner(MetaLearner(self.history, dataset_id, trial_store))

        self._before_search()

        dispatcher = self.dispatcher if self.dispatcher else get_dispatcher(self)

        for callback in self.callbacks:
            try:
                callback.on_search_start(self, X, y, X_eval, y_eval, cv, num_folds, max_trials,
                                         dataset_id, trial_store, **fit_kwargs)
            except Exception as e:
                logger.warn(e)
        try:
            trial_no = dispatcher.dispatch(self, X, y, X_eval, y_eval, X_test, cv, num_folds, max_trials,
                                           dataset_id, trial_store, **fit_kwargs)

            for callback in self.callbacks:
                try:
                    callback.on_search_end(self)
                except Exception as e:
                    logger.warn(e)
        except Exception as e:
            cb_ex = False
            for callback in self.callbacks:
                try:
                    callback.on_search_error(self)
                except Exception as ce:
                    logger.warn(ce)
                    cb_ex = True
            if cb_ex:
                raise e
            else:
                raise

        self._after_search(trial_no)
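    # A minimal usage sketch, assuming `MyHyperModel` is a concrete subclass and
    # `my_search_space_fn`, `df_train` and `df_eval` are user-supplied (all names in
    # this sketch are hypothetical):
    #
    #     from hypernets.searchers import RandomSearcher
    #
    #     searcher = RandomSearcher(my_search_space_fn, optimize_direction='max')
    #     model = MyHyperModel(searcher, reward_metric='auc')
    #     model.search(df_train.drop(columns=['target']), df_train['target'],
    #                  df_eval.drop(columns=['target']), df_eval['target'],
    #                  cv=True, num_folds=3, max_trials=20)
    #     best = model.get_best_trial()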
    def generate_dataset_id(self, X, y):
        if hasattr(X, 'shape') and len(getattr(X, 'shape')) == 2:
            tb = get_tool_box(X, y)
            sign = tb.data_hasher()([X, y])
        else:
            import hashlib
            sign_repr = ''
            if X is not None:
                if isinstance(X, list):
                    sign_repr += f'X len({len(X)})|'
                if hasattr(X, 'shape'):
                    sign_repr += f'X shape{X.shape}|'
                if hasattr(X, 'dtypes'):
                    sign_repr += f'x.dtypes({list(X.dtypes)})|'
            if y is not None:
                if isinstance(y, list):
                    sign_repr += f'y len({len(y)})|'
                if hasattr(y, 'shape'):
                    sign_repr += f'y shape{y.shape}|'
                if hasattr(y, 'dtype'):
                    sign_repr += f'y.dtype({y.dtype})|'
            sign = hashlib.md5(sign_repr.encode('utf-8')).hexdigest()
        return sign
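    # For illustration: if X is a list of length 100 and y a pandas Series of shape
    # (100,) with dtype int64, the string hashed above would be
    # 'X len(100)|y shape(100,)|y.dtype(int64)|' and the dataset id its md5 hex digest.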
    def final_train(self, space_sample, X, y, **kwargs):
        estimator = self._get_estimator(space_sample)
        estimator.set_discriminator(None)
        estimator.fit(X, y, **kwargs)
        return estimator

    def export_configuration(self, trials):
        configurations = []
        for trial in trials:
            configurations.append(self.export_trial_configuration(trial))
        return configurations

    def export_trial_configuration(self, trial):
        raise NotImplementedError

    def infer_task_type(self, y):
        return get_tool_box(y).infer_task_type(y)

    def plot_hyperparams(self, destination='notebook', output='hyperparams.html'):
        return self.history.plot_hyperparams(destination, output)

    def __repr__(self):
        return to_repr(self)