# -*- coding:utf-8 -*-
"""
"""
import datetime
import json
import os
import time
import numpy as np
import pandas as pd
from IPython.display import display, update_display, display_markdown
from tqdm.auto import tqdm
from ..utils import logging, fs, to_repr
logger = logging.get_logger(__name__)
class Callback:
    """Base class for search lifecycle callbacks.

    Every hook below is a no-op by default; subclasses override only the
    events they care about. Hooks are invoked by the search loop at the
    corresponding points of a trial's life cycle.
    """

    def __init__(self):
        pass

    def on_search_start(self, hyper_model, X, y, X_eval, y_eval, cv, num_folds, max_trials, dataset_id, trial_store,
                        **fit_kwargs):
        """Called once, before the first trial of a search."""

    def on_search_end(self, hyper_model):
        """Called once, after the search finishes normally."""

    def on_search_error(self, hyper_model):
        """Called when the search terminates with an error."""

    def on_build_estimator(self, hyper_model, space, estimator, trial_no):
        """Called after an estimator has been built for a sampled space."""

    def on_trial_begin(self, hyper_model, space, trial_no):
        """Called before trial `trial_no` starts running."""

    def on_trial_end(self, hyper_model, space, trial_no, reward, improved, elapsed):
        """Called after trial `trial_no` finishes successfully."""

    def on_trial_error(self, hyper_model, space, trial_no):
        """Called when trial `trial_no` raises an error."""

    def on_skip_trial(self, hyper_model, space, trial_no, reason, reward, improved, elapsed):
        """Called when trial `trial_no` is skipped instead of run."""

    def __repr__(self):
        return to_repr(self)
class EarlyStoppingError(RuntimeError):
    """Raised to abort the search loop when an early-stopping condition triggers.

    The positional arguments (typically a single message string) are stored
    in ``self.args`` as for any exception.
    """

    def __init__(self, *args):
        # Delegate to RuntimeError instead of assigning self.args directly;
        # behavior is identical but follows the standard exception protocol.
        super().__init__(*args)
class EarlyStoppingCallback(Callback):
    """Abort the search when an early-stopping condition is met.

    Supported conditions (any combination may be active):
      * `max_no_improvement_trials` consecutive trials without improvement,
      * total search time exceeding `time_limit` seconds,
      * a trial reward better than `expected_reward`.

    When a condition triggers, `on_trial_end` raises `EarlyStoppingError`
    to stop the search loop.
    """

    # trigger-reason tags stored in `triggered_reason`
    REASON_TRIAL_LIMIT = 'max_no_improvement_trials'
    REASON_TIME_LIMIT = 'time_limit'
    REASON_EXPECTED_REWARD = 'expected_reward'

    def __init__(self, max_no_improvement_trials=0, mode='min', min_delta=0, time_limit=None, expected_reward=None):
        """
        Args:
            max_no_improvement_trials: stop after this many consecutive
                non-improving trials; 0 (or None) disables this condition.
            mode: 'min' if smaller rewards are better, 'max' otherwise.
            min_delta: minimum reward change that counts as an improvement.
            time_limit: wall-clock budget in seconds; None (or <= 0) disables.
            expected_reward: stop once a reward beats this value; None or 0.0 disables.

        Raises:
            ValueError: if `mode` is neither 'min' nor 'max'.
        """
        # BUGFIX: was `super(Callback, self).__init__()`, which skipped
        # Callback.__init__ in the MRO and called object.__init__ instead.
        super(EarlyStoppingCallback, self).__init__()

        # settings
        if mode == 'min':
            self.op = np.less
        elif mode == 'max':
            self.op = np.greater
        else:
            raise ValueError(f'Unsupported mode:{mode}')
        self.max_no_improvement_trials = max_no_improvement_trials
        self.mode = mode
        self.min_delta = min_delta
        self.time_limit = time_limit
        self.expected_reward = expected_reward

        # running state
        self.start_time = None
        self.best_reward = None
        self.best_trial_no = None
        self.counter_no_improvement_trials = 0
        self.triggered = None
        self.triggered_reason = None

    def on_search_start(self, hyper_model, X, y, X_eval, y_eval, cv, num_folds, max_trials, dataset_id, trial_store,
                        **fit_kwargs):
        self.triggered = False
        self.triggered_reason = None

    def on_trial_begin(self, hyper_model, space, trial_no):
        # Start the clock at the first trial, not at callback construction.
        if self.start_time is None:
            self.start_time = time.time()

    def on_trial_end(self, hyper_model, space, trial_no, reward, improved, elapsed):
        """Evaluate all enabled conditions; raise EarlyStoppingError if any fires."""
        reward = reward[0]  # NOTE only use first metric
        if self.start_time is None:
            self.start_time = time.time()

        time_total = time.time() - self.start_time

        if self.time_limit is not None and self.time_limit > 0:
            if time_total > self.time_limit:
                self.triggered = True
                self.triggered_reason = self.REASON_TIME_LIMIT

        if self.expected_reward is not None and self.expected_reward != 0.0:
            if self.op(reward, self.expected_reward):
                self.triggered = True
                self.triggered_reason = self.REASON_EXPECTED_REWARD

        if self.max_no_improvement_trials is not None and self.max_no_improvement_trials > 0:
            if self.best_reward is None:
                self.best_reward = reward
                self.best_trial_no = trial_no
            else:
                # NOTE(review): in 'max' mode this tests reward > best - min_delta,
                # which loosens rather than tightens the improvement criterion —
                # confirm that `best_reward + min_delta` was not intended.
                if self.op(reward, self.best_reward - self.min_delta):
                    self.best_reward = reward
                    self.best_trial_no = trial_no
                    self.counter_no_improvement_trials = 0
                else:
                    self.counter_no_improvement_trials += 1
                    if self.counter_no_improvement_trials >= self.max_no_improvement_trials:
                        self.triggered = True
                        self.triggered_reason = self.REASON_TRIAL_LIMIT

        if self.triggered:
            msg = f'Early stopping on trial : {trial_no}, reason: {self.triggered_reason}, ' \
                  f'best reward: {self.best_reward}, best trial: {self.best_trial_no}, ' \
                  f'elapsed seconds: {time_total}'
            if logger.is_info_enabled():
                logger.info(msg)
            raise EarlyStoppingError(msg)
