# -*- coding:utf-8 -*-
__author__ = 'yangjian'
"""
"""
import copy
import inspect
import time
import numpy as np
from joblib import Parallel, delayed
from sklearn import model_selection as sksel
from sklearn.metrics import roc_auc_score, matthews_corrcoef, make_scorer
from hypernets.core import randint
from hypernets.utils import logging, const
from .cfg import TabularCfg as cfg
logger = logging.getLogger(__name__)

# ROC-AUC has to be computed from continuous scores, not hard class labels.
# scikit-learn 1.4 added `response_method` to `make_scorer` and deprecated
# `needs_threshold` (removed in later releases).  Older releases silently
# forward unknown keywords to the metric, so a try/except cannot detect the
# right spelling -- inspect the signature instead.
if 'response_method' in inspect.signature(make_scorer).parameters:
    roc_auc_scorer = make_scorer(roc_auc_score, greater_is_better=True,
                                 response_method=('decision_function', 'predict_proba'))
else:
    roc_auc_scorer = make_scorer(roc_auc_score, greater_is_better=True, needs_threshold=True)

# Scorer built from the Matthews correlation coefficient with default options.
matthews_corrcoef_scorer = make_scorer(matthews_corrcoef)
class FeatureSelectionCallback:
    """Observer interface for drift-based feature selection.

    Every hook is a no-op; subclasses override the ones they care about.
    """

    def on_round_start(self, round_no, features):
        """Hook invoked when an elimination round starts."""

    def on_round_end(self, round_no, auc, features, remove_features, elapsed):
        """Hook invoked when an elimination round has removed features."""

    def on_remove_shift_variable(self, shift_score, remove_features):
        """Hook invoked after the per-variable shift scores were evaluated."""

    def on_task_break(self, round_no, auc, features):
        """Hook invoked when selection stops because no more features can be removed."""

    def on_task_finished(self, round_no, auc, features):
        """Hook invoked when selection stops because the AUC threshold was reached."""
def _shift_score(X, y, scorer, cv, log_level):
    """Score how well a binary estimator can predict ``y`` from ``X``.

    :param X: feature data handed to the toolbox estimator
    :param y: binary target
    :param scorer: scorer called as ``scorer(model, X, y)``; when missing, AUC is used
    :param cv: if truthy, passed to ``cross_val_score`` and the fold scores are averaged;
        otherwise a single 70/30 stratified hold-out split is scored
    :param log_level: when not None, warnings are silenced and this logging level is set
        (the parallel caller passes its own level here)
    :return: the score
    """
    from . import get_tool_box

    if log_level is not None:
        import warnings
        warnings.filterwarnings('ignore')
        logging.set_level(log_level)

    toolbox = get_tool_box(X, y)
    model = toolbox.general_estimator(X, y, task=const.TASK_BINARY)

    if cv:
        cv_scorer = roc_auc_scorer if scorer is None else scorer
        fold_scores = sksel.cross_val_score(model, X=X, y=y, verbose=0, scoring=cv_scorer, cv=cv)
        return np.mean(fold_scores)

    # No cross-validation requested: fit once and score on a fixed hold-out part.
    X_fit, X_holdout, y_fit, y_holdout = toolbox.train_test_split(
        X, y, test_size=0.3, random_state=9527, stratify=y)
    model.fit(X_fit, y_fit, **_get_fit_kwargs(model, X_eval=X_holdout, y_eval=y_holdout))

    if scorer:
        return scorer(model, X_holdout, y_holdout)

    holdout_proba = model.predict_proba(X_holdout)
    metric_values = toolbox.metrics.calc_score(y_holdout, holdout_proba, holdout_proba, metrics=['auc'])
    assert 'auc' in metric_values.keys()
    return metric_values['auc']
