# -*- coding:utf-8 -*-
"""
"""
import math
import os
import pickle
import numpy as np
import psutil
from joblib import Parallel, delayed
from sklearn import metrics as sk_metrics
from hypernets.utils import const, logging
from .cfg import TabularCfg as cfg
# Module-level logger (hypernets wrapper around stdlib logging).
logger = logging.get_logger(__name__)
# Minimum rows per parallel prediction batch; _detect_jobs caps the worker
# count so each worker processes at least this many rows.
_MIN_BATCH_SIZE = 100000
def _task_to_average(task):
    """Map a task type to the default sklearn ``average`` argument.

    Multiclass tasks use 'macro' averaging; any other task (notably binary
    classification) falls back to 'binary'.
    """
    return 'macro' if task == const.TASK_MULTICLASS else 'binary'
def calc_score(y_true, y_preds, y_proba=None, metrics=('accuracy',), task=const.TASK_BINARY, pos_label=1,
               classes=None, average=None):
    """Compute the requested metrics for the given predictions.

    Parameters
    ----------
    y_true : ground-truth target values.
    y_preds : predicted labels (classification) or values (regression).
    y_proba : predicted class probabilities; defaults to ``y_preds`` when None.
    metrics : iterable of metric names (case-insensitive) or callables; a
        callable is invoked as ``metric(y_true, y_preds)`` and stored under
        its ``__name__``.
    task : const.TASK_* value; used to pick the default averaging mode and
        the AUC strategy.
    pos_label : positive-class label passed to recall/precision/f1 when not None.
    classes : class labels forwarded to sklearn's ``labels`` argument.
    average : sklearn averaging mode; derived from ``task`` when None.

    Returns
    -------
    dict mapping each metric name to its score. Unrecognized metric names
    are silently skipped.
    """
    score = {}

    if y_proba is None:
        y_proba = y_preds
    # Flatten single-column 2D arrays so sklearn treats them as plain vectors.
    if len(y_proba.shape) == 2 and y_proba.shape[-1] == 1:
        y_proba = y_proba.reshape(-1)
    if len(y_preds.shape) == 2 and y_preds.shape[-1] == 1:
        y_preds = y_preds.reshape(-1)

    if average is None:
        average = _task_to_average(task)

    # Shared options for recall/precision/f1.
    recall_options = dict(average=average, labels=classes)
    if pos_label is not None:
        recall_options['pos_label'] = pos_label

    for metric in metrics:
        if callable(metric):
            score[metric.__name__] = metric(y_true, y_preds)
        else:
            metric_lower = metric.lower()
            if metric_lower == 'auc':
                if len(y_proba.shape) == 2:
                    if task == const.TASK_MULTICLASS:
                        # One-vs-one AUC over the full probability matrix.
                        score[metric] = sk_metrics.roc_auc_score(y_true, y_proba, multi_class='ovo', labels=classes)
                    else:
                        # Binary AUC uses the positive-class column only.
                        score[metric] = sk_metrics.roc_auc_score(y_true, y_proba[:, 1])
                else:
                    score[metric] = sk_metrics.roc_auc_score(y_true, y_proba)
            elif metric_lower == 'accuracy':
                if y_preds is None:
                    score[metric] = 0
                else:
                    score[metric] = sk_metrics.accuracy_score(y_true, y_preds)
            elif metric_lower == 'recall':
                score[metric] = sk_metrics.recall_score(y_true, y_preds, **recall_options)
            elif metric_lower == 'precision':
                score[metric] = sk_metrics.precision_score(y_true, y_preds, **recall_options)
            elif metric_lower == 'f1':
                score[metric] = sk_metrics.f1_score(y_true, y_preds, **recall_options)
            elif metric_lower == 'mse':
                score[metric] = sk_metrics.mean_squared_error(y_true, y_preds)
            elif metric_lower == 'mae':
                score[metric] = sk_metrics.mean_absolute_error(y_true, y_preds)
            elif metric_lower == 'msle':
                score[metric] = sk_metrics.mean_squared_log_error(y_true, y_preds)
            elif metric_lower in {'rmse', 'rootmeansquarederror', 'root_mean_squared_error'}:
                score[metric] = np.sqrt(sk_metrics.mean_squared_error(y_true, y_preds))
            elif metric_lower in {'logloss', 'log_loss'}:
                score[metric] = sk_metrics.log_loss(y_true, y_proba, labels=classes)
            elif metric_lower == 'r2':
                score[metric] = sk_metrics.r2_score(y_true, y_preds)

    return score