class FileLoggingCallback(Callback):
    """Write per-trial logs and top-N summaries into a dedicated directory.

    A fresh directory named ``exp_<SearcherClass>_<MMDD-HHMMSS>`` is created
    under `output_dir` (default: ``log``) at construction time.
    """

    def __init__(self, searcher, output_dir=None):
        """
        Args:
            searcher: the searcher instance; its class name is embedded in
                the generated output directory name.
            output_dir: parent directory for the logs; defaults to 'log'.
        """
        super(FileLoggingCallback, self).__init__()

        self.output_dir = self._prepare_output_dir(output_dir, searcher)

    @staticmethod
    def open(file_path, mode):
        # I/O hook: subclasses may redirect writes to another storage backend.
        return open(file_path, mode=mode)

    @staticmethod
    def mkdirs(dir_path, exist_ok):
        # I/O hook, see `open`.
        os.makedirs(dir_path, exist_ok=exist_ok)

    def _prepare_output_dir(self, log_dir, searcher):
        # Fall back to the default parent dir; the falsy check also guards
        # against '' which previously raised IndexError on log_dir[-1].
        if not log_dir:
            log_dir = 'log'
        # Strip trailing slashes so we don't build 'dir//exp_...' paths.
        log_dir = log_dir.rstrip('/')

        running_dir = f'exp_{searcher.__class__.__name__}_{datetime.datetime.now().__format__("%m%d-%H%M%S")}'
        output_path = os.path.expanduser(f'{log_dir}/{running_dir}')
        self.mkdirs(output_path, exist_ok=True)
        return output_path

    def on_build_estimator(self, hyper_model, space, estimator, trial_no):
        pass

    def on_trial_begin(self, hyper_model, space, trial_no):
        pass

    def on_trial_end(self, hyper_model, space, trial_no, reward, improved, elapsed):
        reward = reward[0]  # only the first metric is embedded in the file name
        with self.open(f'{self.output_dir}/trial_{improved}_{trial_no:04d}_{reward:010.8f}_{elapsed:06.2f}.log',
                       'w') as f:
            f.write(space.params_summary())
            f.write('\r\n----------------Summary for Searcher----------------\r\n')
            f.write(hyper_model.searcher.summary())

        # Re-dump the top-10 diff after every trial (file is overwritten).
        topn = 10
        diff = hyper_model.history.diff(hyper_model.history.get_top(topn))
        with self.open(f'{self.output_dir}/top_{topn}_diff.txt', 'w') as f:
            diff_str = json.dumps(diff, indent=5)
            f.write(diff_str)
            f.write('\r\n')
            f.write(hyper_model.searcher.summary())

        with self.open(f'{self.output_dir}/top_{topn}_config.txt', 'w') as f:
            trials = hyper_model.history.get_top(topn)
            configs = hyper_model.export_configuration(trials)
            for trial, conf in zip(trials, configs):
                f.write(f'Trial No: {trial.trial_no}, Reward: {trial.reward}\r\n')
                f.write(conf)
                f.write('\r\n---------------------------------------------------\r\n\r\n')

    def on_skip_trial(self, hyper_model, space, trial_no, reason, reward, improved, elapsed):
        # All reward metrics are embedded in the file name, joined with '_'.
        reward_repr = "_".join(list(map(lambda v: f"{v:010.8f}", reward)))
        with self.open(
                f'{self.output_dir}/trial_{reason}_{improved}_{trial_no:04d}_{reward_repr}_{elapsed:06.2f}.log',
                'w') as f:
            f.write(space.params_summary())

        # NOTE(review): skip handling rewrites top_5_diff.txt while trial-end
        # writes top_10_diff.txt — confirm the differing topn is intentional.
        topn = 5
        diff = hyper_model.history.diff(hyper_model.history.get_top(topn))
        with self.open(f'{self.output_dir}/top_{topn}_diff.txt', 'w') as f:
            diff_str = json.dumps(diff, indent=5)
            f.write(diff_str)