def _get_fit_kwargs(estimator, *, X_eval, y_eval, early_stopping_rounds=20, verbose=False):
kwargs = {}
fit_params = inspect.signature(estimator.fit).parameters.keys()
if 'eval_set' in fit_params and X_eval is not None and y_eval is not None:
kwargs['eval_set'] = [(X_eval, y_eval)]
if 'early_stopping_rounds' in fit_params and early_stopping_rounds is not None:
if type(estimator).__name__.find('DaskLGBM') < 0: # DaskLGBMXxx dose not support early_stopping_rounds now
kwargs['early_stopping_rounds'] = early_stopping_rounds
if 'verbose' in fit_params:
kwargs['verbose'] = verbose
return kwargs
def _detect_options(estimator, method, **kwargs):
r = {}
params = inspect.signature(getattr(estimator, method)).parameters.keys()
for k, v in kwargs.items():
if k in params:
r[k] = v
return r
class DriftDetector:
    """Detect drift between a training set and a test set.

    Training rows are labelled 0 and test rows 1, the two sets are merged, and
    binary estimators are cross-validated on the merged data.  After ``fit``:

    * ``auc_`` is the mean validation AUC over the folds,
    * ``feature_importances_`` is the mean of the fold estimators' importances,
    * ``feature_names_`` are the column names of the preprocessed data,
    * ``estimator_`` holds the list of fitted fold estimators.
    """

    def __init__(self, preprocessor=None, estimator=None, random_state=None):
        """
        :param preprocessor: object with ``fit_transform``/``transform``; when None the
            toolbox's general preprocessor is created during ``fit``
        :param estimator: forwarded to the toolbox's ``general_estimator``
        :param random_state: seed for sampling and fold splitting; a random one is
            drawn when None
        """
        self.preprocessor = preprocessor
        self.estimator_ = estimator
        self.random_state = random_state if random_state is not None else randint()
        self.auc_ = None
        self.feature_names_ = None
        self.feature_importances_ = None
        self.fitted = False

    def fit(self, X_train, X_test, sample_balance=True, max_test_samples=None, cv=5):
        """Fit the detector on a train/test pair.

        :param X_train: training features
        :param X_test: test features with the same columns as ``X_train``
        :param sample_balance: if True, the larger set is down-sampled to the size
            of the smaller one
        :param max_test_samples: optional cap on the number of test rows used
        :param cv: number of stratified folds
        :return: self
        """
        logger.info('Fit data for concept drift detection')
        assert X_train.shape[1] == X_test.shape[1], 'The number of columns in X_train and X_test must be the same.'
        assert len(set(X_train.columns.to_list()) - set(X_test.columns.to_list())) == 0, \
            'The name of columns in X_train and X_test must be the same.'

        from . import get_tool_box

        train_size, test_size = len(X_train), len(X_test)
        tb = get_tool_box(X_train, X_test)

        if max_test_samples is not None and max_test_samples < test_size:
            X_test, _ = tb.train_test_split(X_test, train_size=max_test_samples, random_state=self.random_state)
            test_size = len(X_test)

        if sample_balance:
            if test_size > train_size:
                X_test, _ = tb.train_test_split(X_test, train_size=train_size, random_state=self.random_state)
            elif test_size < train_size:
                X_train, _ = tb.train_test_split(X_train, train_size=test_size, random_state=self.random_state)

        logger.info('Merge train and test data...')
        X_merged, y = self._train_test_merge(X_train, X_test)

        logger.info('Preprocessing...')
        if self.preprocessor is None:
            self.preprocessor = tb.general_preprocessor(X_merged)
        X_merged = self.preprocessor.fit_transform(X_merged, y)

        logger.info('Fitting and scoring...')
        estimators, auc_all, importances = self._fit_and_score(X_merged, y, cv=cv)

        self.feature_names_ = X_merged.columns.to_list()
        self.estimator_ = estimators
        self.auc_ = np.mean(auc_all)
        self.feature_importances_ = np.mean(importances, axis=0)
        self.fitted = True
        return self

    def _fit_and_score(self, X_merged, y_merged, *, cv):
        """Train one estimator per stratified fold and score it on the held-out fold.

        :return: ``(estimators, auc_all, importances)`` -- three lists with one
            entry per fold
        """
        from . import get_tool_box

        auc_all = []
        importances = []
        estimators = []

        tb = get_tool_box(X_merged, y_merged)
        sel = tb.select_1d
        iterators = tb.statified_kfold(n_splits=cv, shuffle=True, random_state=self.random_state)

        for n_fold, (train_idx, valid_idx) in enumerate(iterators.split(X_merged, y_merged)):
            logger.info(f'Fold:{n_fold + 1}')
            x_train_fold, y_train_fold = sel(X_merged, train_idx), sel(y_merged, train_idx)
            x_val_fold, y_val_fold = sel(X_merged, valid_idx), sel(y_merged, valid_idx)

            estimator = tb.general_estimator(X_merged, y_merged, self.estimator_, task=const.TASK_BINARY)
            fit_kwargs = _get_fit_kwargs(estimator, X_eval=x_val_fold, y_eval=y_val_fold)
            estimator.fit(x_train_fold, y_train_fold, **fit_kwargs)

            proba_kwargs = _detect_options(estimator, 'predict_proba', to_local=True, verbose=False)
            # column 1 is the probability of the "test" label
            proba = estimator.predict_proba(x_val_fold, **proba_kwargs)[:, 1]
            y_val_fold, proba = tb.to_local(y_val_fold, proba)
            auc = roc_auc_score(y_val_fold, proba)
            logger.info(f'auc: {auc}')

            auc_all.append(auc)
            estimators.append(estimator)
            importances.append(estimator.feature_importances_)

        return estimators, auc_all, importances

    def predict_proba(self, X):
        """Return, per row of ``X``, the fold-averaged probability of the "test" label.

        :raises AssertionError: if the detector has not been fitted
        """
        assert self.fitted, 'Please fit it first.'

        from . import get_tool_box

        tb = get_tool_box(X)
        X = self._copy_data(X)
        cat_cols = tb.column_selector.column_object_category_bool(X)
        num_cols = tb.column_selector.column_number_exclude_timedelta(X)
        handled_cols = cat_cols + num_cols

        Xt = self.preprocessor.transform(X)

        # Columns that are neither categorical nor numeric are carried over
        # unchanged.  They are selected with an ordered list: indexing a
        # DataFrame with a set is rejected by recent pandas and would give an
        # arbitrary column order on older versions.
        other_cols = self._passthrough_columns(X, handled_cols)
        if other_cols:
            X = tb.concat_df([X[other_cols], Xt[handled_cols]], axis=1)
        else:
            X = Xt

        oof_proba = [estimator.predict_proba(X)[:, 1] for estimator in self.estimator_]
        return tb.mean_oof(oof_proba)

    def train_test_split(self, X, y, test_size=0.25, remain_for_train=0.3):
        """Split ``X``/``y`` so that the test part holds the most test-like rows.

        Rows are ordered by ``predict_proba``.  With ``remain_for_train == 0`` the
        ``test_size`` rows with the highest probability form the test part.
        Otherwise the top ``test_size * (1 + remain_for_train)`` rows are shuffled
        and split: ``test_size`` of them become the test part, the rest goes back
        to the training part.

        ``X`` temporarily receives a helper column that is removed before returning.

        :param test_size: float fraction in (0, 1) or absolute int number of rows
        :param remain_for_train: fraction in [0, 1) of extra test-like rows that are
            mixed in before sampling the test part
        :return: ``X_train, X_test, y_train, y_test``
        """
        assert 0 <= remain_for_train < 1.0, '`remain_for_train` must be < 1.0 and >= 0.'
        if isinstance(test_size, float):
            assert 0 < test_size < 1.0, '`test_size` must be < 1.0 and > 0.'
            test_size = int(len(X) * test_size)
        assert isinstance(test_size, int), '`test_size` can only be int or float'
        split_size = int(test_size + test_size * remain_for_train)
        assert split_size < len(X), \
            'test_size+test_size*remain_for_train must be less than the number of samples in X.'

        from . import get_tool_box

        tb = get_tool_box(X, y)
        sel = tb.select_1d

        proba = self.predict_proba(X)
        proba, = tb.to_local(proba)
        sorted_indices = np.argsort(proba)

        # carry y along with X so both are selected/split together
        target_col = '__train_test_split_y__'
        self._prepend_column(X, target_col, y)

        if remain_for_train == 0:
            X_train = sel(X, sorted_indices[:-test_size])
            X_test = sel(X, sorted_indices[-test_size:])
        else:
            X_train_1 = sel(X, sorted_indices[:-split_size])
            X_mixed = sel(X, sorted_indices[-split_size:])
            X_train_2, X_test = tb.train_test_split(
                X_mixed, test_size=test_size, shuffle=True, random_state=self.random_state)
            X_train = tb.concat_df([X_train_1, X_train_2], axis=0)

        y_train = X_train.pop(target_col)
        y_test = X_test.pop(target_col)
        X.pop(target_col)  # restore the caller's frame
        return X_train, X_test, y_train, y_test

    @staticmethod
    def _copy_data(X):
        """Return a deep copy of ``X``."""
        return copy.deepcopy(X)

    @staticmethod
    def _passthrough_columns(X, handled_cols):
        """List the columns of ``X`` that are not in ``handled_cols``, in column order."""
        handled = set(handled_cols)
        return list(dict.fromkeys(c for c in X.columns if c not in handled))

    @staticmethod
    def _prepend_column(df, name, value):
        """Add column ``name`` to ``df`` in place, as first column when ``insert`` exists."""
        if hasattr(df, 'insert'):
            df.insert(0, name, value)
        else:
            df[name] = value

    def _train_test_merge(self, X_train, X_test, shuffle=True):
        """Concatenate both sets and return ``(X_merge, y)`` with y=0 for train rows, 1 for test rows.

        A temporary label column is added to both inputs and removed again
        before returning.
        """
        from . import get_tool_box

        target_col = '__hypernets_tmp__target__'
        self._prepend_column(X_train, target_col, 0)
        self._prepend_column(X_test, target_col, 1)

        tb = get_tool_box(X_train, X_test)
        X_merge = tb.concat_df([X_train, X_test], axis=0, repartition=shuffle, random_state=self.random_state)
        y = X_merge.pop(target_col)

        X_train.pop(target_col)
        X_test.pop(target_col)
        return X_merge, y
