Source code for hypernets.core.trial

# -*- coding:utf-8 -*-

import datetime
import os
import pickle
import shutil
from collections import OrderedDict
from typing import List

import numpy as np
import pandas as pd

from hypernets.utils.common import isnotebook, to_repr
from ..core.searcher import OptimizeDirection
from ..core import pareto

def _is_bigdata(v):
    big_data_types = (pd.Series, pd.DataFrame, np.ndarray)
    if isinstance(v, big_data_types):
        return True

    type_name = type(v).__name__.lower()
    if any(type_name.find(s) for s in ('array', 'dataframe', 'series')):
        return True

    return False

class Trial():
    """Record of one evaluated sample of the search space.

    Attributes set by the constructor:
        space_sample: the sampled search space this trial evaluated.
        trial_no: sequence number of the trial.
        reward: reward obtained by the trial (``None`` if not set).
        elapsed: elapsed time of the trial (``None`` if not set).
        model_file: path of the pickled model, read by :meth:`get_model`.
        succeeded: whether the trial finished successfully.
        memo: free-form dict of extra values; merged into :meth:`to_df`.
        iteration_scores: dict, initialised empty.
        context: initialised to ``None``.
    """

    def __init__(self, space_sample, trial_no, reward=None, elapsed=None, model_file=None, succeeded=True):
        self.space_sample = space_sample
        self.trial_no = trial_no
        self.reward = reward
        self.elapsed = elapsed
        self.model_file = model_file
        self.succeeded = succeeded
        self.memo = {}
        self.iteration_scores = {}
        self.context = None

    def __repr__(self):
        return to_repr(self)

    def _repr_html_(self):
        """Render the trial as an HTML key/value table (used by notebooks)."""
        html = '<div><h>Trial:</h>'
        html += '''<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
  <th>key</th>
  <th>value</th>
</tr>
</thead>
<tbody>'''
        html += f'''<tr>
  <td>Trial No.</td>
  <td>{self.trial_no}</td>
</tr>
<tr>
  <td>Reward</td>
  <td>{self.reward}</td>
</tr>
<tr>
  <td>Elapsed</td>
  <td>{self.elapsed}</td>
</tr>
<tr>
  <td>space.signature</td>
  <td>{self.space_sample.signature}</td>
</tr>
<tr>
  <td>space.vectors</td>
  <td>{self.space_sample.vectors}</td>
</tr>'''
        params = self.space_sample.get_assigned_params()
        for i, hp in enumerate(params):
            # Each parameter is one complete row; no dangling <tr> is left open.
            html += f'''<tr>
  <td>{i}-{hp.alias}</td>
  <td>{hp.value}</td>
</tr>'''
        html += '''
</tbody>
</table>
</div>'''
        return html

    def __getstate__(self):
        """Return the state to pickle, leaving big-data values out of ``memo``.

        The instance itself is never modified: when something has to be
        dropped, a shallow copy of the state with a filtered ``memo`` is
        returned instead.
        """
        try:
            state = super().__getstate__()
        except AttributeError:
            # Python versions whose ``object`` has no ``__getstate__``.
            state = self.__dict__

        memo = state.get('memo', None)
        if isinstance(memo, dict) and any(_is_bigdata(v) for v in memo.values()):
            state = state.copy()
            state['memo'] = {k: v for k, v in memo.items() if not _is_bigdata(v)}
        return state

    def to_df(self, include_params=False):
        """Return the trial as a single-row ``pandas.DataFrame``.

        Columns are ``trial_no``, ``succeeded``, ``reward`` and ``elapsed``,
        followed by the entries of ``memo`` (when it is a dict) and, if
        ``include_params`` is true, one column per assigned parameter keyed by
        its alias.
        """
        out = OrderedDict(trial_no=self.trial_no, succeeded=self.succeeded,
                          reward=self.reward, elapsed=self.elapsed)
        if isinstance(self.memo, dict):
            out.update(self.memo)
        if include_params:
            for p in self.space_sample.get_assigned_params():
                out[p.alias] = p.value
        # Wrap every value in a list so each one becomes a single cell.
        return pd.DataFrame({k: [v] for k, v in out.items()})

    def get_model(self):
        """Unpickle and return the object stored at ``self.model_file``.

        The file is opened through ``hypernets.utils.fs``.
        NOTE(security): ``pickle.load`` executes code embedded in the file;
        only load model files that come from a trusted source.
        """
        from hypernets.utils import fs

        with fs.open(self.model_file, 'rb') as f:
            obj = pickle.load(f)
        return obj
