Source code for hypernets.searchers.nsga_searcher

from typing import List
from functools import cmp_to_key

import numpy as np

from hypernets.utils import logging as hyn_logging, const
from ..core.pareto import pareto_dominate
from ..core import HyperSpace, get_random_state

from .moo import MOOSearcher
from .genetic import Individual, SinglePointMutation, _Survival, create_recombination

logger = hyn_logging.get_logger(__name__)


class _NSGAIndividual(Individual):
    def __init__(self, dna: HyperSpace, scores: np.ndarray, random_state):

        super().__init__(dna, scores, random_state)

        self.dna = dna
        self.scores = scores

        self.rank: int = -1  # rank starts from 1

        self.S: List[_NSGAIndividual] = []
        self.n: int = -1

        self.T: List[_NSGAIndividual] = []

        self.distance: float = -1.0  # crowding-distance

    def reset(self):
        self.rank = -1
        self.S = []
        self.n = 0
        self.T = []
        self.distance = -1.0

    def __repr__(self):
        return f"{self.__class__.__name__}(scores={self.scores}, " \
               f"rank={self.rank}, n={self.n}, distance={self.distance})"


class _RankAndCrowdSortSurvival(_Survival):

    def __init__(self, directions, population_size, random_state):
        self.directions = directions
        self.population_size = population_size
        self.random_state = random_state

    @staticmethod
    def crowding_distance_assignment(I: List[_NSGAIndividual]):
        scores_array = np.array([indi.scores for indi in I])

        maximum_array = np.max(scores_array, axis=0)
        minimum_array = np.min(scores_array, axis=0)

        for m in range(len(I[0].scores)):
            sorted_I = list(sorted(I, key=lambda v: v.scores[m], reverse=False))
            sorted_I[0].distance = float("inf")  # so that boundary points always selected, because they are not crowd
            sorted_I[len(I) - 1].distance = float("inf")
            # only assign distances for non-boundary points
            for i in range(len(I) - 2):
                ti = i + 1
                sorted_I[ti].distance = sorted_I[ti].distance \
                                        + (sorted_I[ti + 1].scores[m] - sorted_I[ti - 1].scores[m]) \
                                        / (maximum_array[m] - minimum_array[m])
        return I

    def fast_non_dominated_sort(self, pop: List[_NSGAIndividual]):
        for p in pop:
            p.reset()
        F_1 = []
        F = [F_1]  # to store pareto front of levels respectively
        for p in pop:
            p.n = 0
            for q in pop:
                if p == q:
                    continue
                if self.dominate(p, q, pop=pop):
                    p.S.append(q)
                if self.dominate(q, p, pop=pop):
                    p.T.append(q)
                    p.n = p.n + 1

            if p.n == 0:
                p.rank = 0
                F_1.append(p)

        i = 0
        while True:
            Q = []
            for p in F[i]:
                for q in p.S:
                    q.n = q.n - 1
                    if q.n == 0:
                        q.rank = i + 1
                        Q.append(q)
            if len(Q) == 0:
                break
            F.append(Q)
            i = i + 1
        return F

    def sort_font(self, front: List[_NSGAIndividual]):
        return self.crowding_distance_assignment(front)

    def sort_population(self, population: List[_NSGAIndividual]):
        return sorted(population, key=self.cmp_operator, reverse=False)

    def update(self, pop: List[_NSGAIndividual], challengers: List[Individual]):
        temp_pop = []
        temp_pop.extend(pop)
        temp_pop.extend(challengers)
        if len(pop) < self.population_size:
            return temp_pop

        # assign a weighted Euclidean distance for each one
        p_sorted = self.fast_non_dominated_sort(temp_pop)
        if len(p_sorted) == 1 and len(p_sorted[0]) == 0:
            print(f"ERR: {p_sorted}")

        # sort individual in a front
        p_selected: List[_NSGAIndividual] = []
        for rank, P_front in enumerate(p_sorted):
            if len(P_front) == 0:
                break
            individuals = self.sort_font(P_front)  # only assign distance for nsga
            p_selected.extend(individuals)
            if len(p_selected) >= self.population_size:
                break

        # ensure population size
        p_cmp_sorted = list(sorted(p_selected, key=cmp_to_key(self.cmp_operator), reverse=True))
        p_final = p_cmp_sorted[:self.population_size]
        logger.debug(f"Individual {p_cmp_sorted[self.population_size-1: ]} have been removed from population,"
                     f" sorted population ={p_cmp_sorted}")

        return p_final

    def dominate(self, ind1: _NSGAIndividual, ind2: _NSGAIndividual, pop: List[_NSGAIndividual]):
        return pareto_dominate(x1=ind1.scores, x2=ind2.scores, directions=self.directions)

    @staticmethod
    def cmp_operator(s1: _NSGAIndividual, s2: _NSGAIndividual):
        if s1.rank < s2.rank:
            return 1
        elif s1.rank == s2.rank:
            if s1.distance > s2.distance:  # the larger the distance the better
                return 1
            elif s1.distance == s2.distance:
                return 0
            else:
                return -1
        else:
            return -1

    def calc_nondominated_set(self, population: List[_NSGAIndividual]):
        def find_non_dominated_solu(indi):
            if (np.array(indi.scores) == None).any():  # illegal individual for the None scores
                return False
            for indi_ in population:
                if indi_ == indi:
                    continue
                if self.dominate(ind1=indi_, ind2=indi, pop=population):
                    return False
            return True  # this is a pareto optimal

        # find non-dominated solution for every solution
        ns = list(filter(lambda s: find_non_dominated_solu(s), population))

        return ns