class FeatureSelectorWithDriftDetection:
    """Iteratively drop the features that best separate the training set from the test set.

    Optionally, each variable is first scored on its own and dropped when its
    shift score exceeds ``variable_shift_threshold``.  Then a drift detector is
    fitted round after round; each round removes the most important features
    of the detector until its AUC is at most ``auc_threshold`` or no more
    features may be removed.
    """

    parallelizable = True

    def __init__(self, remove_shift_variable=True, variable_shift_threshold=0.7, variable_shift_scorer=None,
                 auc_threshold=0.55, min_features=10, remove_size=0.1,
                 sample_balance=True, max_test_samples=None, cv=5, random_state=None,
                 callbacks=None):
        """
        :param remove_shift_variable: run the per-variable shift scoring step
        :param variable_shift_threshold: variables scoring above this value are removed
        :param variable_shift_scorer: scorer forwarded to the per-variable scoring
        :param auc_threshold: selection finishes once the detector AUC is <= this value
        :param min_features: never go below this number of features
        :param remove_size: fraction of the current features removed per round
        :param sample_balance: forwarded to the detector's ``fit``
        :param max_test_samples: forwarded to the detector's ``fit``
        :param cv: forwarded to the detector's ``fit``
        :param random_state: seed; a random one is drawn when None
        :param callbacks: optional iterable of callback objects
        """
        self.remove_shift_variable = remove_shift_variable
        self.variable_shift_threshold = variable_shift_threshold
        self.variable_shift_scorer = variable_shift_scorer
        self.auc_threshold = auc_threshold
        self.min_features = min_features
        self.remove_size = remove_size
        self.sample_balance = sample_balance
        self.max_test_samples = max_test_samples
        self.cv = cv
        self.random_state = random_state if random_state is not None else randint()
        self.callbacks = callbacks

    def _notify(self, hook_name, *args, **kwargs):
        """Call ``hook_name`` with the given arguments on every registered callback."""
        if self.callbacks is not None:
            for callback in self.callbacks:
                getattr(callback, hook_name)(*args, **kwargs)

    @staticmethod
    def _partition_features(feature_names, indices, removes):
        """Split features into kept and removed ones.

        ``indices`` orders the features by ascending importance; the last
        ``removes`` entries are dropped.  Names are taken from ``feature_names``
        itself so their original type is preserved (going through a numpy array
        would turn mixed-type labels into strings).

        :return: ``(keep, drop)``; ``keep`` follows ``indices``, ``drop`` follows
            the order of ``feature_names``
        """
        keep = [feature_names[i] for i in indices[:-removes]]
        kept = set(keep)
        drop = [name for name in feature_names if name not in kept]
        return keep, drop

    def select(self, X_train, X_test, *, preprocessor=None, estimator=None, copy_data=False):
        """Run the feature selection.

        :param X_train: training features
        :param X_test: test features
        :param preprocessor: forwarded to each round's detector
        :param estimator: forwarded to each round's detector
        :param copy_data: deep-copy both inputs before working on them
        :return: ``(feature_names, history, scores)`` -- the remaining feature names,
            one history dict per round, and the per-variable shift scores
            (None when ``remove_shift_variable`` is False)
        """
        logger.info('Feature selection to try to eliminate the concept drift.')
        if copy_data:
            detector = self.get_detector(preprocessor, estimator, self.random_state)
            X_train = detector._copy_data(X_train)
            X_test = detector._copy_data(X_test)

        scores = None
        if self.remove_shift_variable:
            scores = self._covariate_shift_score(X_train, X_test, scorer=self.variable_shift_scorer)
            remain_features = []
            remove_features = []
            for col, score in scores.items():
                if score <= self.variable_shift_threshold:
                    remain_features.append(col)
                else:
                    remove_features.append(col)
                    logger.info(f'Remove shift variables:{col}, score:{score}')
            if len(remain_features) < X_train.shape[1]:
                X_train = X_train[remain_features]
                X_test = X_test[remain_features]
            self._notify('on_remove_shift_variable', scores, remove_features)

        round_no = 1
        history = []
        while True:
            start_time = time.time()
            self._notify('on_round_start', round_no=round_no, features=X_train.columns.to_list())
            logger.info(f'\nRound: {round_no}\n')

            detector = self.get_detector(preprocessor, estimator, self.random_state)
            detector.fit(X_train, X_test,
                         sample_balance=self.sample_balance,
                         max_test_samples=self.max_test_samples,
                         cv=self.cv)
            logger.info(f'AUC:{detector.auc_}, Features:{detector.feature_names_}')
            elapsed = time.time() - start_time
            history.append({'auc': detector.auc_,
                            'n_features': len(detector.feature_names_),
                            'removed_features': [],
                            'feature_names': detector.feature_names_,
                            'feature_importances': detector.feature_importances_,
                            'elapsed': elapsed
                            })

            if detector.auc_ <= self.auc_threshold:
                logger.info(
                    f'AUC:{detector.auc_} has dropped below the threshold:{self.auc_threshold}, feature selection is over.')
                self._notify('on_task_finished',
                             round_no=round_no, auc=detector.auc_, features=detector.feature_names_)
                return detector.feature_names_, history, scores

            indices = np.argsort(detector.feature_importances_)
            n_features = indices.shape[0]
            if n_features <= self.min_features:
                logger.info(f'The number of remaining features is insufficient to continue remove features. '
                            f'AUC:{detector.auc_} '
                            f'Remaining features:{detector.feature_names_}')
                self._notify('on_task_break',
                             round_no=round_no, auc=detector.auc_, features=detector.feature_names_)
                return detector.feature_names_, history, scores

            removes = int(n_features * self.remove_size)
            if removes <= 0:
                logger.info(f'The number of remaining features is insufficient to continue remove features. '
                            f'AUC:{detector.auc_} '
                            f'Remaining features:({len(detector.feature_names_)}) / {detector.feature_names_}')
                self._notify('on_task_break',
                             round_no=round_no, auc=detector.auc_, features=detector.feature_names_)
                return detector.feature_names_, history, scores

            # never drop below `min_features`
            removes = min(removes, n_features - self.min_features)

            remain_features, remove_features = self._partition_features(
                detector.feature_names_, indices, removes)
            history[-1]['removed_features'] = remove_features
            logger.info(f'Removed features: {remove_features}')

            X_train = X_train[remain_features]
            X_test = X_test[remain_features]
            self._notify('on_round_end',
                         round_no=round_no, auc=detector.auc_, features=detector.feature_names_,
                         remove_features=remove_features, elapsed=elapsed)
            round_no += 1

    def _covariate_shift_score(self, X_train, X_test, *,
                               preprocessor=None, estimator=None, scorer=None, cv=None,
                               shuffle=True, copy_data=True):
        """Compute a shift score for every column separately.

        Both inputs are cut to their first ``cfg.shift_variable_sample_limit``
        rows, merged with a train/test label and preprocessed; then each column
        alone is scored on how well it predicts that label.

        :return: dict mapping column name to its shift score
        """
        from . import get_tool_box

        assert set(X_train.columns.to_list()) == set(X_test.columns.to_list()), \
            'The columns in X_train and X_test must be the same.'

        detector = self.get_detector(preprocessor, estimator, self.random_state)

        # Truncate before copying so that at most `limit` rows are deep-copied.
        shift_variable_sample_limit = cfg.shift_variable_sample_limit
        if len(X_train) > shift_variable_sample_limit:
            logger.info(f'sample X_train to {shift_variable_sample_limit}')
            X_train = get_tool_box(X_train).select_1d(X_train, np.arange(shift_variable_sample_limit))
        if len(X_test) > shift_variable_sample_limit:
            logger.info(f'sample X_test to {shift_variable_sample_limit}')
            X_test = get_tool_box(X_test).select_1d(X_test, np.arange(shift_variable_sample_limit))

        if copy_data:
            # merging adds a temporary column to its inputs; work on copies
            X_train = detector._copy_data(X_train)
            X_test = detector._copy_data(X_test)

        logger.info('Set target value...')
        X_merged, y = detector._train_test_merge(X_train, X_test, shuffle=shuffle)

        logger.info('Preprocessing...')
        if preprocessor is None:
            preprocessor = get_tool_box(X_merged).general_preprocessor(X_merged)
        X_merged = preprocessor.fit_transform(X_merged)

        logger.info('Scoring...')
        return self._score_features(X_merged, y, scorer, cv)

    def _score_features(self, X_merged, y, scorer, cv):
        """Score each column of ``X_merged`` with ``_shift_score``.

        Runs sequentially when the class is not ``parallelizable`` or
        ``cfg.joblib_njobs`` is 0 or 1; otherwise through joblib.

        :return: dict mapping column name to score
        """
        if not self.parallelizable or cfg.joblib_njobs in {0, 1}:
            scores = {}
            for c in X_merged.columns:
                score = _shift_score(X_merged[[c]], y, scorer, cv, None)
                logger.info(f'column:{c}, score:{score}')
                scores[c] = score
        else:
            # hand the current log level to the workers
            log_level = logging.get_level()
            col_parts = [X_merged[[c]] for c in X_merged.columns]
            pss = Parallel(n_jobs=cfg.joblib_njobs, **cfg.joblib_options)(
                delayed(_shift_score)(x, y, scorer, cv, log_level) for x in col_parts
            )
            scores = dict(zip(X_merged.columns.to_list(), pss))
            logger.info(f'scores: {scores}')
        return scores

    @staticmethod
    def get_detector(preprocessor=None, estimator=None, random_state=None):
        """Create the drift detector used by this selector."""
        return DriftDetector(preprocessor=preprocessor, estimator=estimator, random_state=random_state)