class FileStorageLoggingCallback(FileLoggingCallback):
    """`FileLoggingCallback` variant that writes through the `fs` file-storage
    abstraction instead of the local filesystem."""

    @staticmethod
    def open(file_path, mode):
        """Open `file_path` on the configured file storage."""
        return fs.open(file_path, mode=mode)

    @staticmethod
    def mkdirs(dir_path, exist_ok):
        """Create `dir_path` on the configured file storage."""
        fs.mkdirs(dir_path, exist_ok=exist_ok)
class SummaryCallback(Callback):
    """Emit a human-readable summary of search progress through the module logger."""

    def __init__(self):
        super(SummaryCallback, self).__init__()
        self.start_search_time = None  # wall-clock timestamp set in on_search_start

    def on_search_start(self, hyper_model, X, y, X_eval, y_eval, cv, num_folds, max_trials, dataset_id, trial_store,
                        **fit_kwargs):
        self.start_search_time = time.time()

    def on_build_estimator(self, hyper_model, space, estimator, trial_no):
        estimator.summary()

    def on_trial_begin(self, hyper_model, space, trial_no):
        if not logger.is_info_enabled():
            return
        logger.info(f'\nTrial No:{trial_no}{space.params_summary()}\ntrial {trial_no} begin')

    def on_trial_end(self, hyper_model, space, trial_no, reward, improved, elapsed):
        if not logger.is_info_enabled():
            return
        logger.info(f'trial end. reward:{reward}, improved:{improved}, elapsed:{elapsed}')
        logger.info(f'Total elapsed:{time.time() - self.start_search_time}')

    def on_skip_trial(self, hyper_model, space, trial_no, reason, reward, improved, elapsed):
        if not logger.is_info_enabled():
            return
        logger.info(f'&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&')
        logger.info(f'trial skip. reason:{reason}, reward:{reward}, improved:{improved}, elapsed:{elapsed}')
        logger.info(f'&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&')
