Source code for hypernets.pipeline.base

# -*- coding:utf-8 -*-
__author__ = 'yangjian'
"""

"""

from sklearn import pipeline, compose

from hypernets.core.ops import ConnectionSpace
from hypernets.core.search_space import ModuleSpace, Choice
from hypernets.tabular import dataframe_mapper


class HyperTransformer(ModuleSpace):
    def __init__(self, transformer=None, space=None, name=None, **hyperparams):
        self.transformer = transformer
        ModuleSpace.__init__(self, space, name, **hyperparams)

    def _compile(self):
        if self.transformer is not None:
            pv = self.param_values
            self.compile_fn = self.transformer(**pv)
        else:
            self.compile_fn = None

    def _forward(self, inputs):
        return self.compile_fn

    def _on_params_ready(self):
        pass
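

# Usage sketch: concrete transformer spaces are typically built by wrapping an
# sklearn transformer class as a HyperTransformer -- the class reference is passed
# as `transformer` and the keyword hyperparameters are applied when the space is
# compiled (see `_compile` above). The `SimpleImputer`/`StandardScaler` wrappers
# below are assumptions introduced for the usage sketches further down, not
# definitions taken from this module.
from sklearn.impute import SimpleImputer as SkSimpleImputer
from sklearn.preprocessing import StandardScaler as SkStandardScaler


class SimpleImputer(HyperTransformer):
    def __init__(self, space=None, name=None, **hyperparams):
        HyperTransformer.__init__(self, SkSimpleImputer, space, name, **hyperparams)


class StandardScaler(HyperTransformer):
    def __init__(self, space=None, name=None, **hyperparams):
        HyperTransformer.__init__(self, SkStandardScaler, space, name, **hyperparams)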


class ComposeTransformer(HyperTransformer):
    def __init__(self, space=None, name=None, **hyperparams):
        HyperTransformer.__init__(self, None, space, name, **hyperparams)

    def compose(self):
        raise NotImplementedError

    def get_transformers(self, last_module, input_id):
        transformers = []
        next = last_module
        while True:
            if next.id == input_id:
                break
            assert isinstance(next, HyperTransformer)
            if isinstance(next, ComposeTransformer):
                next, transformer = next.compose()
            else:
                transformer = (next.name, next.output)
            transformers.insert(0, transformer)

            inputs = self.space.get_inputs(next)
            if len(inputs) <= 0:
                break
            assert len(inputs) == 1, 'Pipeline does not support branching.'
            next = inputs[0]
        return next, transformers


class PipelineInput(HyperTransformer):
    def __init__(self, space=None, name=None, **hyperparams):
        HyperTransformer.__init__(self, None, space, name, **hyperparams)
        self.output_id = None


class PipelineOutput(ComposeTransformer):
    def __init__(self, pipeline_name, columns=None, space=None, name=None, **hyperparams):
        ComposeTransformer.__init__(self, space, name, **hyperparams)
        self.input_id = None
        self.pipeline_name = pipeline_name
        self.columns = columns

    def compose(self):
        inputs = self.space.get_inputs(self)
        assert len(inputs) == 1, 'Pipeline does not support branching.'
        next, steps = self.get_transformers(inputs[0], self.input_id)
        p = self.create_pipeline(steps)
        return next, (self.pipeline_name, p)

    @staticmethod
    def create_pipeline(steps):
        return pipeline.Pipeline(steps)


class Pipeline(ConnectionSpace):
    input_space_cls = PipelineInput
    output_space_cls = PipelineOutput

    def __init__(self, module_list, columns=None, keep_link=False, space=None, name=None):
        assert isinstance(module_list, list), 'module_list must be a list.'
        assert len(module_list) > 0, 'module_list must contain at least 1 Module.'
        assert all([isinstance(m, (ModuleSpace, list)) for m in module_list]), \
            'module_list can only contain ModuleSpace or list elements.'
        self._module_list = module_list
        self.columns = columns
        self.hp_lazy = Choice([0])
        ConnectionSpace.__init__(self, self.pipeline_fn, keep_link, space, name, hp_lazy=self.hp_lazy)

    def pipeline_fn(self, m):
        last = self._module_list[0]
        for i in range(1, len(self._module_list)):
            self.connect_module_or_subgraph(last, self._module_list[i])
            # self._module_list[i](last)
            last = self._module_list[i]

        pipeline_input = self.input_space_cls(name=self.name + '_input', space=self.space)
        pipeline_output = self.output_space_cls(pipeline_name=self.name, columns=self.columns,
                                                name=self.name + '_output', space=self.space)
        pipeline_input.output_id = pipeline_output.id
        pipeline_output.input_id = pipeline_input.id

        input = self.space.get_sub_graph_inputs(last)
        assert len(input) == 1, 'Pipeline does not support branching.'
        output = self.space.get_sub_graph_outputs(last)
        assert len(output) == 1, 'Pipeline does not support branching.'

        input[0](pipeline_input)
        pipeline_output(output[0])

        return pipeline_input, pipeline_output
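

# Usage sketch (assumed API): a Pipeline chains HyperTransformers between a
# PipelineInput/PipelineOutput pair, and at compose time the chain becomes one
# named sklearn pipeline.Pipeline step. `HyperSpace`/`HyperInput` are assumed to
# live in hypernets.core as shown; `SimpleImputer`/`StandardScaler` are the
# wrapper sketches above; the column names are illustrative only.
def _single_pipeline_space():
    from hypernets.core.search_space import HyperSpace
    from hypernets.core.ops import HyperInput

    space = HyperSpace()
    with space.as_default():
        input = HyperInput(name='input1')
        # Two chained steps whose hyperparameters are searchable via Choice.
        Pipeline([SimpleImputer(strategy=Choice(['mean', 'median'])),
                  StandardScaler(with_mean=Choice([True, False]))],
                 columns=['age', 'fare'],
                 name='num_pipeline')(input)
    return space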


class ColumnTransformer(ComposeTransformer):
    def __init__(self, remainder='drop', sparse_threshold=0.3, n_jobs=None, transformer_weights=None,
                 space=None, name=None, **hyperparams):
        if remainder is not None and remainder != 'drop':
            hyperparams['remainder'] = remainder
        if sparse_threshold is not None and sparse_threshold != 0.3:
            hyperparams['sparse_threshold'] = sparse_threshold
        if n_jobs is not None:
            hyperparams['n_jobs'] = n_jobs
        if transformer_weights is not None:
            hyperparams['transformer_weights'] = transformer_weights

        ComposeTransformer.__init__(self, space, name, **hyperparams)

    def compose(self):
        inputs = self.space.get_inputs(self)
        assert all([isinstance(m, PipelineOutput) for m in inputs]), \
            'The upstream module of `ColumnTransformer` must be `Pipeline`.'

        transformers = []
        next = None
        for p in inputs:
            next, (pipeline_name, transformer) = p.compose()
            transformers.append((p.pipeline_name, transformer, p.columns))

        pv = self.param_values
        ct = compose.ColumnTransformer(transformers, **pv)
        return next, (self.name, ct)


class DataFrameMapper(ComposeTransformer):
    def __init__(self, default=False, sparse=False, df_out=False, input_df=False, space=None, name=None,
                 **hyperparams):
        if default != False:
            hyperparams['default'] = default
        if sparse is not None and sparse != False:
            hyperparams['sparse'] = sparse
        if df_out is not None and df_out != False:
            hyperparams['df_out'] = df_out
        if input_df is not None and input_df != False:
            hyperparams['input_df'] = input_df

        ComposeTransformer.__init__(self, space, name, **hyperparams)

    def compose(self):
        inputs = self.space.get_inputs(self)
        assert all([isinstance(m, PipelineOutput) for m in inputs]), \
            'The upstream module of `DataFrameMapper` must be `Pipeline`.'

        transformers = []
        next = None
        for p in inputs:
            next, (pipeline_name, transformer) = p.compose()
            transformers.append((p.columns, transformer))

        pv = self.param_values
        ct = self._create_dataframe_mapper(features=transformers, **pv)
        return next, (self.name, ct)

    @staticmethod
    def _create_dataframe_mapper(features, **kwargs):
        return dataframe_mapper.DataFrameMapper(features=features, **kwargs)
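

# End-to-end sketch (assumed API): two column-wise Pipelines feeding a
# DataFrameMapper, which gathers their (columns, transformer) pairs into a single
# dataframe_mapper.DataFrameMapper at compose time. Pipeline names and column
# lists are illustrative; `HyperSpace`/`HyperInput` locations and the wrapper
# classes are the same assumptions as in the sketches above.
def _dataframe_mapper_space():
    from hypernets.core.search_space import HyperSpace
    from hypernets.core.ops import HyperInput

    space = HyperSpace()
    with space.as_default():
        input = HyperInput(name='input1')
        num = Pipeline([SimpleImputer(strategy='mean'), StandardScaler()],
                       columns=['age', 'fare'], name='num_pipeline')(input)
        cat = Pipeline([SimpleImputer(strategy='most_frequent')],
                       columns=['sex', 'embarked'], name='cat_pipeline')(input)
        DataFrameMapper(default=False, input_df=True, df_out=True)([num, cat])
    return space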