from collections import OrderedDict
from datetime import datetime
import numpy as np
import pandas as pd
from hypernets.hyperctl.batch import Batch, _ShellJob
from hypernets.utils import logging as hyn_logging
from hypernets.hyperctl import consts
logger = hyn_logging.getLogger(__name__)
[docs]class BatchCallback:
[docs] def on_start(self, batch):
pass
[docs] def on_job_start(self, batch, job, executor):
pass
[docs] def on_job_succeed(self, batch, job, executor, elapsed: float):
pass
[docs] def on_job_failed(self, batch, job, executor, elapsed: float):
"""Job ran failed """
pass
[docs] def on_job_break(self, batch, job, exception):
""" Job failed before running"""
pass
[docs] def on_finish(self, batch, elapsed: float):
"""Batch finished"""
pass
[docs]class ConsoleCallback(BatchCallback):
[docs] def on_start(self, batch):
print("on_start")
[docs] def on_job_start(self, batch, job, executor):
print("on_job_start")
[docs] def on_job_succeed(self, batch, job, executor, elapsed: float):
print("on_job_succeed")
[docs] def on_job_failed(self, batch, job, executor, elapsed: float):
print("on_job_failed")
[docs] def on_job_break(self, batch, job, exception):
print("on_job_break")
[docs] def on_finish(self, batch, elapsed: float):
print("on_finish")
[docs]class VisDOMCallback(BatchCallback):
def __init__(self, n_tail_jobs=100, elapsed_cut_bins=10, datetime_qcut_bins=10):
self.n_tail_jobs = n_tail_jobs
self.elapsed_cut_bins = elapsed_cut_bins
self.datetime_qcut_bins = datetime_qcut_bins
self._session_ = None
self._job_db_ = None
def _ensure_session(self):
import visdom
if self._session_ is None or not self._session_.check_connection(timeout_seconds=3): # lazy singleton
self._session_ = visdom.Visdom()
return self._session_
def _update_chart(self):
vis = self._ensure_session()
assert self._job_db_ is not None
df_job_db = self._job_db_.copy()
# plot status summary
status_summary_dict = df_job_db['status'].value_counts().to_dict()
status_summary_dict.setdefault(_ShellJob.STATUS_SUCCEED, 0)
status_summary_dict.setdefault(_ShellJob.STATUS_INIT, 0)
status_summary_dict.setdefault(_ShellJob.STATUS_FAILED, 0)
status_summary_dict.setdefault(_ShellJob.STATUS_RUNNING, 0)
vis.pie(
win='job_status_summary',
X=np.array(list(status_summary_dict.values())),
opts=dict(title="Job status summary", legend=list(status_summary_dict.keys()))
)
# plot last {self.n_tail_jobs} jobs elapsed
last_succeed_job = df_job_db[df_job_db['status'] == _ShellJob.STATUS_SUCCEED]\
.sort_values(by='end_datetime', ascending=False).head(self.n_tail_jobs)
if last_succeed_job.shape[0] > 1: # can not plot if only one row
vis.bar(win='last_job_elapsed', X=last_succeed_job['elapsed'],
opts=dict(title=f"Last {self.n_tail_jobs} succeed jobs", xlabel="Job name", ylabel="Elapsed",
rownames=last_succeed_job.index.tolist()))
# plot elapsed cut summary
df_job_db_elapsed_not_null = df_job_db[df_job_db['elapsed'].notnull()].copy()
if df_job_db_elapsed_not_null.shape[0] > 1:
df_job_db_elapsed_not_null['cut_elapsed'] = pd.cut(df_job_db_elapsed_not_null['elapsed'],
bins=self.elapsed_cut_bins, precision=2)
elapsed_cut_dict = {}
for k, v in df_job_db_elapsed_not_null['cut_elapsed'].value_counts().to_dict().items():
elapsed_cut_dict[str(k)] = v
vis.bar(
win='elapsed_cut_summary',
X=np.array(list(elapsed_cut_dict.values())),
opts=dict(title=f"Elapsed cut summary({self.elapsed_cut_bins} bins)", showlegend=False,
rownames=list(elapsed_cut_dict.keys()), xlabel="Elapsed range", ylabel="Jobs count")
)
# plot elapsed cut box
df_valid_end_datetime = df_job_db[df_job_db['end_datetime'].notnull()].copy()
if df_valid_end_datetime.shape[0] >= self.datetime_qcut_bins and df_valid_end_datetime.shape[0] > 1:
df_valid_end_datetime['end_datetime_qcut'] = pd.qcut(df_valid_end_datetime['end_datetime'].astype('float'),
q=self.datetime_qcut_bins,
duplicates='drop', precision=2)
df_box = pd.DataFrame()
min_len = min([len(_) for _ in df_valid_end_datetime[['end_datetime_qcut', 'elapsed']]
.groupby('end_datetime_qcut').groups.values()])
for k, v in df_valid_end_datetime[['end_datetime_qcut', 'elapsed']]\
.groupby('end_datetime_qcut').groups.items():
df_box[k] = df_valid_end_datetime.loc[v[:min_len]].reset_index()['elapsed']
def format_timestamp(t):
return datetime.fromtimestamp(t).strftime('%m-%d %H:%M:%S')
def format_interval(i):
return " - ".join((format_timestamp(i.right), format_timestamp(i.left)))
if df_box.shape[0] > 1:
vis.boxplot(df_box.values,
win='elapsed_boxplot',
opts=dict(title=f"Elapsed boxplot({self.datetime_qcut_bins} bins)",
xlabel="Time Range", ylabel="Elapsed", showlegend=False,
legend=[format_interval(c) for c in df_box.columns]))
# plot job count on each node
df_job_db_end_node_not_null = df_job_db[df_job_db['node'].notnull()].copy()
if df_job_db_end_node_not_null.shape[1] > 0:
node_summary_dict = df_job_db_end_node_not_null['node'].value_counts().to_dict()
node_jobs_array = list(node_summary_dict.values())
if len(node_jobs_array) > 1:
vis.bar(win='summary_node_jobs', X=node_jobs_array, opts=dict(title="succeed jobs of each node",
rownames=list(node_summary_dict.keys()),
xlabel="Node name",
ylabel="Jobs count"))
# vis.close()
[docs] def on_start(self, batch: Batch):
job_rows = []
# init job summary
for job in batch.jobs:
job_status = job.status
job_node = None
job_elapsed = None
if job_status == _ShellJob.STATUS_SUCCEED:
job_state = job.state_data()
job_ext = job_state['ext']
job_elapsed = job_state['elapsed']
if job_ext.get('backend_type') == 'remote':
job_node = job_ext.get('hostname')
else:
job_node = consts.HOST_LOCALHOST
row = (job.name, job_status, job_node, job_elapsed, job.start_datetime, job.end_datetime)
job_rows.append(row)
job_db = pd.DataFrame(data=job_rows, columns=['name', 'status', 'node', 'elapsed',
'start_datetime', 'end_datetime'])
job_db.set_index('name', inplace=True)
self._job_db_ = job_db
self._update_chart()
@staticmethod
def _extract_job_state(job):
if job.status != _ShellJob.STATUS_SUCCEED:
return None, None
else:
job_state = job.state_data()
job_ext = job_state['ext']
job_elapsed = job_state['elapsed']
if job_ext.get('backend_type') == 'remote':
return job_ext.get('hostname'), job_elapsed
else:
return consts.HOST_LOCALHOST, job_elapsed
[docs] def on_job_start(self, batch, job, executor):
self._update_on_event(job)
[docs] def on_job_succeed(self, batch: Batch, job, executor, elapsed: float):
self._update_on_event(job)
def _update_on_event(self, job):
assert self._job_db_ is not None
# job should in self._job_db
self._job_db_.loc[job.name, 'status'] = job.status
if job.status == _ShellJob.STATUS_SUCCEED:
job_node, job_elapsed = self._extract_job_state(job)
self._job_db_.loc[job.name, 'elapsed'] = job_elapsed
self._job_db_.loc[job.name, 'start_datetime'] = job.start_datetime
self._job_db_.loc[job.name, 'end_datetime'] = job.end_datetime
self._job_db_.loc[job.name, 'node'] = job_node
# re-redner
self._update_chart()
[docs] def on_job_break(self, batch, job, exception):
self._update_on_event(job)
[docs] def on_job_failed(self, batch, job, executor, elapsed: float):
self._update_on_event(job)