class NotebookCallback(Callback):
    """Render live search progress in a Jupyter notebook via IPython display handles.

    Four updatable output areas are created in `on_search_start` (trials
    summary, best trial, a section title, and the current trial) and are
    then refreshed in place through their display ids as the search runs.
    """

    def __init__(self):
        super(NotebookCallback, self).__init__()
        # Display ids of the updatable output areas; None until created.
        self.current_trial_display_id = None
        self.search_summary_display_id = None
        self.best_trial_display_id = None
        self.title_display_id = None
        # Progress state shown in the summary table.
        self.last_trial_no = 0
        self.last_reward = 0
        self.start_time = 0
        self.max_trials = 0

    def on_search_start(self, hyper_model, X, y, X_eval, y_eval, cv, num_folds, max_trials, dataset_id, trial_store,
                        **fit_kwargs):
        """Show the experiment settings and create the updatable display areas."""
        self.start_time = time.time()
        self.max_trials = max_trials
        # Empty DataFrame used as the initial placeholder for each area.
        df_holder = pd.DataFrame()
        settings = {'X': X.shape,
                    'y': y.shape,
                    'X_eval': X_eval.shape if X_eval is not None else None,
                    'y_eval': y_eval.shape if y_eval is not None else None,
                    'cv': cv,
                    'num_folds': num_folds,
                    'max_trials': max_trials,
                    # 'dataset_id': dataset_id,
                    # 'trail_store': trial_store,
                    'fit_kwargs': fit_kwargs.keys()
                    }
        # One-row DataFrame: one column per setting.
        df_settings = pd.DataFrame({k: [v] for k, v in settings.items()})

        display_markdown('#### Experiment Settings:', raw=True)

        display(hyper_model, display_id=False)
        display(df_settings, display_id=False)

        display_markdown('#### Trials Summary:', raw=True)
        # display(..., display_id=True) returns a handle whose id lets us
        # update the same output area later; it may be None outside a notebook.
        handle = display(df_holder, display_id=True)
        if handle is not None:
            self.search_summary_display_id = handle.display_id

        display_markdown('#### Best Trial:', raw=True)
        handle = display(df_holder, display_id=True)
        if handle is not None:
            self.best_trial_display_id = handle.display_id

        # The title area is markdown so it can be swapped to 'Top trials:' at the end.
        handle = display({'text/markdown': '#### Current Trial:'}, raw=True, include=['text/markdown'],
                         display_id=True)
        if handle is not None:
            self.title_display_id = handle.display_id

        handle = display(df_holder, display_id=True)
        if handle is not None:
            self.current_trial_display_id = handle.display_id

    def on_trial_begin(self, hyper_model, space, trial_no):
        """Refresh the summary table and show the space of the starting trial."""
        df_summary = pd.DataFrame([(trial_no, self.last_reward, hyper_model.best_trial_no,
                                    hyper_model.best_reward,
                                    time.time() - self.start_time,
                                    len([t for t in hyper_model.history.trials if t.succeeded]),
                                    self.max_trials)],
                                  columns=['Trial No.', 'Previous reward', 'Best trial', 'Best reward',
                                           'Total elapsed', 'Valid trials',
                                           'Max trials'])
        if self.search_summary_display_id is not None:
            update_display(df_summary, display_id=self.search_summary_display_id)

        if self.current_trial_display_id is not None:
            update_display(space, display_id=self.current_trial_display_id)

    def on_search_end(self, hyper_model):
        """Write the final summary and replace the current-trial area with the top trials."""
        df_summary = pd.DataFrame([(self.last_trial_no, self.last_reward, hyper_model.best_trial_no,
                                    hyper_model.best_reward,
                                    time.time() - self.start_time,
                                    len([t for t in hyper_model.history.trials if t.succeeded]),
                                    self.max_trials)],
                                  columns=['Trial No.', 'Previous reward', 'Best trial', 'Best reward',
                                           'Total elapsed', 'Valid trials',
                                           'Max trials'])
        if self.search_summary_display_id is not None:
            update_display(df_summary, display_id=self.search_summary_display_id)

        if self.title_display_id is not None:
            update_display({'text/markdown': '#### Top trials:'}, raw=True, include=['text/markdown'],
                           display_id=self.title_display_id)

        df_best_trials = pd.DataFrame([
            (t.trial_no, t.reward, t.elapsed, t.space_sample.vectors) for t in hyper_model.get_top_trials(5)],
            columns=['Trial No.', 'Reward', 'Elapsed', 'Space Vector'])
        if self.current_trial_display_id is not None:
            update_display(df_best_trials, display_id=self.current_trial_display_id)

    def on_trial_end(self, hyper_model, space, trial_no, reward, improved, elapsed):
        """Record the last result and refresh the best-trial display."""
        reward = reward[0]  # only the first metric is tracked
        self.last_trial_no = trial_no
        self.last_reward = reward

        best_trial = hyper_model.get_best_trial()
        if best_trial is not None and not isinstance(best_trial, list) and self.best_trial_display_id is not None:
            update_display(best_trial.space_sample, display_id=self.best_trial_display_id)

    def on_trial_error(self, hyper_model, space, trial_no):
        # Mark the failed trial so the next summary row shows 'ERR' as previous reward.
        self.last_trial_no = trial_no
        self.last_reward = 'ERR'
class ProgressiveCallback(Callback):
    """Display search progress with a tqdm progress bar (one tick per trial)."""

    def __init__(self):
        super(ProgressiveCallback, self).__init__()
        self.pbar = None  # created in on_search_start, destroyed in on_search_end

    def on_search_start(self, hyper_model, X, y, X_eval, y_eval, cv, num_folds, max_trials, dataset_id, trial_store,
                        **fit_kwargs):
        self.pbar = tqdm(total=max_trials, leave=False, desc='search')

    def on_search_end(self, hyper_model):
        # Guard against a double close (on_search_error delegates here).
        if self.pbar is not None:
            # BUGFIX: was update(self.pbar.total), which ADDS `total` to the
            # already-advanced counter and pushes it past 100%; advance only
            # by the remaining amount to fill the bar exactly.
            self.pbar.update(self.pbar.total - self.pbar.n)
            self.pbar.close()
            self.pbar = None

    def on_search_error(self, hyper_model):
        self.on_search_end(hyper_model)

    def on_trial_end(self, hyper_model, space, trial_no, reward, improved, elapsed):
        self.pbar.update(1)

    def on_trial_error(self, hyper_model, space, trial_no):
        self.pbar.update(1)