class TrialHistory():
    """Ordered collection of trials together with the optimisation direction.

    ``optimize_direction`` decides the ranking in :meth:`get_top`: rewards are
    sorted descending when it equals ``'max'`` or
    ``OptimizeDirection.Maximize`` and ascending otherwise.
    """

    def __init__(self, optimize_direction):
        self.trials = []
        self.optimize_direction = optimize_direction

    def append(self, trial):
        """Add ``trial`` and return ``True`` if the best trial changed."""
        old_best = self.get_best()
        self.trials.append(trial)
        new_best = self.get_best()
        improved = old_best != new_best
        return improved

    def is_existed(self, space_sample):
        """Return whether a trial with the same ``vectors`` is already recorded."""
        return space_sample.vectors in [t.space_sample.vectors for t in self.trials]

    def get_trial(self, space_sample):
        """Return the first trial whose sample has the same ``vectors``, else ``None``."""
        target = space_sample.vectors
        for t in self.trials:
            if t.space_sample.vectors == target:
                return t
        # ``list.index`` would raise ValueError here; honour the documented
        # "not found -> None" contract instead.
        return None

    def get_best(self):
        """Return the top-ranked succeeded trial, or ``None`` if there is none."""
        top1 = self.get_top(1)
        if len(top1) <= 0:
            return None
        else:
            return top1[0]

    def get_worst(self):
        """Return the lowest-ranked succeeded trial, or ``None`` if there is none."""
        topn = self.get_top()
        return topn[-1] if len(topn) > 0 else None

    def get_top(self, n=None):
        """Return succeeded trials ranked best first.

        Args:
            n: optional int; when smaller than the number of succeeded trials
                only the first ``n`` ranked trials are returned.

        Returns:
            list: ranked trials (empty when no trial succeeded).
        """
        assert n is None or isinstance(n, int)
        valid_trials = [t for t in self.trials if t.succeeded]
        if len(valid_trials) <= 0:
            return []
        sorted_trials = sorted(valid_trials, key=lambda t: t.reward,
                               reverse=self.optimize_direction in ['max', OptimizeDirection.Maximize])
        if isinstance(n, int) and n < len(sorted_trials):
            sorted_trials = sorted_trials[:n]
        return sorted_trials

    def get_space_signatures(self):
        """Return the set of space signatures found among the recorded trials."""
        return {t.space_sample.signature for t in self.trials}

    def diff(self, trials):
        """Group rewards by signature, parameter alias and parameter value.

        Returns:
            dict: ``{signature: {alias: {str(value): [reward, ...]}}}`` built
            from the assigned params of the given ``trials``.
        """
        signatures = {t.space_sample.signature for t in trials}
        diffs = {}
        for sign in signatures:
            pv_dict = {}
            for t in trials:
                if t.space_sample.signature != sign:
                    continue
                for p in t.space_sample.get_assigned_params():
                    pv_dict.setdefault(p.alias, {}).setdefault(str(p.value), []).append(t.reward)
            diffs[sign] = pv_dict
        return diffs

    def get_trajectories(self):
        """Return cumulative times and reward trajectories ordered by ``trial_no``.

        Returns:
            tuple: ``(times, best_rewards, rewards, best_trial_no, best_elapsed)``;
            the three lists start with a ``0.0`` seed entry.
        """
        # NOTE(review): the running best starts at 0.0 and only moves upward,
        # independent of ``optimize_direction`` -- confirm this is intended for
        # minimising searches and for negative rewards.
        times, best_rewards, rewards = [0.0], [0.0], [0.0]
        his = sorted(self.trials, key=lambda t: t.trial_no)
        best_trial_no = 0
        best_elapsed = 0
        for t in his:
            rewards.append(t.reward)
            times.append(t.elapsed + times[-1])

            if t.reward > best_rewards[-1]:
                best_rewards.append(t.reward)
                best_trial_no = t.trial_no
                best_elapsed = times[-1]
            else:
                best_rewards.append(best_rewards[-1])

        return times, best_rewards, rewards, best_trial_no, best_elapsed

    def save(self, filepath):
        """Write the history to ``filepath``.

        A path ending in ``.pkl`` gets a pickle of the whole object. Any other
        path gets a text file: the first line is the optimisation direction,
        then one ``trial_no|vectors|reward|elapsed|model_file|succeeded`` line
        per trial.
        """
        if filepath.endswith('.pkl'):
            with open(filepath, 'wb') as output:
                pickle.dump(self, output, protocol=pickle.HIGHEST_PROTOCOL)
            return

        with open(filepath, 'w') as output:
            output.write(f'{self.optimize_direction}\r\n')
            for trial in self.trials:
                data = f'{trial.trial_no}|{trial.space_sample.vectors}|{trial.reward}|{trial.elapsed}' + \
                       f'|{trial.model_file if trial.model_file else ""}|{trial.succeeded}\r\n'
                output.write(data)

    @staticmethod
    def _parse_vectors(text):
        """Parse the textual form of a vectors list (e.g. ``'[1, 0.5]'``) into numbers.

        Tokens that are valid integers become ``int``; everything else is
        parsed as ``float`` (which also covers scientific notation).
        """
        values = []
        for token in text.replace('[', '').replace(']', '').split(','):
            token = token.strip()
            if not token:
                continue
            try:
                values.append(int(token))
            except ValueError:
                values.append(float(token))
        return values

    @staticmethod
    def load_history(space_fn, filepath):
        """Load a history previously written by :meth:`save`.

        Args:
            space_fn: callable without arguments returning a fresh space
                sample; it is called once per stored trial and the sample is
                populated through ``assign_by_vectors``.
            filepath: file to read; ``.pkl`` files are unpickled, anything
                else is parsed as the text format.

        Returns:
            TrialHistory: the restored history.
        """
        if filepath.endswith('.pkl'):
            # NOTE(security): unpickling executes code embedded in the file;
            # only load histories from a trusted source.
            with open(filepath, 'rb') as f:
                history = pickle.load(f)
            return history

        with open(filepath, 'r') as f:
            # The first line carries the optimisation direction.
            history = TrialHistory(f.readline().strip())
            for line in f:
                if line.strip() == '':
                    continue
                fields = line.strip().split('|')
                assert len(fields) >= 4, f'Trial format is not correct. \r\nline:[{line}]'
                sample = space_fn()
                sample.assign_by_vectors(TrialHistory._parse_vectors(fields[1]))
                # ``save`` writes an empty field when there is no model file.
                model_file = (fields[4] or None) if len(fields) > 4 else None
                trial = Trial(space_sample=sample, trial_no=int(fields[0]), reward=float(fields[2]),
                              elapsed=float(fields[3]), model_file=model_file)
                if len(fields) > 5:
                    # ``bool('False')`` is True, so compare the text explicitly.
                    trial.succeeded = fields[5].strip().lower() in ('true', '1')
                history.append(trial)
            return history

    def __repr__(self):
        out = OrderedDict(direction=self.optimize_direction)
        if len(self.trials) > 0:
            tops = self.get_top()
            out['size'] = len(self.trials)
            out['succeeded'] = len(tops)
            if len(tops) > 0:
                out['best_reward'] = tops[0].reward
                out['worst_reward'] = tops[-1].reward

        repr_ = ', '.join('%s=%r' % (k, v) for k, v in out.items())
        return f'{type(self).__name__}({repr_})'

    def to_df(self, include_params=False):
        """Concatenate the per-trial frames into one ``DataFrame`` (empty if no trials)."""
        if len(self.trials) > 0:
            df = pd.concat([t.to_df(include_params) for t in self.trials], axis=0)
            df.reset_index(drop=True, inplace=True)
        else:
            df = pd.DataFrame()
        return df

    def plot_hyperparams(self, destination='notebook', output='hyperparams.html'):
        """Plot hyperparams in a parallel line chart

        Args:
            destination: one of notebook, html
            output: the html file path

        Returns:
            The result of ``chart.render_notebook()`` when ``destination`` is
            ``'notebook'``, otherwise the result of ``chart.render(output)``.

        Raises:
            Exception: if pyecharts cannot be imported, if ``destination`` is
                ``'notebook'`` outside a notebook, or if there are no trials.
        """
        try:
            import pyecharts  # noqa: F401  (availability probe only)
        except Exception as e:
            # NOTE(review): the message reads as if a URL is missing after
            # "refer to" -- confirm against the upstream source.
            raise Exception("You may not install 'pyecharts',"
                            "please refer to and install it.") from e

        import pyecharts.options as opts
        from pyecharts.charts import Parallel

        if destination == 'notebook' and not isnotebook():
            raise Exception("You may not running in a notebook,"
                            " try to set 'destination' to 'html' or run it in notebook ")

        if self.trials is None or len(self.trials) < 1:
            raise Exception("Trials is empty ")

        REWARD_METRIC_COL = 'Reward metric'

        def get_space_params(trial):
            # Collect numeric (non-bool) hyper-parameters that have references,
            # naming each one by its alias minus the first reference's name prefix.
            space = trial.space_sample
            params_dict = {}
            for hyper_param in space.get_all_params():
                references = list(hyper_param.references)
                if len(references) > 0:
                    param_name = hyper_param.alias[len(references[0].name) + 1:]
                    param_value = hyper_param.value
                    if isinstance(param_value, (int, float)) and not isinstance(param_value, bool):
                        params_dict[param_name] = param_value
            params_dict[REWARD_METRIC_COL] = trial.reward
            return params_dict

        def make_dims(df_params):
            # Object-typed columns become category axes, the rest plain axes.
            parallel_axis = []
            for i, col in enumerate(df_params.columns):
                if df_params.dtypes[col].kind == 'O':
                    parallel_axis.append({
                        "dim": i,
                        "name": col,
                        "type": "category",
                        "data": df_params[col].unique().tolist(),
                    })
                else:
                    parallel_axis.append({'dim': i, 'name': col})
            return parallel_axis

        trials_params = [get_space_params(trial) for trial in self.trials]

        # First-seen order keeps the axis layout stable between runs (a set
        # would reorder the axes arbitrarily); the reward column goes last.
        seen_names = dict.fromkeys(k for ps in trials_params for k in ps)
        param_names = [n for n in seen_names if n != REWARD_METRIC_COL]
        param_names.append(REWARD_METRIC_COL)

        trial_params_values = [[ps.get(n) for n in param_names] for ps in trials_params]
        df_train_params = pd.DataFrame(data=trial_params_values, columns=param_names)

        # remove if all is None
        df_train_params.dropna(axis=1, how='all', inplace=True)

        parallel_axis = make_dims(df_train_params)
        reward_col = df_train_params[REWARD_METRIC_COL]

        visual_map = opts.VisualMapOpts(
            type_="color",
            is_calculable=True,
            precision=2,
            pos_left="-10",
            pos_bottom="30",
            max_=reward_col.max().tolist(),
            min_=reward_col.min().tolist(),
        )
        chart = Parallel(init_opts=opts.InitOpts(width="%dpx" % (len(param_names) * 100), height="400px"))
        chart = chart.add_schema(schema=parallel_axis)
        chart = chart.add(series_name="", data=df_train_params.values.tolist(),
                          linestyle_opts=opts.LineStyleOpts(width=1, opacity=0.5))
        chart = chart.set_global_opts(visualmap_opts=[visual_map])

        if destination == 'notebook':
            return chart.render_notebook()
        else:
            return chart.render(output)