def metric_to_scoring(metric, task=const.TASK_BINARY, pos_label=None):
    """Translate a metric name into an sklearn scorer object.

    recall/precision/f1 depend on the task's averaging mode (and optionally
    ``pos_label``), so they are built with ``make_scorer``; every other
    supported name maps to a built-in sklearn scoring string resolved via
    ``get_scorer``.

    Parameters
    ----------
    metric : str, case-insensitive metric name.
    task : const.TASK_* value, used to choose the averaging mode.
    pos_label : positive-class label forwarded to recall/precision/f1.

    Raises
    ------
    ValueError : if ``metric`` is not a supported name.
    """
    assert isinstance(metric, str)

    metric2scoring = {
        'auc': 'roc_auc_ovo',
        'accuracy': 'accuracy',
        'recall': 'recall',
        'precision': 'precision',
        'f1': 'f1',
        'mse': 'neg_mean_squared_error',
        'mae': 'neg_mean_absolute_error',
        'msle': 'neg_mean_squared_log_error',
        'rmse': 'neg_root_mean_squared_error',
        'rootmeansquarederror': 'neg_root_mean_squared_error',
        'root_mean_squared_error': 'neg_root_mean_squared_error',
        'r2': 'r2',
        'logloss': 'neg_log_loss',
    }
    # Metrics that need task-dependent options take priority over the
    # plain scoring strings above.
    metric2fn = {
        'recall': sk_metrics.recall_score,
        'precision': sk_metrics.precision_score,
        'f1': sk_metrics.f1_score,
    }

    metric_lower = metric.lower()
    if metric_lower not in metric2scoring and metric_lower not in metric2fn:
        raise ValueError(f'Not found matching scoring for {metric}')

    if metric_lower in metric2fn:
        options = dict(average=_task_to_average(task))
        if pos_label is not None:
            options['pos_label'] = pos_label
        scoring = sk_metrics.make_scorer(metric2fn[metric_lower], **options)
    else:
        scoring = sk_metrics.get_scorer(metric2scoring[metric_lower])
    return scoring
def evaluate(estimator, X, y, metrics, *, task=None, pos_label=None, classes=None,
             average=None, threshold=0.5, n_jobs=-1):
    """Predict on ``X`` with ``estimator`` and score the result against ``y``.

    Parameters
    ----------
    estimator : fitted estimator object, or a filesystem path to a pickled one
        (only existence is checked here; loading happens in the prediction helpers).
    X : feature data to predict on.
    y : ground-truth target.
    metrics : metric names for ``calc_score``; None selects task defaults
        (classification: auc/accuracy/f1/recall; otherwise mse/mae/msle/rmse/r2).
    task : const.TASK_* value; inferred from ``classes``, the estimator, or ``y``
        when None.
    pos_label : binary positive-class label; defaults to ``classes[-1]``.
    classes : sequence of class labels, optional.
    average : sklearn averaging mode override, optional.
    threshold : probability threshold for binary predictions.
    n_jobs : prediction parallelism (see ``_detect_jobs``).

    Returns
    -------
    dict mapping metric name to score.
    """
    assert classes is None or isinstance(classes, (list, tuple, np.ndarray))
    if isinstance(estimator, str):
        assert os.path.exists(estimator), f'Not found {estimator}'

    # Infer the task from `classes` first, then from the estimator / target.
    if task is None and classes is not None and len(classes) >= 2:
        task = const.TASK_BINARY if len(classes) == 2 else const.TASK_MULTICLASS
    if task is None:
        task, c2 = _detect_task(estimator, y)
        if classes is None or len(classes) < 2:
            classes = c2
    n_jobs = _detect_jobs(X, n_jobs)

    if task in {const.TASK_BINARY, const.TASK_MULTICLASS}:
        # Classification: score both labels and probabilities.
        proba = predict_proba(estimator, X, n_jobs=n_jobs)
        pred = proba2predict(proba, task=task, threshold=threshold, classes=classes)
        if metrics is None:
            metrics = ['auc', 'accuracy', 'f1', 'recall']
    else:
        pred = predict(estimator, X, n_jobs=n_jobs)
        proba = None
        if metrics is None:
            metrics = ['mse', 'mae', 'msle', 'rmse', 'r2']

    if task == const.TASK_BINARY and pos_label is None:
        pos_label = classes[-1]

    if logger.is_info_enabled():
        logger.info(f'calc_score {metrics}, task={task}, pos_label={pos_label}, classes={classes}, average={average}')
    scores = calc_score(y_true=y, y_preds=pred, y_proba=proba, metrics=metrics,
                        classes=classes, pos_label=pos_label, average=average)
    return scores