class _NSGAIIBasedSearcher(MOOSearcher):
    def __init__(self, space_fn, objectives, survival, recombination, mutate_probability,
                 space_sample_validation_fn, random_state):

        super().__init__(space_fn=space_fn, objectives=objectives, use_meta_learner=False,
                         space_sample_validation_fn=space_sample_validation_fn)

        self.population: List[_NSGAIndividual] = []
        self.random_state = random_state if random_state is not None else get_random_state()

        if recombination is None:
            self.recombination = create_recombination(const.COMBINATION_SINGLE_POINT, random_state=self.random_state)
        else:
            self.recombination = recombination

        self.mutation = SinglePointMutation(self.random_state, mutate_probability)

        self.survival = survival

        self._historical_individuals: List[_NSGAIndividual] = []

    def binary_tournament_select(self, population):
        indi_inx = self.random_state.randint(low=0, high=len(population) - 1, size=2)  # fixme: maybe duplicated inx

        p1 = population[indi_inx[0]]
        p2 = population[indi_inx[1]]

        # select the first parent
        if self.survival.cmp_operator(p1, p2) >= 0:
            first_inx = indi_inx[0]
        else:
            first_inx = indi_inx[1]

        # select the second parent
        indi_inx = self.random_state.randint(low=0, high=len(population) - 1, size=2)
        try_times = 0
        while first_inx in indi_inx:  # exclude the first individual
            if try_times > 10000:
                raise RuntimeError("too many times for selecting individual to mat. ")
            indi_inx = self.random_state.randint(low=0, high=len(population) - 1, size=2)
            try_times = try_times + 1

        if self.survival.cmp_operator(p1, p2) >= 0:
            second_inx = indi_inx[0]
        else:
            second_inx = indi_inx[1]

        return population[first_inx], population[second_inx]

    @property
    def directions(self):
        return [o.direction for o in self.objectives]

    def sample(self, space_options=None):
        if space_options is None:
            space_options = {}

        if len(self.population) < self.survival.population_size:
            return self._sample_and_check(self._random_sample, space_options=space_options)

        # binary tournament selection operation
        p1, p2 = self.binary_tournament_select(self.population)

        if self.recombination.check_parents(p1, p2):
            offspring = self.recombination.do(p1, p2, self.space_fn(**space_options))
            final_offspring = self.mutation.do(offspring, self.space_fn(**space_options))
        else:
            final_offspring = self.mutation.do(p1.dna, self.space_fn(**space_options), proba=1)

        return final_offspring

    def get_best(self):
        return list(map(lambda v: v.dna, self.get_nondominated_set()))

    def update_result(self, space, result):
        indi = _NSGAIndividual(space, result, self.random_state)
        self._historical_individuals.append(indi)  # add to history
        p = self.survival.update(pop=self.population,  challengers=[indi])
        self.population = p

        challengers = [indi]

        if challengers[0] in self.population:
            logger.debug(f"new individual{challengers} is accepted by population, current population {self.population}")
        else:
            logger.debug(f"new individual{challengers[0]} is not accepted by population, "
                         f"current population {self.population}")

    def get_nondominated_set(self):
        population = self.get_historical_population()
        ns = self.survival.calc_nondominated_set(population)
        return ns

    def get_historical_population(self):
        return self._historical_individuals

    def get_population(self) -> List[Individual]:
        return self.population

    def _sub_plot_ranking(self, ax, historical_individuals):
        p_sorted = self.survival.fast_non_dominated_sort(historical_individuals)
        colors = ['c', 'm', 'y', 'r', 'g']
        n_colors = len(colors)
        for i, front in enumerate(p_sorted[: n_colors]):
            scores = np.array([_.scores for _ in front])
            ax.scatter(scores[:, 0], scores[:, 1], color=colors[i], label=f"rank={i + 1}")

        if len(p_sorted) > n_colors:
            others = []
            for front in p_sorted[n_colors:]:
                others.extend(front)
            scores = np.array([_.scores for _ in others])
            ax.scatter(scores[:, 0], scores[:, 1], color='b', label='others')
        ax.set_title(f"individuals(total={len(historical_individuals)}) ranking plot")
        objective_names = [_.name for _ in self.objectives]
        ax.set_xlabel(objective_names[0])
        ax.set_ylabel(objective_names[1])
        ax.legend()

    def _plot_population(self, figsize=(6, 6), **kwargs):
        from matplotlib import pyplot as plt

        figs, axes = plt.subplots(3, 1, figsize=(figsize[0], figsize[0] * 3))
        historical_individuals = self.get_historical_population()

        # 1. ranking plot
        self._sub_plot_ranking(axes[0], historical_individuals)

        # 2. population plot
        self._sub_plot_pop(axes[1], historical_individuals)

        # 3. dominated plot
        self._plot_pareto(axes[2], historical_individuals)

        return figs, axes

    def reset(self):
        pass

    def export(self):
        pass

    def __repr__(self):
        return f"{self.__class__.__name__}(objectives={self.objectives}, " \
               f"recombination={self.recombination}), " \
               f"mutation={self.mutation}), " \
               f"survival={self.survival}), " \
               f"random_state={self.random_state}"