class DominateBasedTrialHistory(TrialHistory):
    """Trial history for multi-objective searches.

    "Best" here means the set of succeeded trials selected by
    ``pareto.calc_nondominated_set`` rather than a single trial. The first
    entry of ``directions`` is handed to the base class as its
    ``optimize_direction``.
    """

    def __init__(self, directions, objective_names):
        super(DominateBasedTrialHistory, self).__init__(optimize_direction=directions[0])
        self.directions = directions
        self.objective_names = objective_names

    def get_best(self):
        """Return the succeeded trials picked by ``pareto.calc_nondominated_set``."""
        # NOTE(review): behaviour with zero succeeded trials depends on how
        # ``calc_nondominated_set`` treats an empty array -- verify.
        candidates = [t for t in self.trials if t.succeeded]
        rewards = np.asarray([t.reward for t in candidates])
        chosen = pareto.calc_nondominated_set(solutions=rewards, directions=self.directions)
        return [candidates[i] for i in chosen]

    def append(self, trial):
        """Record ``trial`` and report whether it belongs to the current best set."""
        self.trials.append(trial)
        best_now = self.get_best()
        return trial in best_now

    def to_df(self, include_params=False):
        """Return the base frame extended with non-domination columns.

        Added columns: ``non_dominated`` (membership in the best set),
        ``model_index`` (position inside the best set, ``None`` otherwise) and
        one ``reward_<name>`` column per objective name.
        """
        if len(self.trials) <= 0:
            return pd.DataFrame()

        df = super(DominateBasedTrialHistory, self).to_df(include_params=include_params)

        ns = self.get_best()
        df['non_dominated'] = [t in ns for t in self.trials]
        df['model_index'] = [ns.index(t) if t in ns else None for t in self.trials]

        # Expand the per-trial reward sequences into one column per objective.
        scores: np.ndarray = np.array(df['reward'].values.tolist())
        assert scores.shape[1] == len(self.objective_names)
        for col_no, name in enumerate(self.objective_names):
            df[f'reward_{name}'] = scores[:, col_no]
        return df

    def plot_best_trials(self, index=True, figsize=(5, 5), loc=None, bbox_to_anchor=None, xlim=None, ylim=None):
        """Scatter-plot the best set against the remaining trials (two objectives only).

        Args:
            index: when true, label each best trial with its position in the best set.
            figsize: figure size passed to ``plt.figure``.
            loc, bbox_to_anchor: forwarded to ``ax.legend``.
            xlim, ylim: optional axis limits, unpacked into ``set_xlim``/``set_ylim``.

        Returns:
            tuple: ``(fig, ax)``.

        Raises:
            RuntimeError: if matplotlib cannot be imported or the number of
                objectives is not 2.
        """
        try:
            from matplotlib import pyplot as plt
        except Exception:
            raise RuntimeError("it requires matplotlib installed.")

        if len(self.objective_names) != 2:
            raise RuntimeError("plot currently works only in case of 2 objectives. ")

        best_trials = self.get_best()
        x_name, y_name = self.objective_names[0], self.objective_names[1]

        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot()

        # Every recorded trial that is not part of the best set.
        others = [t for t in self.trials if t not in best_trials]
        if len(others) > 0:
            other_scores = np.array([t.reward for t in others])
            ax.scatter(other_scores[:, 0], other_scores[:, 1], c='blue', label='dominated', marker='o')

        if len(best_trials) > 0:
            best_scores = np.array([t.reward for t in best_trials])
            ax.scatter(best_scores[:, 0], best_scores[:, 1], c='red', label='non-dominated', marker='o')
            # Connect the best points ordered by the second objective.
            # non-dominated does not mean optimal
            ordered = np.array(sorted(best_scores, key=lambda v: v[1]))
            ax.plot(ordered[:, 0], ordered[:, 1], color='c')
            if index:
                for pos, t in enumerate(best_trials):
                    ax.text(t.reward[0], t.reward[1], f"{pos}", ha='center', va='bottom', fontsize=9)

        if xlim:
            ax.set_xlim(*xlim)
        if ylim:
            ax.set_ylim(*ylim)

        ax.legend(loc=loc, bbox_to_anchor=bbox_to_anchor)

        plt.xlabel(x_name)
        plt.ylabel(y_name)
        plt.title(f"Best trials in TrialHistory(total={len(self.trials)})")
        return fig, ax
