# Source code for hypernets.tabular.column_selector

# -*- coding:utf-8 -*-
"""

"""
import numpy as np
import pandas as pd
from scipy.stats import skew, kurtosis
from sklearn.compose import make_column_selector

from .cfg import TabularCfg as cfg

try:
    import dask
    from dask import dataframe as dd

    _dask_installed = True
except ImportError:
    _dask_installed = False

try:
    import jieba

    _jieba_installed = True
except ImportError:
    _jieba_installed = False


class ColumnSelector(make_column_selector):
    __doc__ = make_column_selector.__doc__

    def __init__(self, pattern=None, *, dtype_include=None, dtype_exclude=None):
        super(ColumnSelector, self).__init__(pattern, dtype_include=dtype_include, dtype_exclude=dtype_exclude)

    def __call__(self, df):
        """Return the list of column names of *df* matched by this selector.

        Dask DataFrames are handled here directly, since sklearn's
        implementation relies on ``.iloc`` which dask does not support;
        pandas DataFrames are delegated to the sklearn base class.
        """
        if _dask_installed and isinstance(df, dd.DataFrame):
            # Only column metadata is needed -- never materialize row data.
            df_row = df
            if self.dtype_include is not None or self.dtype_exclude is not None:
                df_row = df_row.select_dtypes(include=self.dtype_include, exclude=self.dtype_exclude)
            cols = df_row.columns
            if self.pattern is not None:
                cols = cols[cols.str.contains(self.pattern, regex=True)]
            result = cols.tolist()
        else:
            result = super(ColumnSelector, self).__call__(df)
        return result

    def __str__(self) -> str:
        return self.__repr__()

    def __repr__(self) -> str:
        # Show only the constraints that were actually set.
        attrs = []
        if self.pattern:
            attrs.append(f'pattern:{self.pattern}')
        if self.dtype_include:
            attrs.append(f'include:{self.dtype_include}')
        if self.dtype_exclude:
            attrs.append(f'exclude:{self.dtype_exclude}')
        s = f'{self.__class__.__name__}({", ".join(attrs)})'
        return s
class AutoCategoryColumnSelector(ColumnSelector):
    """Column selector that additionally picks up low-cardinality columns.

    Besides the columns matched by pattern/dtype, any remaining column whose
    number of unique values does not exceed ``len(df) ** cat_exponent`` is
    treated as categorical and selected as well.
    """

    def __init__(self, pattern=None, *, dtype_include=None, dtype_exclude=None, cat_exponent=0.5):
        assert 0. < cat_exponent < 1.0
        super(AutoCategoryColumnSelector, self).__init__(pattern, dtype_include=dtype_include,
                                                         dtype_exclude=dtype_exclude)
        self.cat_exponent = cat_exponent

    def __call__(self, df, *args, uniquer=None, **kwargs):
        # Start from the pattern/dtype match, if any constraint was given.
        if self.pattern is None and self.dtype_include is None:
            selected = []
        else:
            selected = super().__call__(df)

        excluded_dtypes = [] if self.dtype_exclude is None else self.dtype_exclude
        candidates = [c for c in df.columns.to_list()
                      if c not in selected and str(df.dtypes[c]) not in excluded_dtypes]
        if candidates:
            # Count distinct values per candidate column; `uniquer` lets
            # callers supply a custom (e.g. sampled) counter.
            if callable(uniquer):
                nuniques = uniquer(df[candidates])
            elif _dask_installed and isinstance(df, dd.DataFrame):
                counts = dask.compute(*[df[c].nunique() for c in candidates])
                nuniques = {k: v for k, v in zip(candidates, counts)}
            else:
                nuniques = df[candidates].nunique(axis=0).to_dict()
            limit = len(df) ** self.cat_exponent
            selected += [c for c, n in nuniques.items() if n <= limit]
        return selected
