source: framspy/evolalg/base/experiment_niching_abc.py

Last change on this file was 1308, checked in by Maciej Komosinski, 7 months ago

Added the ability to select a dissimilarity measure for crowding distance (for NSGA2 and NSLC) independently from the main dissimilarity measure

File size: 12.7 KB
Line 
1import time
2from abc import ABC, abstractmethod
3
4import numpy as np
5from deap import base, tools
6from deap.tools.emo import assignCrowdingDist
7
8from ..constants import BAD_FITNESS
9from ..structures.individual import Individual
10from .experiment_abc import ExperimentABC
11from .remove_diagonal import remove_diagonal
12from FramsticksLib import DissimMethod  # since the descendant ExperimentFramsNiching class does not introduce any Framsticks-specific dissimilarity methods, all of them must be known here (in ExperimentNiching)
13
14
class DeapFitness(base.Fitness):
    """Two-objective fitness container built on DEAP's ``base.Fitness``.

    In this file, instances are created from pairs such as
    ``(dissimilarity, rawfitness)``; both weights are positive.
    """

    # One weight per objective, as required by DEAP's Fitness class.
    weights = (1, 1)

    def __init__(self, *args, **kwargs):
        # No extra state: simply forward everything to DEAP's initializer.
        super().__init__(*args, **kwargs)