class TrialStore(object):
    """Base class for trial stores with an in-memory cache in front of a backend.

    The cache is a nested dict: ``dataset_id -> space signature -> sample key
    -> trial``. Subclasses provide ``reset`` (expected to create
    ``self._cache``), ``load``, ``_get``, ``_put``, ``get_all`` and
    ``persist``; the versions here raise ``NotImplementedError``.
    """

    def __init__(self):
        self.reset()
        self.load()

    def put(self, dataset_id, trial):
        """Store ``trial`` in the cache and then in the backend."""
        self.put_to_cache(dataset_id, trial)
        self._put(dataset_id, trial)

    def get_from_cache(self, dataset_id, space_sample):
        """Look ``space_sample`` up in the cache; ``None`` when absent.

        Missing intermediate dicts are created as a side effect.
        """
        per_dataset = self._cache.get(dataset_id)
        if per_dataset is None:
            per_dataset = self._cache[dataset_id] = {}

        signature = space_sample.signature
        per_signature = per_dataset.get(signature)
        if per_signature is None:
            per_signature = per_dataset[signature] = {}

        return per_signature.get(self.sample2key(space_sample))

    def put_to_cache(self, dataset_id, trial):
        """Insert ``trial`` into the cache under its sample's signature and key."""
        per_dataset = self._cache.get(dataset_id)
        if per_dataset is None:
            per_dataset = self._cache[dataset_id] = {}

        sample = trial.space_sample
        per_signature = per_dataset.get(sample.signature)
        if per_signature is None:
            per_signature = per_dataset[sample.signature] = {}

        per_signature[self.sample2key(sample)] = trial

    def get(self, dataset_id, space_sample):
        """Return the trial from the cache, falling back to the backend.

        A trial found in the backend is added to the cache before returning.
        """
        cached = self.get_from_cache(dataset_id, space_sample)
        if cached is not None:
            return cached

        loaded = self._get(dataset_id, space_sample)
        if loaded is not None:
            self.put_to_cache(dataset_id, loaded)
        return loaded

    def _get(self, dataset_id, space_sample):
        raise NotImplementedError

    def _put(self, dataset_id, trial):
        raise NotImplementedError

    def load(self):
        raise NotImplementedError

    def get_all(self, dataset_id, space_signature):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError

    def persist(self):
        raise NotImplementedError

    def sample2key(self, space_sample):
        """Build the cache key: the sample's vectors joined with commas."""
        return ','.join(str(f) for f in space_sample.vectors)

    def check_trial(self, trial):
        """Hook invoked on loaded trials; does nothing in the base class."""
        pass