class TextColumnSelector(ColumnSelector):
    """Select object columns that look like free text.

    A column qualifies when at least one of its cells contains
    ``word_count_threshold`` or more words; Chinese text is tokenized with
    jieba when that package is available.
    """

    def __init__(self, pattern=None, *, dtype_include=None, dtype_exclude=None,
                 word_count_threshold=cfg.column_selector_text_word_count_threshold):
        assert isinstance(word_count_threshold, int) and word_count_threshold >= 1
        if dtype_include is None:
            dtype_include = ['object']
        super(TextColumnSelector, self).__init__(pattern, dtype_include=dtype_include,
                                                 dtype_exclude=dtype_exclude)
        self.word_count_threshold = word_count_threshold

    def __call__(self, df, *args, **kwargs):
        selected = super().__call__(df)
        if selected and self.word_count_threshold > 1:
            # Keep only columns whose longest cell reaches the threshold.
            max_words = df[selected].applymap(self._word_count).max(axis=0)
            if _dask_installed and isinstance(df, dd.DataFrame):
                max_words = max_words.compute()
            selected = [c for c, n in max_words.to_dict().items() if n >= self.word_count_threshold]
        return selected

    @staticmethod
    def _word_count(s):
        # Non-string cells (NaN, numbers, ...) contribute no words.
        if not isinstance(s, str):
            return 0
        # Chinese text has no whitespace word boundaries; use jieba if present.
        has_chinese = _jieba_installed and any(u'\u4e00' <= ch <= u'\u9fff' for ch in s)
        if has_chinese:
            return sum(1 for w in jieba.cut(s) if len(w.strip()) > 0)
        return len(s.split())
class LatLongColumnSelector:
    """Select object columns whose every cell is a ``(lat, long)`` tuple or None."""

    def __call__(self, df):
        cols = column_object(df)
        if cols is None or len(cols) < 1:
            return cols
        if _dask_installed and isinstance(df, dd.DataFrame):
            # Per-partition reduction, then AND the partial results per column.
            row = df.reduction(LatLongColumnSelector._reduce_is_latlong,
                               aggregate=np.all,
                               aggregate_kwargs=dict(axis=0),
                               meta={c: 'bool' for c in cols}
                               ).compute()
        elif isinstance(df, pd.DataFrame):
            row = LatLongColumnSelector._reduce_is_latlong(df)
        else:
            raise ValueError(f'Unsupported dataframe type "{type(df)}"')
        r = [k for k, v in row.to_dict().items() if v]
        return r

    @staticmethod
    def _is_latlong(v):
        """Return True when *v* is None or a 2-tuple within (±90, ±180) bounds."""
        try:
            return v is None or \
                   (isinstance(v, tuple) and len(v) == 2
                    and -90.0 <= v[0] <= 90.0 and -180.0 <= v[1] <= 180.0)
        except Exception:  # was a bare `except:`; e.g. un-orderable element types
            return False

    @staticmethod
    def _reduce_is_latlong(df):
        # Vectorize the per-cell check, then require it to hold for every row.
        fn = np.vectorize(LatLongColumnSelector._is_latlong, otypes=[bool], signature='()->()')
        return df.apply(fn).all(axis=0)
class MinMaxColumnSelector(object):
    """Select columns whose values fall entirely within ``[min, max]``.

    Either bound may be None, leaving that side unconstrained; with both
    bounds None every column is selected.
    """

    def __init__(self, min=None, max=None):
        self.min = min
        self.max = max

    def __call__(self, df):
        if _dask_installed and isinstance(df, dd.DataFrame):
            return self._select_dask_dataframe(df)
        if isinstance(df, pd.DataFrame):
            return self._select_pandas_dataframe(df)
        raise ValueError(f'Unsupported dataframe type "{type(df)}"')

    def _select_pandas_dataframe(self, df):
        has_min = self.min is not None
        has_max = self.max is not None
        if has_min and has_max:
            stats = df.aggregate(['min', 'max'])
            stats = stats.loc[:, (stats.loc['min'] >= self.min) & (stats.loc['max'] <= self.max)]
        elif has_min:
            stats = df.aggregate(['min'])
            stats = stats.loc[:, stats.loc['min'] >= self.min]
        elif has_max:
            stats = df.aggregate(['max'])
            stats = stats.loc[:, stats.loc['max'] <= self.max]
        else:
            stats = df  # no bounds: keep every column
        return list(stats.columns)

    def _select_dask_dataframe(self, df):
        has_min = self.min is not None
        has_max = self.max is not None
        if has_min and has_max:
            # Compute both reductions in one dask graph traversal.
            min_values, max_values = dask.compute(df.reduction(np.min, np.min),
                                                  df.reduction(np.max, np.max))
            stats = pd.DataFrame({'min': min_values, 'max': max_values}).T
            stats = stats.loc[:, (stats.loc['min'] >= self.min) & (stats.loc['max'] <= self.max)]
        elif has_min:
            stats = pd.DataFrame({'min': df.reduction(np.min, np.min).compute()}).T
            stats = stats.loc[:, stats.loc['min'] >= self.min]
        elif has_max:
            stats = pd.DataFrame({'max': df.reduction(np.max, np.max).compute()}).T
            stats = stats.loc[:, stats.loc['max'] <= self.max]
        else:
            stats = df  # no bounds: keep every column
        return list(stats.columns)
