1 | import time |
---|
2 | from abc import ABC, abstractmethod |
---|
3 | |
---|
4 | import numpy as np |
---|
5 | from deap import base, tools |
---|
6 | from deap.tools.emo import assignCrowdingDist |
---|
7 | |
---|
8 | from ..constants import BAD_FITNESS |
---|
9 | from ..structures.individual import Individual |
---|
10 | from .experiment_abc import ExperimentABC |
---|
11 | from .remove_diagonal import remove_diagonal |
---|
12 | from FramsticksLib import DissimMethod # since the descendant ExperimentFramsNiching class does not introduce any Framsticks-specific dissimilarity methods, all of them must be known here (in ExperimentNiching) |
---|
13 | |
---|
14 | |
---|
15 | class DeapFitness(base.Fitness): |
---|
16 | weights = (1, 1) |
---|
17 | |
---|
18 | def __init__(self, *args, **kwargs): |
---|
19 | super(DeapFitness, self).__init__(*args, **kwargs) |
---|
20 | |
---|
21 | |
---|
22 | class ExperimentNiching(ExperimentABC, ABC): |
---|
23 | fit: str = "niching" |
---|
24 | normalize: str = "None" |
---|
25 | archive_size: int = None |
---|
26 | |
---|
27 | def __init__(self, fit, normalize, popsize, hof_size, save_only_best, knn_niching, knn_nslc, archive_size, crowding_dissim = None) -> None: |
---|
28 | ExperimentABC.__init__(self,popsize=popsize, hof_size=hof_size, save_only_best=save_only_best) |
---|
29 | self.fit = fit |
---|
30 | self.normalize = normalize |
---|
31 | self.knn_niching = knn_niching # this parameter is used for local novelty and local niching |
---|
32 | self.knn_nslc = knn_nslc |
---|
33 | self.archive_size=archive_size |
---|
34 | self.crowding_dissim = crowding_dissim |
---|
35 | |
---|
36 | # np.argpartition requires these parameters to be at most popsize-2; popsize is decreased by 1 because we remove_diagonal() |
---|
37 | if self.knn_niching > popsize - 2: |
---|
38 | raise ValueError("knn_niching (%d) should be at most popsize-2 (%d)" % (self.knn_niching, popsize-2)) |
---|
39 | if self.knn_nslc > popsize - 2: |
---|
40 | raise ValueError("knn_nslc (%d) should be at most popsize-2 (%d)" % (self.knn_nslc, popsize-2)) |
---|
41 | |
---|
42 | |
---|
43 | def transform_indexes(self, i, index_array): |
---|
44 | return [x+1 if x >= i else x for x in index_array] |
---|
45 | |
---|
46 | def normalize_dissim(self, dissim_matrix): |
---|
47 | dissim_matrix = remove_diagonal(np.array(dissim_matrix)) # on the diagonal we usually have zeros (an individual is identical with itself, so the dissimilarity is 0). In some techniques we need to find "k" most similar individuals, so we remove the diagonal so that self-similarity of individuals does not interfere with finding "k" other most similar individuals. The matrix from square n*n turns into n*(n-1). |
---|
48 | if self.normalize == "none": |
---|
49 | return dissim_matrix |
---|
50 | elif self.normalize == "max": |
---|
51 | divide_by = np.max(dissim_matrix) |
---|
52 | elif self.normalize == "sum": |
---|
53 | divide_by = np.sum(dissim_matrix) |
---|
54 | else: |
---|
55 | raise ValueError("Wrong normalization method: '%s'" % self.normalize) |
---|
56 | if divide_by != 0: |
---|
57 | return dissim_matrix/divide_by |
---|
58 | else: |
---|
59 | return dissim_matrix |
---|
60 | |
---|
61 | def do_niching(self, population_structures): |
---|
62 | population_archive = population_structures.population + population_structures.archive |
---|
63 | dissim_matrix = self.dissimilarity(population_archive) |
---|
64 | if "knn" not in self.fit: |
---|
65 | dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1) |
---|
66 | else: |
---|
67 | dissim_list = np.mean(np.partition( |
---|
68 | self.normalize_dissim(dissim_matrix), self.knn_niching)[:, :self.knn_niching], axis=1) |
---|
69 | |
---|
70 | if Individual.fitness_set_negative_to_zero is False and ("niching" in self.fit or "novelty" in self.fit): |
---|
71 | raise ValueError("Negative fitness values not tested in combination with niching or novelty. When using these techniques, verify formulas or consider using the flag -fitness_set_negative_to_zero") # once the formulas are verified/improved, the command-line flag and this conditional check can be removed. |
---|
72 | |
---|
73 | if "niching" in self.fit: |
---|
74 | for i, d in zip(population_archive, dissim_list): |
---|
75 | i.fitness = i.rawfitness * d |
---|
76 | elif "novelty" in self.fit: |
---|
77 | for i, d in zip(population_archive, dissim_list): |
---|
78 | i.fitness = d |
---|
79 | else: |
---|
80 | raise ValueError("Unsupported fit type: '%s'. Use the correct type or implement a new behavior." % self.fit) |
---|
81 | population_structures.update_archive(dissim_matrix, population_archive) |
---|
82 | |
---|
83 | def do_nsga2_dissim(self, population): |
---|
84 | dissim_matrix = self.dissimilarity(population) |
---|
85 | dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1) |
---|
86 | for i, d in zip(population, dissim_list): |
---|
87 | i.fitness = DeapFitness(tuple((d, i.rawfitness))) |
---|
88 | |
---|
89 | def do_nslc_dissim(self, population_structures, pop_offspring): |
---|
90 | population_archive = population_structures.archive + pop_offspring |
---|
91 | dissim_matrix = self.dissimilarity(population_archive) |
---|
92 | normalized_matrix = self.normalize_dissim(dissim_matrix) |
---|
93 | for i in range(len(normalized_matrix)): |
---|
94 | temp_dissim = normalized_matrix[i] |
---|
95 | index_array = np.argpartition(temp_dissim, kth=self.knn_nslc, axis=-1)[:self.knn_nslc] |
---|
96 | dissim_value = np.mean(np.take_along_axis( |
---|
97 | temp_dissim, index_array, axis=-1)) |
---|
98 | temp_fitness = population_archive[i].rawfitness |
---|
99 | population_of_most_similar = list( |
---|
100 | map(population_archive.__getitem__, self.transform_indexes(i, index_array))) |
---|
101 | temp_ind_fit = sum( |
---|
102 | [1 for ind in population_of_most_similar if ind.rawfitness < temp_fitness]) |
---|
103 | population_archive[i].fitness = DeapFitness( |
---|
104 | tuple((dissim_value, temp_ind_fit))) |
---|
105 | population_structures.update_archive(dissim_matrix, population_archive) |
---|
106 | |
---|
107 | def assignCrowdingDistFramspy(self, individuals): |
---|
108 | """Assign a crowding distance to each individual's fitness. The |
---|
109 | crowding distance can be retrieved via the :attr:`crowding_dist` |
---|
110 | attribute of each individual's fitness. |
---|
111 | """ |
---|
112 | if len(individuals) == 0: |
---|
113 | return |
---|
114 | |
---|
115 | if self.crowding_dissim is DissimMethod.FITNESS: # if crowding dissim was not specified (DissimMethod.FITNESS is our default) or was set to fitness, use the default DEAP implementation that relies on fitness |
---|
116 | return assignCrowdingDist(individuals) |
---|
117 | |
---|
118 | dissim_matrix = self.crowding_distance_dissimilarity(individuals) |
---|
119 | assert len(dissim_matrix) == len(individuals), f'Dissimilarity matrix does not match the size of the population {len(dissim_matrix)}:{len(individuals)}' |
---|
120 | |
---|
121 | for i in range(len(individuals)): |
---|
122 | individuals[i].fitness.crowding_dist = np.mean(dissim_matrix[i]) |
---|
123 | |
---|
124 | |
---|
125 | def make_new_population_nsga2(self, population_structures, prob_mut, prob_xov): |
---|
126 | expected_mut = int(self.popsize * prob_mut) |
---|
127 | expected_xov = int(self.popsize * prob_xov) |
---|
128 | assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov) |
---|
129 | self.assignCrowdingDistFramspy(population_structures.population) |
---|
130 | offspring = tools.selTournamentDCD(population_structures.population, self.popsize) |
---|
131 | |
---|
132 | def addGenotypeIfValid(ind_list, genotype): |
---|
133 | new_individual = Individual() |
---|
134 | new_individual.set_and_evaluate(genotype, self.evaluate) |
---|
135 | if new_individual.fitness is not BAD_FITNESS: |
---|
136 | ind_list.append(new_individual) |
---|
137 | |
---|
138 | counter = 0 |
---|
139 | |
---|
140 | def get_individual(pop, c): |
---|
141 | if c < len(pop): |
---|
142 | ind = pop[c] |
---|
143 | c += 1 |
---|
144 | return ind, c |
---|
145 | else: |
---|
146 | c = 0 |
---|
147 | ind = pop[c] |
---|
148 | c += 1 |
---|
149 | return ind, c |
---|
150 | |
---|
151 | newpop = [] |
---|
152 | while len(newpop) < expected_mut: |
---|
153 | ind, counter = get_individual(offspring, counter) |
---|
154 | addGenotypeIfValid(newpop, self.mutate(ind.genotype)) |
---|
155 | |
---|
156 | # adding valid crossovers of selected individuals... |
---|
157 | while len(newpop) < expected_mut + expected_xov: |
---|
158 | ind1, counter = get_individual(offspring, counter) |
---|
159 | ind2, counter = get_individual(offspring, counter) |
---|
160 | addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype)) |
---|
161 | |
---|
162 | # select clones to fill up the new population until we reach the same size as the input population |
---|
163 | while len(newpop) < len(population_structures.population): |
---|
164 | ind, counter = get_individual(offspring, counter) |
---|
165 | newpop.append(Individual().copyFrom(ind)) |
---|
166 | |
---|
167 | pop_offspring = population_structures.population + newpop # used both for nsga2 and nslc |
---|
168 | # print(len(pop_offspring)) # for debugging |
---|
169 | if self.fit == "nslc": |
---|
170 | self.do_nslc_dissim(population_structures, pop_offspring) |
---|
171 | elif self.fit == "nsga2": |
---|
172 | self.do_nsga2_dissim(pop_offspring) |
---|
173 | out_pop = tools.selNSGA2(pop_offspring, len(population_structures.population)) |
---|
174 | return out_pop |
---|
175 | |
---|
176 | def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size): |
---|
177 | file_name = self.get_state_filename(hof_savefile) |
---|
178 | state = self.load_state(file_name) |
---|
179 | if state is not None: # loaded state from file |
---|
180 | # saved generation has been completed, start with the next one |
---|
181 | self.current_generation += 1 |
---|
182 | print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof), len(self.stats), (len(self.population_structures.archive)), self.current_generation, generations)) # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based |
---|
183 | else: |
---|
184 | self.initialize_evolution(self.genformat, initialgenotype) |
---|
185 | |
---|
186 | time0 = time.process_time() |
---|
187 | for g in range(self.current_generation, generations): |
---|
188 | if self.fit != "raw" and self.fit != "nsga2" and self.fit != "nslc": |
---|
189 | self.do_niching(self.population_structures) |
---|
190 | |
---|
191 | if type(self.population_structures.population[0].fitness) == DeapFitness: |
---|
192 | self.population_structures.population = self.make_new_population_nsga2( # used both for nsga2 and nslc |
---|
193 | self.population_structures, pmut, pxov) |
---|
194 | else: |
---|
195 | self.population_structures.population = self.make_new_population( |
---|
196 | self.population_structures.population, pmut, pxov, tournament_size) |
---|
197 | |
---|
198 | self.update_stats(g, self.population_structures.population) |
---|
199 | |
---|
200 | if hof_savefile is not None: |
---|
201 | self.current_generation = g |
---|
202 | self.time_elapsed += time.process_time() - time0 |
---|
203 | self.save_state(file_name) |
---|
204 | if hof_savefile is not None: |
---|
205 | self.save_genotypes(hof_savefile) |
---|
206 | return self.population_structures.population, self.stats |
---|
207 | |
---|
208 | @staticmethod |
---|
209 | def get_args_for_parser(): |
---|
210 | parser = ExperimentABC.get_args_for_parser() |
---|
211 | parser.add_argument("-dissim", type = lambda arg: DissimMethod[arg], choices = DissimMethod, |
---|
212 | default=DissimMethod.PHENE_STRUCT_OPTIM, |
---|
213 | help="The type of the dissimilarity measure to be used in novelty and niching diversity control. Available: " + str(DissimMethod._member_names_)) |
---|
214 | |
---|
215 | parser.add_argument("-crowding_dissim", type = lambda arg: DissimMethod[arg], choices = DissimMethod, |
---|
216 | default=DissimMethod.FITNESS, |
---|
217 | help="The type of the dissimilarity measure to be used as NSGA2 and NSLC crowding distance. Available: " + str(DissimMethod._member_names_)) |
---|
218 | parser.add_argument("-fit",type= str, default="raw", |
---|
219 | help="Fitness type, availible types: niching, novelty, knn_niching (local), knn_novelty (local), nsga2, nslc and raw (default)") |
---|
220 | parser.add_argument("-archive",type= int, default=50, help="Maximum archive size") |
---|
221 | parser.add_argument("-normalize",type= str, default= "max", |
---|
222 | help="What normalization to use for the dissimilarity matrix: max (default}, sum, or none") |
---|
223 | parser.add_argument("-knn_niching",type= int, default=5, |
---|
224 | help="The number of nearest neighbors for local novelty/niching. If knn==0, global is performed. Default: 5") |
---|
225 | parser.add_argument("-knn_nslc",type= int, default=5, |
---|
226 | help="The number of nearest neighbors for NSLC. If knn==0, global is performed. Default: 5") |
---|
227 | return parser |
---|
228 | |
---|
229 | @abstractmethod |
---|
230 | def dissimilarity(self, population: list): |
---|
231 | """ |
---|
232 | Used for calculating dissimilarity in novelty and niching methods |
---|
233 | """ |
---|
234 | pass |
---|
235 | |
---|
236 | @abstractmethod |
---|
237 | def crowding_distance_dissimilarity(self, population: list): |
---|
238 | """ |
---|
239 | Used for calculating dissimilarity for nsga2 crowding distance, |
---|
240 | currently used in NSGA2 and NSLC |
---|
241 | """ |
---|
242 | pass |
---|