[docs]class NSGAIISearcher(_NSGAIIBasedSearcher): """An implementation of "NSGA-II". Parameters ---------- space_fn: callable, required A search space function which when called returns a `HyperSpace` instance objectives: List[Objective], optional, (default to NumOfFeatures instance) The optimization objectives. recombination: Recombination, required the strategy to recombine DNA of parents to generate offspring. Builtin strategies: - ShuffleCrossOver - UniformCrossover - SinglePointCrossOver mutate_probability: float, optional, default to 0.7 the probability of genetic variation for offspring, when the parents can not recombine, it will definitely mutate a gene for the generated offspring. population_size: int, default to 30 size of population space_sample_validation_fn: callable or None, (default=None) used to verify the validity of samples from the search space, and can be used to add specific constraint rules to the search space to reduce the size of the space. random_state: np.RandomState, optional used to reproduce the search process References ---------- [1] K. Deb, A. Pratap, S. Agarwal and T. Meyarivan, "A fast and elitist multiobjective genetic algorithm: NSGA-II," in IEEE Transactions on Evolutionary Computation, vol. 6, no. 2, pp. 182-197, April 2002, doi: 10.1109/4235.996017. """ def __init__(self, space_fn, objectives, recombination=None, mutate_probability=0.7, population_size=30, space_sample_validation_fn=None, random_state=None): survival = _RankAndCrowdSortSurvival(directions=[o.direction for o in objectives], population_size=population_size, random_state=random_state) super(NSGAIISearcher, self).__init__(space_fn=space_fn, objectives=objectives, survival=survival, recombination=recombination, mutate_probability=mutate_probability, space_sample_validation_fn=space_sample_validation_fn, random_state=random_state)
class _RDominanceSurvival(_RankAndCrowdSortSurvival): def __init__(self, directions, population_size, random_state, ref_point, weights, threshold): super(_RDominanceSurvival, self).__init__(directions, population_size=population_size, random_state=random_state) self.ref_point = ref_point self.weights = weights # enables the DM to control the selection pressure of the r-dominance relation. self.threshold = threshold def dominate(self, ind1: _NSGAIndividual, ind2: _NSGAIndividual, pop: List[_NSGAIndividual], directions=None): # check pareto dominate if pareto_dominate(ind1.scores, ind2.scores, directions=directions): return True if pareto_dominate(ind2.scores, ind1.scores, directions=directions): return False # in case of pareto-equivalent, compare distance scores = np.array([_.scores for _ in pop]) scores_extend = np.max(scores, axis=0) - np.min(scores, axis=0) distances = [] for indi in pop: # Calculate weighted Euclidean distance of two solution. # Note: if ref_point is infeasible value, distance maybe larger than 1 indi.distance = np.sqrt(np.sum(np.square((np.asarray(indi.scores) - self.ref_point) / scores_extend) * self.weights)) distances.append(indi.distance) dist_extent = np.max(distances) - np.min(distances) return (ind1.distance - ind2.distance) / dist_extent < -self.threshold def sort_font(self, front: List[_NSGAIndividual]): return sorted(front, key=lambda v: v.distance, reverse=False) def sort_population(self, population: List[_NSGAIndividual]): return sorted(population, key=cmp_to_key(self.cmp_operator), reverse=True) @staticmethod def cmp_operator(s1: _NSGAIndividual, s2: _NSGAIndividual): if s1.rank < s2.rank: return 1 elif s1.rank == s2.rank: if s1.distance < s2.distance: # the smaller the distance the better return 1 elif s1.distance == s2.distance: return 0 else: return -1 else: return -1 def __repr__(self): return f"{self.__class__.__name__}(ref_point={self.ref_point}, weights={self.weights}, " \ f"threshold={self.threshold}, random_state={self.random_state})"
[docs]class RNSGAIISearcher(_NSGAIIBasedSearcher): """An implementation of R-NSGA-II which is a variant of NSGA-II algorithm. Parameters ---------- space_fn: callable, required A search space function which when called returns a `HyperSpace` instance objectives: List[Objective], optional, (default to NumOfFeatures instance) The optimization objectives. ref_point: Tuple[float], required user-specified reference point, used to guide the search toward the desired region. weights: Tuple[float], optional, default to uniform weights vector, provides more detailed information about what Pareto optimal to converge to. dominance_threshold: float, optional, default to 0.3 distance threshold, in case of pareto-equivalent, compare distance between two solutions. recombination: Recombination, required the strategy to recombine DNA of parents to generate offspring. Builtin strategies: - ShuffleCrossOver - UniformCrossover - SinglePointCrossOver mutate_probability: float, optional, default to 0.7 the probability of genetic variation for offspring, when the parents can not recombine, it will definitely mutate a gene for the generated offspring. population_size: int, default to 30 size of population space_sample_validation_fn: callable or None, (default=None) used to verify the validity of samples from the search space, and can be used to add specific constraint rules to the search space to reduce the size of the space. random_state: np.RandomState, optional used to reproduce the search process References ---------- [1] L. Ben Said, S. Bechikh and K. Ghedira, "The r-Dominance: A New Dominance Relation for Interactive Evolutionary Multicriteria Decision Making," in IEEE Transactions on Evolutionary Computation, vol. 14, no. 5, pp. 801-818, Oct. 2010, doi: 10.1109/TEVC.2010.2041060. """ def __init__(self, space_fn, objectives, ref_point=None, weights=None, dominance_threshold=0.3, recombination=None, mutate_probability=0.7, population_size=30, space_sample_validation_fn=None, random_state=None): """ """ n_objectives = len(objectives) ref_point = ref_point if ref_point is not None else [0.0] * n_objectives weights = weights if weights is not None else [1 / n_objectives] * n_objectives directions = [o.direction for o in objectives] random_state if random_state is not None else get_random_state() survival = _RDominanceSurvival(random_state=random_state, population_size=population_size, ref_point=ref_point, weights=weights, threshold=dominance_threshold, directions=directions) super(RNSGAIISearcher, self).__init__(space_fn=space_fn, objectives=objectives, recombination=recombination, survival=survival, mutate_probability=mutate_probability, space_sample_validation_fn=space_sample_validation_fn, random_state=random_state) def _plot_population(self, figsize=(6, 6), show_ref_point=True, show_weights=False, **kwargs): from matplotlib import pyplot as plt def attach(ax): if show_ref_point: ref_point = self.survival.ref_point ax.scatter([ref_point[0]], [ref_point[1]], c='green', marker="*", label='ref point') if show_weights: weights = self.survival.weights # plot a vector ax.quiver(0, 0, weights[0], weights[1], angles='xy', scale_units='xy', label='weights') figs, axes = plt.subplots(2, 2, figsize=(figsize[0] * 2, figsize[0] * 2)) historical_individuals = self.get_historical_population() # 1. ranking plot ax1 = axes[0][0] self._sub_plot_ranking(ax1, historical_individuals) attach(ax1) # 2. population plot ax2 = axes[0][1] self._sub_plot_pop(ax2, historical_individuals) attach(ax2) # 3. r-dominated plot ax3 = axes[1][0] n_set = self.get_nondominated_set() d_set: List[Individual] = list(filter(lambda v: v not in n_set, historical_individuals)) self._do_plot(n_set, color='red', label='non-dominated', ax=ax3, marker="o") # , marker="o" self._do_plot(d_set, color='blue', label='dominated', ax=ax3, marker="o") ax3.set_title(f"non-dominated solution (total={len(historical_individuals)}) in R-dominance scene") objective_names = [_.name for _ in self.objectives] ax3.set_xlabel(objective_names[0]) ax3.set_ylabel(objective_names[1]) ax3.legend() attach(ax3) # 4. pareto dominated plot ax4 = axes[1][1] self._plot_pareto(ax4, historical_individuals) attach(ax4) return figs, axes