class CompositedColumnSelector(object):
    """Apply a chain of selectors, each filtering the previous result.

    Parameters
    ----------
    selectors : list or tuple of callables
        Each is called as ``selector(df)`` and returns a list of column
        names; the dataframe is narrowed to those columns before the next
        selector runs. Must be non-empty.
    """

    def __init__(self, selectors):
        assert isinstance(selectors, (tuple, list)) and len(selectors) > 0
        self.selectors = selectors

    def __call__(self, df):
        last = len(self.selectors) - 1
        for i, selector in enumerate(self.selectors):
            columns = selector(df)
            # The last selector's result is final; an empty result
            # short-circuits the chain. Since at least one selector exists,
            # the loop always returns (removed unreachable trailing code).
            if i == last or len(columns) == 0:
                return columns
            df = df[columns]
# ---------------------------------------------------------------------------
# Ready-to-use selector singletons, grouped by the kind of column they match.
# ---------------------------------------------------------------------------

# Integer dtype names shared by several selectors below.
_INT_DTYPES = ['int', 'int8', 'int16', 'int32', 'int64',
               'uint', 'uint8', 'uint16', 'uint32', 'uint64']

column_all = ColumnSelector()

# Categorical-ish columns
column_object_category_bool = ColumnSelector(dtype_include=['object', 'category', 'bool'])
column_object_category_bool_with_auto = AutoCategoryColumnSelector(dtype_include=['object', 'category', 'bool'],
                                                                   cat_exponent=0.5)
column_object = ColumnSelector(dtype_include=['object'])
column_category = ColumnSelector(dtype_include=['category'])
column_bool = ColumnSelector(dtype_include=['bool'])

# Text and geographic columns
column_text = TextColumnSelector(dtype_include=['object'])
column_latlong = LatLongColumnSelector()

# Numeric columns
column_number = ColumnSelector(dtype_include='number')
column_number_exclude_timedelta = ColumnSelector(dtype_include='number', dtype_exclude='timedelta')
column_object_category_bool_int = ColumnSelector(dtype_include=['object', 'category', 'bool'] + _INT_DTYPES)
column_int = ColumnSelector(dtype_include=list(_INT_DTYPES))
column_float = ColumnSelector(dtype_include=['float', 'float32', 'float64'])

# Datetime-related columns
column_timedelta = ColumnSelector(dtype_include='timedelta')
column_datetimetz = ColumnSelector(dtype_include='datetimetz')
column_datetime = ColumnSelector(dtype_include='datetime')
column_all_datetime = ColumnSelector(dtype_include=['datetime', 'datetimetz'])
column_exclude_datetime = ColumnSelector(
    dtype_exclude=['timedelta', 'datetime', 'datetimetz', 'period[M]', 'period[D]', 'period[Q]'])

# Integer columns constrained to the int32 value range
column_zero_or_positive_int32 = CompositedColumnSelector(
    selectors=[column_int, MinMaxColumnSelector(0, np.iinfo(np.int32).max)]
)
column_positive_int32 = CompositedColumnSelector(
    selectors=[column_int, MinMaxColumnSelector(1, np.iinfo(np.int32).max)]
)
def column_min_max(X, min_value=None, max_value=None):
    """Return the columns of *X* whose values all lie within [min_value, max_value].

    Either bound may be None (unbounded on that side); delegates to
    :class:`MinMaxColumnSelector`.
    """
    return MinMaxColumnSelector(min_value, max_value)(X)
