source: framspy/evolalg/base/experiment_niching_abc.py @ 1271

Last change on this file since 1271 was 1271, checked in by Maciej Komosinski, 16 months ago

Improved boundary checks, descriptions and comments

File size: 10.0 KB
Line 
1import time
2from abc import ABC, abstractmethod
3
4import numpy as np
5from deap import base, tools
6from deap.tools.emo import assignCrowdingDist
7
8from ..constants import BAD_FITNESS
9from ..structures.individual import Individual
10from .experiment_abc import ExperimentABC
11from .remove_diagonal import remove_diagonal
12
13
class DeapFitness(base.Fitness):
    """DEAP fitness with two objectives, both weighted by 1.

    In this file it is always built from a 2-tuple of objective values
    (see do_nsga2_dissim and do_nslc_dissim).
    """

    # One weight per objective; DEAP multiplies each objective value by its weight.
    weights = (1, 1)

    def __init__(self, *args, **kwargs):
        # Pure pass-through to base.Fitness; kept so the constructor signature stays explicit.
        super().__init__(*args, **kwargs)
19
20
21class ExperimentNiching(ExperimentABC, ABC):
22    fit: str = "niching"
23    normalize: str = "None"
24    archive_size: int = None
25
26    def __init__(self, fit, normalize, popsize, hof_size, save_only_best=True, knn_niching=5, knn_nslc=10, archive_size=0) -> None:
27        ExperimentABC.__init__(self,popsize=popsize, hof_size=hof_size, save_only_best=save_only_best)
28        self.fit = fit
29        self.normalize = normalize
30        self.knn_niching = knn_niching # this parameter is used for local novelty and local niching
31        self.knn_nslc = knn_nslc
32        self.archive_size=archive_size
33
34        # np.argpartition requires these parameters to be at most popsize-2; popsize is decreased by 1 because we remove_diagonal()
35        if self.knn_niching > popsize - 2:
36            raise ValueError("knn_niching (%d) should be at most popsize-2 (%d)" % (self.knn_niching, popsize-2))
37        if self.knn_nslc > popsize - 2:
38            raise ValueError("knn_nslc (%d) should be at most popsize-2 (%d)" % (self.knn_nslc, popsize-2))
39
40
41    def transform_indexes(self, i, index_array):
42        return [x+1 if x >= i else x for x in index_array]
43
44    def normalize_dissim(self, dissim_matrix):
45        dissim_matrix = remove_diagonal(np.array(dissim_matrix)) # on the diagonal we usually have zeros (an individual is identical with itself, so the dissimilarity is 0). In some techniques we need to find "k" most similar individuals, so we remove the diagonal so that self-similarity of individuals does not interfere with finding "k" other most similar individuals. The matrix from square n*n turns into n*(n-1).
46        if self.normalize == "none":
47            return dissim_matrix
48        elif self.normalize == "max":
49            divide_by = np.max(dissim_matrix)
50        elif self.normalize == "sum":
51            divide_by = np.sum(dissim_matrix)
52        else:
53            raise Exception(f"Wrong normalization method,", self.normalize)
54        if divide_by != 0:
55            return dissim_matrix/divide_by
56        else:
57            return dissim_matrix
58
59    def do_niching(self, population_structures):
60        population_archive = population_structures.population + population_structures.archive
61        dissim_matrix = self.dissimilarity(population_archive)
62        if "knn" not in self.fit:
63            dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
64        else:
65            dissim_list = np.mean(np.partition(
66                self.normalize_dissim(dissim_matrix), self.knn_niching)[:, :self.knn_niching], axis=1)
67
68        if "niching" in self.fit:
69            for i, d in zip(population_archive, dissim_list):
70                i.fitness = i.rawfitness * d
71        elif "novelty" in self.fit:
72            for i, d in zip(population_archive, dissim_list):
73                i.fitness = d
74        else:
75            raise Exception("Wrong fit type: ", self.fit,
76                            f" choose the correct one or implement a new behavior.")
77        population_structures.update_archive(dissim_matrix, population_archive)
78
79    def do_nsga2_dissim(self, population):
80        dissim_matrix = self.dissimilarity(population)
81        dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
82        for i, d in zip(population, dissim_list):
83            i.fitness = DeapFitness(tuple((d, i.rawfitness)))
84
85    def do_nslc_dissim(self, population):
86        dissim_matrix = self.dissimilarity(population)
87        normalized_matrix = self.normalize_dissim(dissim_matrix)
88        for i in range(len(normalized_matrix)):
89            temp_dissim = normalized_matrix[i]
90            index_array = np.argpartition(temp_dissim, kth=self.knn_nslc, axis=-1)[:self.knn_nslc]
91            dissim_value = np.mean(np.take_along_axis(
92                temp_dissim, index_array, axis=-1))
93            temp_fitness = population[i].rawfitness
94            population_of_most_similar = list(
95                map(population.__getitem__, self.transform_indexes(i, index_array)))
96            temp_ind_fit = sum(
97                [1 for ind in population_of_most_similar if ind.rawfitness < temp_fitness])
98            population[i].fitness = DeapFitness(
99                tuple((dissim_value, temp_ind_fit)))
100
101    def make_new_population_nsga2(self, population, prob_mut, prob_xov):
102        expected_mut = int(self.popsize * prob_mut)
103        expected_xov = int(self.popsize * prob_xov)
104        assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov)
105        assignCrowdingDist(population)
106        offspring = tools.selTournamentDCD(population, self.popsize)
107
108        def addGenotypeIfValid(ind_list, genotype):
109            new_individual = Individual()
110            new_individual.set_and_evaluate(genotype, self.evaluate)
111            if new_individual.fitness is not BAD_FITNESS:
112                ind_list.append(new_individual)
113
114        counter = 0
115
116        def get_individual(pop, c):
117            if c < len(pop):
118                ind = pop[c]
119                c += 1
120                return ind, c
121            else:
122                c = 0
123                ind = pop[c]
124                c += 1
125                return ind, c
126
127        newpop = []
128        while len(newpop) < expected_mut:
129            ind, counter = get_individual(offspring, counter)
130            addGenotypeIfValid(newpop, self.mutate(ind.genotype))
131
132        # adding valid crossovers of selected individuals...
133        while len(newpop) < expected_mut + expected_xov:
134            ind1, counter = get_individual(offspring, counter)
135            ind2, counter = get_individual(offspring, counter)
136            addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype))
137
138        # select clones to fill up the new population until we reach the same size as the input population
139        while len(newpop) < len(population):
140            ind, counter = get_individual(offspring, counter)
141            newpop.append(Individual().copyFrom(ind))
142
143        pop_offspring = population+newpop
144        print(len(pop_offspring))
145        if self.fit == "nslc":
146            self.do_nslc_dissim(pop_offspring)
147        elif self.fit == "nsga2":
148            self.do_nsga2_dissim(pop_offspring)
149        out_pop = tools.selNSGA2(pop_offspring, len(population))
150        return out_pop
151
152    def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size):
153        file_name = self.get_state_filename(hof_savefile)
154        state = self.load_state(file_name)
155        if state is not None:  # loaded state from file
156            # saved generation has been completed, start with the next one
157            self.current_generation += 1
158            print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof),
159                                                                                                                                                 len(self.stats),  (len(self.population_structures.archive)), self.current_generation, generations))  # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based
160        else:
161            self.initialize_evolution(self.genformat, initialgenotype)
162
163        time0 = time.process_time()
164        for g in range(self.current_generation, generations):
165            if self.fit != "raw" and self.fit != "nsga2" and self.fit != "nslc":
166                self.do_niching(self.population_structures)
167
168            if type(self.population_structures.population[0].fitness) == DeapFitness:
169                self.population_structures.population = self.make_new_population_nsga2(  # used both for nsga2 and nslc
170                    self.population_structures.population, pmut, pxov)
171            else:
172                self.population_structures.population = self.make_new_population(
173                    self.population_structures.population, pmut, pxov, tournament_size)
174
175            self.update_stats(g, self.population_structures.population)
176
177            if hof_savefile is not None:
178                self.current_generation = g
179                self.time_elapsed += time.process_time() - time0
180                self.save_state(file_name)
181        if hof_savefile is not None:
182            self.save_genotypes(hof_savefile)
183        return self.population_structures.population, self.stats
184
185    @staticmethod
186    def get_args_for_parser():
187        parser = ExperimentABC.get_args_for_parser()
188        parser.add_argument("-dissim",type= int, default=1,
189                   help="Dissimilarity measure type. Available: -3:freq, -2:dens, -1:Leven, 1:frams-struct (default}, 2:frams-descr")
190        parser.add_argument("-fit",type= str, default="raw",
191                        help="Fitness type, availible types: niching, novelty, knn_niching (local), knn_novelty (local), nsga2, nslc and raw (default)")
192        parser.add_argument("-archive",type= int, default=50,
193                            help="Maximum archive size")
194        parser.add_argument("-normalize",type= str, default= "max",
195                            help="What normalization to use for the dissimilarity matrix: max (default}, sum, or none")
196        parser.add_argument("-knn_niching",type= int, default=5,
197                        help="The number of nearest neighbors for local novelty/niching. If knn==0, global is performed. Default: 5")
198        parser.add_argument("-knn_nslc",type= int, default=5,
199                        help="The number of nearest neighbors for NSLC. If knn==0, global is performed. Default: 5")
200        return parser
201       
202    @abstractmethod
203    def dissimilarity(self, population: list):
204        pass
Note: See TracBrowser for help on using the repository browser.