def predict_proba(estimator, X, *, n_jobs=None):
    """Return class probabilities for ``X``.

    ``estimator`` may be a fitted object or a path to a pickled one (only
    existence is checked here; loading happens inside ``_call_predict``).
    ``n_jobs`` controls batch parallelism (see ``_detect_jobs``).
    """
    if isinstance(estimator, str):
        assert os.path.exists(estimator), f'Not found {estimator}'

    n_jobs = _detect_jobs(X, n_jobs)
    if logger.is_info_enabled():
        logger.info(f'predict_proba with n_jobs={n_jobs}')
    proba = _call_predict(estimator, 'predict_proba', X, n_jobs=n_jobs)
    return proba
def predict(estimator, X, *, task=None, classes=None, threshold=0.5, n_jobs=None):
    """Return predictions for ``X``.

    Regression tasks call the estimator's ``predict`` directly; classification
    (or an unknown task) goes through ``predict_proba`` and converts the
    probabilities with ``proba2predict``.

    Parameters
    ----------
    estimator : fitted estimator object or path to a pickled one.
    X : feature data to predict on.
    task : const.TASK_* value; inferred from ``classes`` or the estimator when None.
    classes : sequence of class labels, optional.
    threshold : probability threshold for binary predictions.
    n_jobs : prediction parallelism (see ``_detect_jobs``).
    """
    assert classes is None or isinstance(classes, (list, tuple, np.ndarray))
    if isinstance(estimator, str):
        assert os.path.exists(estimator), f'Not found {estimator}'

    if task is None and classes is not None and len(classes) >= 2:
        task = const.TASK_BINARY if len(classes) == 2 else const.TASK_MULTICLASS
    if task is None:
        task, c2 = _detect_task(estimator, None)
        if classes is None or len(classes) < 2:
            classes = c2

    if task == const.TASK_REGRESSION:
        n_jobs = _detect_jobs(X, n_jobs)
        if logger.is_info_enabled():
            logger.info(f'predict with n_jobs={n_jobs}')
        pred = _call_predict(estimator, 'predict', X, n_jobs=n_jobs)
    else:
        proba = predict_proba(estimator, X, n_jobs=n_jobs)
        if task is None and (len(proba.shape) < 2 or proba.shape[1] == 1):
            # Single-column output from an undetected task: treat as regression.
            task = const.TASK_REGRESSION
        pred = proba2predict(proba, task=task, threshold=threshold, classes=classes)
    return pred
def proba2predict(proba, *, task=None, threshold=0.5, classes=None):
    """Convert class probabilities into predicted labels.

    Regression probabilities (or 1-D/single-column arrays for an unspecified
    task) are returned unchanged. Multiclass predictions take the argmax
    column; binary predictions threshold the last (positive-class) column.
    When ``classes`` is given, integer class indices are mapped back to the
    original labels.
    """
    assert len(proba.shape) <= 2
    if len(proba.shape) == 0:  # empty
        return proba

    from hypernets.tabular import get_tool_box

    def is_one_dim(x):
        return len(x.shape) == 1 or (len(x.shape) == 2 and x.shape[1] == 1)

    if logger.is_info_enabled():
        logger.info(f'proba2predict with task={task}, classes={classes}, threshold={threshold}')

    # Binary proba given as a single column: expand to the two-column form.
    if task == const.TASK_BINARY and is_one_dim(proba):
        proba = get_tool_box(proba).fix_binary_predict_proba_result(proba)

    if task == const.TASK_REGRESSION or is_one_dim(proba):  # regression
        return proba

    if proba.shape[-1] > 2:  # multiclass: most probable class index
        pred = proba.argmax(axis=-1)
    else:  # binary: threshold the positive-class column
        pred = (proba[:, -1] > threshold).astype(np.int32)

    if classes is not None:
        # Map integer class indices back to the original class labels.
        tb = get_tool_box(pred)
        pred = tb.take_array(np.array(classes), pred, axis=0)
    return pred
def _detect_jobs(X, n_jobs):
    """Resolve the effective joblib worker count for predicting on ``X``.

    Dask collections (detected via a callable ``compute`` attribute) are not
    split, so they always run with a single job. A non-positive ``n_jobs``
    is derived from the row count so each worker gets at least
    ``_MIN_BATCH_SIZE`` rows, and the result never exceeds the CPU count.
    """
    if callable(getattr(X, 'compute', None)):  # dask data frame
        return 1

    assert X.shape[0] > 0, f'Not found data.'

    if n_jobs is None:
        n_jobs = cfg.joblib_njobs
    if n_jobs <= 0:
        # Size-driven default: one worker per _MIN_BATCH_SIZE rows.
        n_jobs = math.ceil(X.shape[0] / _MIN_BATCH_SIZE)
    return min(n_jobs, psutil.cpu_count())
