Source code for hypernets.model.estimator

# -*- coding:utf-8 -*-
"""

"""
import copy

import numpy as np
from sklearn.model_selection import KFold, StratifiedKFold
from hypernets.utils import const


[docs]class Estimator(): def __init__(self, space_sample, task=const.TASK_BINARY, discriminator=None): self.space_sample = space_sample self.task = task self.discriminator = discriminator # fitted self.model = None self.cv_ = None self.cv_models_ = None @property def _estimator_type(self): if self.task in {const.TASK_BINARY, const.TASK_MULTICLASS, const.TASK_MULTILABEL}: return 'classifier' elif self.task in {const.TASK_REGRESSION, }: return 'regressor' else: return None
[docs] def set_discriminator(self, discriminator): self.discriminator = discriminator
[docs] def summary(self): raise NotImplementedError
[docs] def fit(self, X, y, **kwargs): raise NotImplementedError
[docs] def fit_cross_validation(self, X, y, stratified=True, num_folds=3, shuffle=False, random_state=9527, metrics=None): raise NotImplementedError
[docs] def predict(self, X, **kwargs): raise NotImplementedError
[docs] def predict_proba(self, X, **kwargs): raise NotImplementedError
[docs] def evaluate(self, X, y, metrics=None, **kwargs): raise NotImplementedError
[docs] def save(self, model_file): raise NotImplementedError
[docs] @staticmethod def load(model_file): raise NotImplementedError
[docs] def proba2predict(self, proba, proba_threshold=0.5): if self.task == 'regression': return proba if proba.shape[-1] > 2: predict = proba.argmax(axis=-1) elif proba.shape[-1] == 2: predict = (proba[:, 1] > proba_threshold).astype('int32') else: predict = (proba > proba_threshold).astype('int32') return predict
[docs] def get_iteration_scores(self): return []
[docs]class CrossValidationEstimator(): def __init__(self, base_estimator, task, num_folds=3, stratified=False, shuffle=False, random_state=None): self.base_estimator = base_estimator self.num_folds = num_folds self.stratified = stratified self.shuffle = shuffle self.random_state = random_state self.task = task self.oof_ = None self.classes_ = None self.estimators_ = []
[docs] def fit(self, X, y, **kwargs): self.oof_ = None self.estimators_ = [] if self.stratified and self.task == 'binary': iterators = StratifiedKFold(n_splits=self.num_folds, shuffle=self.shuffle, random_state=self.random_state) else: iterators = KFold(n_splits=self.num_folds, shuffle=self.shuffle, random_state=self.random_state) y = np.array(y) sample_weight = kwargs.get('sample_weight') for n_fold, (train_idx, valid_idx) in enumerate(iterators.split(X, y)): x_train_fold, y_train_fold = X.iloc[train_idx], y[train_idx] x_val_fold, y_val_fold = X.iloc[valid_idx], y[valid_idx] kwargs['eval_set'] = [(x_val_fold, y_val_fold)] if sample_weight is not None: sw_fold = sample_weight[train_idx] kwargs['sample_weight'] = sw_fold fold_est = copy.deepcopy(self.base_estimator) fold_est.fit(x_train_fold, y_train_fold, **kwargs) if self.classes_ is None: self.classes_ = fold_est.classes_ if self.task == 'regression': proba = fold_est.predict(x_val_fold) else: proba = fold_est.predict_proba(x_val_fold) if self.oof_ is None: if len(proba.shape) == 1: self.oof_ = np.zeros(y.shape, proba.dtype) else: self.oof_ = np.zeros((y.shape[0], proba.shape[-1]), proba.dtype) self.oof_[valid_idx] = proba self.estimators_.append(fold_est) return self
[docs] def predict_proba(self, X): proba_sum = None for est in self.estimators_: proba = est.predict_proba(X) if proba_sum is None: proba_sum = proba else: proba_sum += proba return proba_sum / len(self.estimators_)
[docs] def predict(self, X): if self.task == 'regression': pred_sum = None for est in self.estimators_: pred = est.predict(X) if pred_sum is None: pred_sum = pred else: pred_sum += pred return pred_sum / len(self.estimators_) elif self.task == 'binary': proba = self.predict_proba(X) pred = self.proba2predict(proba) pred = np.array(self.classes_).take(pred, axis=0) return pred
[docs] def proba2predict(self, proba, proba_threshold=0.5): assert len(proba.shape) <= 2 if self.task == 'regression': return proba if len(proba.shape) == 2: if proba.shape[-1] > 2: predict = proba.argmax(axis=-1) else: predict = (proba[:, -1] > proba_threshold).astype('int32') else: predict = (proba > proba_threshold).astype('int32') return predict