def column_skewness_kurtosis(X, skew_threshold=0.5, kurtosis_threshold=0.5, columns=None):
    """Return the columns whose |skewness| or |kurtosis| exceeds its threshold.

    Parameters
    ----------
    X : DataFrame to inspect.
    skew_threshold, kurtosis_threshold : float
        A column is selected when either absolute statistic exceeds the
        corresponding threshold.
    columns : list of str, optional
        Columns to test; defaults to the numeric non-timedelta columns of X.
    """
    if columns is None:
        columns = column_number_exclude_timedelta(X)
    skew_values = skew(X[columns], axis=0, nan_policy='omit')
    kurtosis_values = kurtosis(X[columns], axis=0, nan_policy='omit')
    return [col for i, col in enumerate(columns)
            if abs(skew_values[i]) > skew_threshold or abs(kurtosis_values[i]) > kurtosis_threshold]
def column_skewness_kurtosis_diff(X_1, X_2, diff_threshold=5, columns=None, smooth_fn=np.log,
                                  skewness_weights=1, kurtosis_weights=0):
    """Select columns whose distribution shape differs between X_1 and X_2.

    The per-column skewness/kurtosis of both frames (after optional
    smoothing) are compared, and columns whose weighted difference score
    exceeds ``diff_threshold`` (or falls within it, when a (low, high] tuple
    is given) are returned.
    """
    skew_x_1, skew_x_2, kurtosis_x_1, kurtosis_x_2, columns = calc_skewness_kurtosis(X_1, X_2, columns, smooth_fn)
    # NOTE(review): the inner np.log wraps only the kurtosis term while the
    # outer np.log wraps the whole weighted sum -- the double log looks
    # unintentional, and either log can produce nan/-inf for differences
    # <= 1 (or 0). Confirm the intended scoring formula before changing.
    diff = np.log(
        abs(skew_x_1 - skew_x_2) * skewness_weights + np.log(abs(kurtosis_x_1 - kurtosis_x_2)) * kurtosis_weights)
    if isinstance(diff_threshold, tuple):
        # Tuple threshold selects scores inside the half-open interval (low, high].
        index = np.argwhere((diff > diff_threshold[0]) & (diff <= diff_threshold[1]))
    else:
        index = np.argwhere(diff > diff_threshold)
    # NOTE(review): `index` is the Nx1 array returned by np.argwhere; the
    # `i in index` membership test relies on numpy's element-wise `in`
    # semantics rather than an explicit flatten -- fragile but kept as-is.
    selected = [c for i, c in enumerate(columns) if i in index]
    return selected
def calc_skewness_kurtosis(X_1, X_2, columns=None, smooth_fn=np.log):
    """Compute skewness and kurtosis of the same columns in two dataframes.

    Parameters
    ----------
    X_1, X_2 : pandas.DataFrame
        Frames to measure; both must contain *columns*.
    columns : list of str, optional
        Columns to measure; defaults to the numeric non-timedelta columns
        of ``X_1``.
    smooth_fn : callable, optional
        Applied element-wise to both frames before measuring (``np.log`` by
        default; note that log of non-positive values yields nan/-inf).

    Returns
    -------
    tuple
        ``(skew_x_1, skew_x_2, kurtosis_x_1, kurtosis_x_2, columns)``.
    """
    if columns is None:
        columns = column_number_exclude_timedelta(X_1)

    # Take explicit copies so smoothing never mutates the caller's frames:
    # the original chained assignment on `X_1[columns]` slices triggered
    # pandas' SettingWithCopyWarning.
    X_1_t = X_1[columns].copy()
    X_2_t = X_2[columns].copy()
    if smooth_fn is not None:
        X_1_t[columns] = smooth_fn(X_1_t)
        X_2_t[columns] = smooth_fn(X_2_t)

    skew_x_1 = skew(X_1_t, axis=0, nan_policy='omit')
    skew_x_2 = skew(X_2_t, axis=0, nan_policy='omit')
    kurtosis_x_1 = kurtosis(X_1_t, axis=0, nan_policy='omit')
    kurtosis_x_2 = kurtosis(X_2_t, axis=0, nan_policy='omit')
    return skew_x_1, skew_x_2, kurtosis_x_1, kurtosis_x_2, columns