class DiskTrialStore(TrialStore):
    """Trial store that keeps one pickle file per trial under ``home_dir``.

    Files are laid out as
    ``<home_dir>/<dataset_id>/<space signature>/<sample key>.pkl``.
    """

    def __init__(self, home_dir=None):
        self.home_dir = self.prepare_home_dir(home_dir)
        TrialStore.__init__(self)

    def prepare_home_dir(self, home_dir):
        """Normalise ``home_dir``, create it if needed and return the path.

        ``None`` or an empty string falls back to ``'trial_store'``; a
        trailing ``/`` is dropped and ``~`` is expanded.
        """
        if not home_dir:
            home_dir = 'trial_store'
        if len(home_dir) > 1 and home_dir.endswith('/'):
            home_dir = home_dir[:-1]

        home_dir = os.path.expanduser(home_dir)
        # exist_ok avoids the check-then-create race between processes.
        os.makedirs(home_dir, exist_ok=True)
        return home_dir

    def _prepare_output_dir(self, log_dir, searcher):
        """Create and return ``<log_dir>/exp_<SearcherClass>_<mmdd-HHMMSS>/``.

        ``None`` or an empty ``log_dir`` falls back to ``'log'``.
        """
        if not log_dir:
            log_dir = 'log'
        if len(log_dir) > 1 and log_dir.endswith('/'):
            log_dir = log_dir[:-1]

        timestamp = datetime.datetime.now().strftime("%m%d-%H%M%S")
        running_dir = f'exp_{searcher.__class__.__name__}_{timestamp}'
        output_path = os.path.expanduser(f'{log_dir}/{running_dir}/')
        os.makedirs(output_path, exist_ok=True)
        return output_path

    def load(self):
        """Nothing to preload; trials are read from disk on demand."""
        pass

    def clear_history(self):
        """Delete everything under ``home_dir``, recreate it and empty the cache."""
        shutil.rmtree(self.home_dir)
        self.prepare_home_dir(self.home_dir)
        self.reset()

    def reset(self):
        """Start with an empty in-memory cache."""
        self._cache = {}

    def _get(self, dataset_id, space_sample):
        """Load the stored trial for ``space_sample`` (``None`` if not on disk).

        The stored trial carries no space sample, so the given one is attached.
        """
        path = self.get_trial_path(dataset_id, space_sample)
        trial = self._load_trial(path)
        if trial is not None:
            trial.space_sample = space_sample
        return trial

    def _load_trial(self, path):
        """Unpickle the trial at ``path``; return ``None`` when the file is missing.

        NOTE(security): unpickling executes code embedded in the file; the
        store directory must only contain files written by trusted code.
        """
        try:
            with open(path, 'rb') as f:
                trial = pickle.load(f)
        except FileNotFoundError:
            return None
        self.check_trial(trial)
        return trial

    def _put(self, dataset_id, trial):
        """Write a slimmed copy of ``trial`` to its pickle file.

        The copy holds no space sample (only its vectors, as
        ``space_sample_vectors``) and keeps ``trial_no``, ``reward``,
        ``elapsed``, ``model_file`` and ``succeeded``.
        """
        path = self.get_trial_path(dataset_id, trial.space_sample)

        # Build the payload first so a failure here leaves no empty file behind.
        temp = Trial(space_sample=None,
                     trial_no=trial.trial_no,
                     reward=trial.reward,
                     elapsed=trial.elapsed,
                     model_file=trial.model_file,
                     # Keep the outcome flag; otherwise a failed trial would
                     # come back from disk as succeeded.
                     succeeded=getattr(trial, 'succeeded', True))
        temp.space_sample_vectors = trial.space_sample.vectors

        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(temp, f)

    def persist(self):
        """Nothing to flush; every ``_put`` writes straight to disk."""
        pass

    def get_trial_path(self, dataset_id, space_sample):
        """Return the pickle path used for ``space_sample`` within ``dataset_id``."""
        path = f'{self.home_dir}/{dataset_id}/{space_sample.signature}/{self.sample2key(space_sample)}.pkl'
        return path

    def get_all(self, dataset_id, space_signature):
        """Load every ``.pkl`` trial stored for the dataset/signature pair.

        Returns:
            list: loaded trials; empty when the directory does not exist.
        """
        path = f'{self.home_dir}/{dataset_id}/{space_signature}'
        trials = []
        if not os.path.exists(path):
            return trials
        for name in os.listdir(path):
            if name.endswith('.pkl'):
                trials.append(self._load_trial(path + '/' + name))
        return trials
# Module-level trial store shared through the two accessors below.
default_trial_store = None


def get_default_trial_store():
    """Return the module-level default trial store (``None`` until one is set)."""
    return default_trial_store


def set_default_trial_store(trial_store):
    """Make ``trial_store`` the module-level default trial store."""
    global default_trial_store
    default_trial_store = trial_store