20
21
22class ExperimentNiching(ExperimentABC, ABC):
23    fit: str = "niching"
24    normalize: str = "None"
25    archive_size: int = None
26
27    def __init__(self, fit, normalize, popsize, hof_size, save_only_best, knn_niching, knn_nslc, archive_size, crowding_dissim = None) -> None:
28        ExperimentABC.__init__(self,popsize=popsize, hof_size=hof_size, save_only_best=save_only_best)
29        self.fit = fit
30        self.normalize = normalize
31        self.knn_niching = knn_niching # this parameter is used for local novelty and local niching
32        self.knn_nslc = knn_nslc
33        self.archive_size=archive_size
34        self.crowding_dissim = crowding_dissim
35
36        # np.argpartition requires these parameters to be at most popsize-2; popsize is decreased by 1 because we remove_diagonal()
37        if self.knn_niching > popsize - 2:
38            raise ValueError("knn_niching (%d) should be at most popsize-2 (%d)" % (self.knn_niching, popsize-2))
39        if self.knn_nslc > popsize - 2:
40            raise ValueError("knn_nslc (%d) should be at most popsize-2 (%d)" % (self.knn_nslc, popsize-2))
41
42
43    def transform_indexes(self, i, index_array):
44        return [x+1 if x >= i else x for x in index_array]
45
46    def normalize_dissim(self, dissim_matrix):
47        dissim_matrix = remove_diagonal(np.array(dissim_matrix)) # on the diagonal we usually have zeros (an individual is identical with itself, so the dissimilarity is 0). In some techniques we need to find "k" most similar individuals, so we remove the diagonal so that self-similarity of individuals does not interfere with finding "k" other most similar individuals. The matrix from square n*n turns into n*(n-1).
48        if self.normalize == "none":
49            return dissim_matrix
50        elif self.normalize == "max":
51            divide_by = np.max(dissim_matrix)
52        elif self.normalize == "sum":
53            divide_by = np.sum(dissim_matrix)
54        else:
55            raise ValueError("Wrong normalization method: '%s'" % self.normalize)
56        if divide_by != 0:
57            return dissim_matrix/divide_by
58        else:
59            return dissim_matrix
60
61    def do_niching(self, population_structures):
62        population_archive = population_structures.population + population_structures.archive
63        dissim_matrix = self.dissimilarity(population_archive)
64        if "knn" not in self.fit:
65            dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
66        else:
67            dissim_list = np.mean(np.partition(
68                self.normalize_dissim(dissim_matrix), self.knn_niching)[:, :self.knn_niching], axis=1)
69
70        if Individual.fitness_set_negative_to_zero is False and ("niching" in self.fit or "novelty" in self.fit):
71            raise ValueError("Negative fitness values not tested in combination with niching or novelty. When using these techniques, verify formulas or consider using the flag -fitness_set_negative_to_zero") # once the formulas are verified/improved, the command-line flag and this conditional check can be removed.
72
73        if "niching" in self.fit:
74            for i, d in zip(population_archive, dissim_list):
75                i.fitness = i.rawfitness * d
76        elif "novelty" in self.fit:
77            for i, d in zip(population_archive, dissim_list):
78                i.fitness = d
79        else:
80            raise ValueError("Unsupported fit type: '%s'. Use the correct type or implement a new behavior." % self.fit)
81        population_structures.update_archive(dissim_matrix, population_archive)
82
83    def do_nsga2_dissim(self, population):
84        dissim_matrix = self.dissimilarity(population)
85        dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
86        for i, d in zip(population, dissim_list):
87            i.fitness = DeapFitness(tuple((d, i.rawfitness)))
88
89    def do_nslc_dissim(self, population_structures, pop_offspring):
90        population_archive = population_structures.archive + pop_offspring
91        dissim_matrix = self.dissimilarity(population_archive)
92        normalized_matrix = self.normalize_dissim(dissim_matrix)
93        for i in range(len(normalized_matrix)):
94            temp_dissim = normalized_matrix[i]
95            index_array = np.argpartition(temp_dissim, kth=self.knn_nslc, axis=-1)[:self.knn_nslc]
96            dissim_value = np.mean(np.take_along_axis(
97                temp_dissim, index_array, axis=-1))
98            temp_fitness = population_archive[i].rawfitness
99            population_of_most_similar = list(
100                map(population_archive.__getitem__, self.transform_indexes(i, index_array)))
101            temp_ind_fit = sum(
102                [1 for ind in population_of_most_similar if ind.rawfitness < temp_fitness])
103            population_archive[i].fitness = DeapFitness(
104                tuple((dissim_value, temp_ind_fit)))
105        population_structures.update_archive(dissim_matrix, population_archive)
106
107    def assignCrowdingDistFramspy(self, individuals):
108        """Assign a crowding distance to each individual's fitness. The
109        crowding distance can be retrieved via the :attr:`crowding_dist`
110        attribute of each individual's fitness.
111        """
112        if len(individuals) == 0:
113            return
114       
115        if self.crowding_dissim is DissimMethod.FITNESS: # if crowding dissim was not specified (DissimMethod.FITNESS is our default) or was set to fitness, use the default DEAP implementation that relies on fitness
116            return assignCrowdingDist(individuals)
117       
118        dissim_matrix = self.crowding_distance_dissimilarity(individuals)
119        assert len(dissim_matrix) == len(individuals), f'Dissimilarity matrix does not match the size of the population {len(dissim_matrix)}:{len(individuals)}'
120       
121        for i in range(len(individuals)):
122            individuals[i].fitness.crowding_dist = np.mean(dissim_matrix[i])
123
124
125    def make_new_population_nsga2(self, population_structures, prob_mut, prob_xov):
126        expected_mut = int(self.popsize * prob_mut)
127        expected_xov = int(self.popsize * prob_xov)
128        assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov)
129        self.assignCrowdingDistFramspy(population_structures.population)
130        offspring = tools.selTournamentDCD(population_structures.population, self.popsize)
131
132        def addGenotypeIfValid(ind_list, genotype):
133            new_individual = Individual()
134            new_individual.set_and_evaluate(genotype, self.evaluate)
135            if new_individual.fitness is not BAD_FITNESS:
136                ind_list.append(new_individual)
137
138        counter = 0
139
140        def get_individual(pop, c):
141            if c < len(pop):
142                ind = pop[c]
143                c += 1
144                return ind, c
145            else:
146                c = 0
147                ind = pop[c]
148                c += 1
149                return ind, c
150
151        newpop = []
152        while len(newpop) < expected_mut:
153            ind, counter = get_individual(offspring, counter)
154            addGenotypeIfValid(newpop, self.mutate(ind.genotype))
155
156        # adding valid crossovers of selected individuals...
157        while len(newpop) < expected_mut + expected_xov:
158            ind1, counter = get_individual(offspring, counter)
159            ind2, counter = get_individual(offspring, counter)
160            addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype))
161
162        # select clones to fill up the new population until we reach the same size as the input population
163        while len(newpop) < len(population_structures.population):
164            ind, counter = get_individual(offspring, counter)
165            newpop.append(Individual().copyFrom(ind))
166
167        pop_offspring = population_structures.population + newpop # used both for nsga2 and nslc
168        # print(len(pop_offspring)) # for debugging
169        if self.fit == "nslc":
170            self.do_nslc_dissim(population_structures, pop_offspring)
171        elif self.fit == "nsga2":
172            self.do_nsga2_dissim(pop_offspring)
173        out_pop = tools.selNSGA2(pop_offspring, len(population_structures.population))
174        return out_pop
175
176    def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size):
177        file_name = self.get_state_filename(hof_savefile)
178        state = self.load_state(file_name)
179        if state is not None:  # loaded state from file
180            # saved generation has been completed, start with the next one
181            self.current_generation += 1
182            print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof), len(self.stats), (len(self.population_structures.archive)), self.current_generation, generations))  # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based
183        else:
184            self.initialize_evolution(self.genformat, initialgenotype)
185
186        time0 = time.process_time()
187        for g in range(self.current_generation, generations):
188            if self.fit != "raw" and self.fit != "nsga2" and self.fit != "nslc":
189                self.do_niching(self.population_structures)
190
191            if type(self.population_structures.population[0].fitness) == DeapFitness:
192                self.population_structures.population = self.make_new_population_nsga2(  # used both for nsga2 and nslc
193                    self.population_structures, pmut, pxov)
194            else:
195                self.population_structures.population = self.make_new_population(
196                    self.population_structures.population, pmut, pxov, tournament_size)
197
198            self.update_stats(g, self.population_structures.population)
199
200            if hof_savefile is not None:
201                self.current_generation = g
202                self.time_elapsed += time.process_time() - time0
203                self.save_state(file_name)
204        if hof_savefile is not None:
205            self.save_genotypes(hof_savefile)
206        return self.population_structures.population, self.stats
207
208    @staticmethod
209    def get_args_for_parser():
210        parser = ExperimentABC.get_args_for_parser()
211        parser.add_argument("-dissim", type = lambda arg: DissimMethod[arg], choices = DissimMethod,
212                   default=DissimMethod.PHENE_STRUCT_OPTIM,
213                   help="The type of the dissimilarity measure to be used in novelty and niching diversity control. Available: " + str(DissimMethod._member_names_))
214
215        parser.add_argument("-crowding_dissim", type = lambda arg: DissimMethod[arg], choices = DissimMethod,
216                   default=DissimMethod.FITNESS,
217                   help="The type of the dissimilarity measure to be used as NSGA2 and NSLC crowding distance. Available: " + str(DissimMethod._member_names_))
218        parser.add_argument("-fit",type= str, default="raw",
219                        help="Fitness type, availible types: niching, novelty, knn_niching (local), knn_novelty (local), nsga2, nslc and raw (default)")
220        parser.add_argument("-archive",type= int, default=50, help="Maximum archive size")
221        parser.add_argument("-normalize",type= str, default= "max",
222                            help="What normalization to use for the dissimilarity matrix: max (default}, sum, or none")
223        parser.add_argument("-knn_niching",type= int, default=5,
224                        help="The number of nearest neighbors for local novelty/niching. If knn==0, global is performed. Default: 5")
225        parser.add_argument("-knn_nslc",type= int, default=5,
226                        help="The number of nearest neighbors for NSLC. If knn==0, global is performed. Default: 5")
227        return parser
228       
229    @abstractmethod
230    def dissimilarity(self, population: list):
231        """
232            Used for calculating dissimilarity in novelty and niching methods
233        """
234        pass
235
236    @abstractmethod
237    def crowding_distance_dissimilarity(self, population: list):
238        """
239            Used for calculating dissimilarity for nsga2 crowding distance,
240            currently used in NSGA2 and NSLC
241        """
242        pass
Note: See TracBrowser for help on using the repository browser.