def _detect_task(estimator, y):
    # Infer (task, classes) from the estimator, falling back to the target `y`.
    #
    # Detection order:
    #   1. estimator.task attribute (directly, then on a Pipeline's last step);
    #   2. estimator.classes_ (directly, then on a Pipeline's last step),
    #      which also determines binary vs multiclass when task is unknown;
    #   3. the toolbox's infer_task_type(y) as a last resort.
    # Returns (task, classes); either may be None if undetectable.
    if isinstance(estimator, str):
        # NOTE(review): pickle.load on an attacker-controlled path is unsafe —
        # callers must only pass trusted model files.
        with open(estimator, 'rb') as f:
            estimator = pickle.load(f)

    task = None
    classes = None

    if task is None and hasattr(estimator, 'task'):
        task = getattr(estimator, 'task', None)
    if task is None and type(estimator).__name__.find('Pipeline') >= 0 and hasattr(estimator, 'steps'):
        # Pipeline-like object: look at the final step's task.
        task = getattr(estimator.steps[-1][1], 'task', None)

    if hasattr(estimator, 'classes_'):
        classes = getattr(estimator, 'classes_', None)
        # Only accept a real label sequence with more than one class.
        if not (isinstance(classes, (list, tuple, np.ndarray)) and len(classes) > 1):
            classes = None
        if task is None and classes is not None:
            task = const.TASK_BINARY if len(classes) == 2 else const.TASK_MULTICLASS
    if classes is None and type(estimator).__name__.find('Pipeline') >= 0 and hasattr(estimator, 'steps'):
        # Same classes_ lookup on a Pipeline's final step.
        classes = getattr(estimator.steps[-1][1], 'classes_', None)
        if not (isinstance(classes, (list, tuple, np.ndarray)) and len(classes) > 1):
            classes = None
        if task is None and classes is not None:
            task = const.TASK_BINARY if len(classes) == 2 else const.TASK_MULTICLASS

    if task is None and y is not None:
        # Last resort: infer the task (and classes) from the target values.
        from hypernets.tabular import get_tool_box
        task, c2 = get_tool_box(y).infer_task_type(y)
        if classes is None:
            classes = c2

    return task, classes
def _load_and_run(estimator, fn_name, df, log_level):
    """Worker routine: configure logging if requested, load the estimator when
    given as a pickle path, then call its ``fn_name`` method on ``df``."""
    if log_level is not None:
        # Joblib workers start fresh: silence warnings and apply the
        # parent process's logging level.
        import warnings
        warnings.filterwarnings('ignore')
        logging.set_level(log_level)

    if isinstance(estimator, str):
        logger.info(f'load estimator {estimator}')
        with open(estimator, 'rb') as f:
            estimator = pickle.load(f)

    fn = getattr(estimator, fn_name)
    assert callable(fn)

    logger.info(f'call {fn_name}')
    return fn(df)
def _call_predict(estimator, fn_name, df, n_jobs=1):
    # Run estimator.<fn_name>(df), optionally splitting `df` into n_jobs
    # batches that are processed in parallel with joblib and stitched back
    # together in order.
    if n_jobs > 1:
        batch_size = math.ceil(df.shape[0] / n_jobs)
        # NOTE(review): relies on df.index exposing `.start`/`.stop` — i.e. a
        # pandas RangeIndex — TODO confirm callers never pass other index types.
        df_parts = [df[i:i + batch_size].copy() for i in range(df.index.start, df.index.stop, batch_size)]
        log_level = logging.get_level()
        options = cfg.joblib_options
        pss = Parallel(n_jobs=n_jobs, **options)(delayed(_load_and_run)(
            estimator, fn_name, x, log_level
        ) for x in df_parts)
        # 2-D results (e.g. predict_proba) stack row-wise; 1-D results concatenate.
        if len(pss[0].shape) > 1:
            result = np.vstack(pss)
        else:
            result = np.hstack(pss)
    else:
        # Single job: run inline in this process (no logging reconfiguration).
        result = _load_and_run(estimator, fn_name, df, None)
    return result
class Metrics:
    """Convenience namespace exposing this module's metric utilities as
    class attributes, so toolboxes can reference them as ``Metrics.<name>``."""
    calc_score = calc_score
    metric_to_scoring = metric_to_scoring
    evaluate = evaluate
    proba2predict = proba2predict
    predict = predict
    predict_proba = predict_proba