1 | from subprocess import Popen, PIPE, check_output
2 | from enum import Enum
3 | from typing import List
4 | import json
5 | import sys, os
6 | import argparse
7 | import numpy as np
8 |
9 |
10 | class FramsticksCLI:
11 | """Runs Framsticks CLI (command-line) executable and communicates with it using standard input and output.
12 | You can perform basic operations like mutation, crossover, and evaluation of genotypes.
13 | This way you can perform evolution controlled by python, access and manipulate genotypes.
14 |
15 | You need to provide one or two parameters when you run this class: the path to Framsticks CLI
16 | and the name of the Framsticks CLI executable (if it is non-standard). See::
17 | FramsticksCLI.py -h"""
18 |
19 | PRINT_FRAMSTICKS_OUTPUT: bool = False # set to True for debugging
20 |
22 | OUTPUT_DIR = "scripts_output"
23 | STDOUT_ENDOPER_MARKER = "FileObject.write" # we look for this message on Framsticks CLI stdout to detect when Framsticks created a file with the result we expect
24 |
25 | FILE_PREFIX = 'framspy_'
26 |
27 | RANDOMIZE_CMD = "rnd" + "\n"
28 | SETEXPEDEF_CMD = "expdef standard-eval" + "\n"
29 | GETSIMPLEST_CMD = "getsimplest"
30 | GETSIMPLEST_FILE = FILE_PREFIX + "simplest.gen"
31 | EVALUATE_CMD = "evaluate eval-allcriteria.sim "
32 | EVALUATE_FILE = "genos_eval.json"
33 | CROSSOVER_CMD = "crossover"
34 | CROSSOVER_FILE = FILE_PREFIX + "child.gen"
35 | DISSIMIL_CMD = "dissimil"
36 | DISSIMIL_FILE = FILE_PREFIX + "dissimilarity_matrix.gen"
37 | ISVALID_CMD = "isvalid"
38 | ISVALID_FILE = FILE_PREFIX + "validity.gen"
39 | MUTATE_CMD = "mutate"
40 | MUTATE_FILE = FILE_PREFIX + "mutant.gen"
41 |
42 | CLI_INPUT_FILE = FILE_PREFIX + "genotypes.gen"
43 |
44 |
45 | def __init__(self, framspath, framsexe):
46 | self.frams_path = framspath
47 | self.frams_exe = framsexe if framsexe is not None else 'frams.exe' if os.name == "nt" else 'frams.linux'
48 | self.writing_path = None
49 | mainpath = os.path.join(self.frams_path, self.frams_exe)
50 | exe_call = [mainpath, '-Q', '-s', '-c', '-icliutils.ini'] # -c will be ignored in Windows Framsticks (this option is meaningless because the Windows version does not support color console, so no need to deactivate this feature using -c)
51 | exe_call_to_get_version = [mainpath, '-V']
52 | exe_call_to_get_path = [mainpath, '-?']
53 | try:
54 | print("\n".join(self.__readAllOutput(" ".join(exe_call_to_get_version))))
55 | help = self.__readAllOutput(" ".join(exe_call_to_get_path))
56 | for helpline in help:
57 | if 'dDIRECTORY' in helpline:
58 | self.writing_path = helpline.split("'")[1]
59 | except FileNotFoundError:
60 | print("Could not find Framsticks executable ('%s') in the given location ('%s')." % (self.frams_exe, self.frams_path))
61 | sys.exit(1)
62 | print("Temporary files with results will be saved in detected writable working directory '%s'" % self.writing_path)
63 | self.__spawnFramsticksCLI(exe_call)
64 |
65 |
66 | def __readAllOutput(self, command):
67 | frams_process = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE)
68 | return [line.decode('utf-8').rstrip() for line in iter(frams_process.stdout.readlines())]
69 |
70 |
71 | def __spawnFramsticksCLI(self, args):
72 | # the child app (Framsticks CLI) should not buffer outputs and we need to immediately read its stdout, hence we use pexpect/wexpect
73 | print('Spawning Framsticks CLI for continuous stdin/stdout communication... ', end='')
74 | if os.name == "nt": # Windows:
75 | import wexpect # https://pypi.org/project/wexpect/
76 | # https://github.com/raczben/wexpect/tree/master/examples
77 | self.child = wexpect.spawn(' '.join(args))
78 | else:
79 | import pexpect # https://pexpect.readthedocs.io/en/stable/
80 | self.child = pexpect.spawn(' '.join(args))
81 | self.child.setecho(False) # linux only
82 | print('OK.')
83 |
84 | self.__readFromFramsCLIUntil("UserScripts.autoload")
85 | print('Performing a basic test 1/3... ', end='')
86 | assert self.getSimplest("1") == "X"
87 | print('OK.')
88 | print('Performing a basic test 2/3... ', end='')
89 | assert self.isValid("X[0:0]") == True
90 | print('OK.')
91 | print('Performing a basic test 3/3... ', end='')
92 | assert self.isValid("X[0:0],") == False
93 | print('OK.')
94 | self.child.sendline(self.RANDOMIZE_CMD)
95 | self.child.sendline(self.SETEXPEDEF_CMD)
96 |
97 |
98 | def closeFramsticksCLI(self):
99 | # End gracefully by sending end-of-file character: ^Z or ^D
100 | # Without -Q argument ("quiet mode"), Framsticks CLI would print "Shell closed." for goodbye.
101 | self.child.sendline(chr(26 if os.name == "nt" else 4))
102 |
103 |
104 | def __saveGenotypeToFile(self, genotype, name, mode):
105 | outpath = os.path.join(self.writing_path, name)
106 | outfile = open(outpath, mode)
107 | outfile.write("org:\n")
108 | outfile.write("genotype:~\n")
109 | outfile.write(genotype + "~\n\n") # TODO proper quoting of genotype...
110 | outfile.close()
111 | return name
112 |
113 |
114 | def __saveToFile(self, genotype, name, mode):
115 | outpath = os.path.join(self.writing_path, name)
116 | outfile = open(outpath, mode)
117 | outfile.write(genotype)
118 | outfile.close()
119 | return name
120 |
121 |
122 | def __removeFile(self, path):
123 | filepath = os.path.join(self.writing_path, path)
124 | if os.path.exists(filepath):
125 | os.remove(filepath)
126 |
127 |
128 | def __readFromFramsCLIUntil(self, until_marker: str):
129 | while True:
130 | self.child.expect('\n')
131 | msg = str(self.child.before)
132 | if self.PRINT_FRAMSTICKS_OUTPUT or msg.startswith("[ERROR]"):
133 | print(msg)
134 | if until_marker in msg:
135 | break
136 |
137 |
138 | def __runCommand(self, command, genotypes, result_file_name, saveformat) -> List[str]:
139 | filenames = [] # list of file names with input data for the command
140 | if saveformat == self.GENO_SAVE_FILE_FORMAT["RAWGENO"]:
141 | for i in range(len(genotypes)):
142 | filenames.append(self.__saveToFile(genotypes[i], "genotype" + str(i) + ".gen", "w")) # plain text format = must have a separate file for each genotype
143 | elif saveformat == self.GENO_SAVE_FILE_FORMAT["NATIVEFRAMS"]:
144 | self.__removeFile(self.CLI_INPUT_FILE) # ensure there is nothing left from the last run of the program because we "a"ppend to file in the loop below
145 | for i in range(len(genotypes)):
146 | outfilename = self.__saveGenotypeToFile(genotypes[i], self.CLI_INPUT_FILE, "a")
147 | filenames.append(outfilename) # since we use the same file in the loop above, add this file only once (i.e., outside of the loop)
148 |
149 | if result_file_name != self.EVALUATE_FILE: # all functions except for evaluate provide frams with the file name to write to
150 | self.child.sendline(command + " " + " ".join(filenames) + " " + result_file_name + "\n")
151 | else:
152 | self.child.sendline(command + " " + " ".join(filenames) + "\n")
153 | self.__readFromFramsCLIUntil(self.STDOUT_ENDOPER_MARKER)
154 | filenames.append(os.path.join(self.writing_path, self.OUTPUT_DIR, result_file_name))
155 | return filenames # last element is a path to the file containing results
156 |
157 |
158 | def __cleanUpCommandResults(self, filepaths):
159 | """Deletes files with results created by the command."""
160 | for i in filepaths:
161 | if i == filepaths[-1]:
162 | os.remove(i) # the result is written with its full path and we have used it before so the file surely exists
163 | else:
164 | self.__removeFile(i)
165 |
166 |
167 | def getSimplest(self, genetic_format) -> str:
168 | assert len(genetic_format) == 1, "Genetic format should be a single character"
169 | files = self.__runCommand(self.GETSIMPLEST_CMD + " " + genetic_format + " ", [], self.GETSIMPLEST_FILE, self.GENO_SAVE_FILE_FORMAT["RAWGENO"])
170 | with open(files[-1]) as f:
171 | genotype = "".join(f.readlines())
172 | self.__cleanUpCommandResults(files)
173 | return genotype
174 |
175 |
176 | def evaluate(self, genotype: str):
177 | """
178 | Returns:
179 | Dictionary -- genotype evaluated with self.EVALUATE_COMMAND. Note that for whatever reason (e.g. incorrect genotype),
180 | the dictionary you get may be empty or partially empty and may not have the fields you expected, so handle this case
181 | properly.
182 | """
183 | files = self.__runCommand(self.EVALUATE_CMD, [genotype], self.EVALUATE_FILE, self.GENO_SAVE_FILE_FORMAT["NATIVEFRAMS"])
184 | with open(files[-1]) as f:
185 | data = json.load(f)
186 | if len(data) > 0:
187 | self.__cleanUpCommandResults(files)
188 | return data
189 | else:
190 | print("Evaluating genotype: no performance data was returned in", self.EVALUATE_FILE) # we do not delete files here
191 | return None
192 |
193 |
194 | def mutate(self, genotype: str) -> str:
195 | files = self.__runCommand(self.MUTATE_CMD, [genotype], self.MUTATE_FILE, self.GENO_SAVE_FILE_FORMAT["RAWGENO"])
196 | with open(files[-1]) as f:
197 | newgenotype = "".join(f.readlines())
198 | self.__cleanUpCommandResults(files)
199 | return newgenotype
200 |
201 |
202 | def crossOver(self, genotype1: str, genotype2: str) -> str:
203 | files = self.__runCommand(self.CROSSOVER_CMD, [genotype1, genotype2], self.CROSSOVER_FILE, self.GENO_SAVE_FILE_FORMAT["RAWGENO"])
204 | with open(files[-1]) as f:
205 | child_genotype = "".join(f.readlines())
206 | self.__cleanUpCommandResults(files)
207 | return child_genotype
208 |
209 |
210 | def dissimilarity(self, genotype1: str, genotype2: str) -> float:
211 | files = self.__runCommand(self.DISSIMIL_CMD, [genotype1, genotype2], self.DISSIMIL_FILE, self.GENO_SAVE_FILE_FORMAT["NATIVEFRAMS"])
212 | with open(files[-1]) as f:
213 | dissimilarity_matrix = np.genfromtxt(f, dtype=np.float64, comments='#', encoding=None, delimiter='\t')
214 | # we would like to skip column #1 while reading and read everything else, but... https://stackoverflow.com/questions/36091686/exclude-columns-from-genfromtxt-with-numpy
215 | # too complicated, so strings in column #1 become NaN as floats (unless they
216 | # accidentally are numbers?) - not great, not terrible
217 | EXPECTED_SHAPE = (2, 4)
218 | assert dissimilarity_matrix.shape == EXPECTED_SHAPE, f"Not a correct dissimilarity matrix, expected {EXPECTED_SHAPE} "
219 | for i in range(len(dissimilarity_matrix)):
220 | assert dissimilarity_matrix[i][i + 2] == 0, "Not a correct dissimilarity matrix, diagonal expected to be 0"
221 | assert dissimilarity_matrix[0][3] == dissimilarity_matrix[1][2], "Probably not a correct dissimilarity matrix, expecting symmetry, verify this"
222 | self.__cleanUpCommandResults(files)
223 | return dissimilarity_matrix[0][3]
224 |
225 |
226 | def isValid(self, genotype: str) -> bool:
227 | files = self.__runCommand(self.ISVALID_CMD, [genotype], self.ISVALID_FILE, self.GENO_SAVE_FILE_FORMAT["RAWGENO"])
228 | with open(files[-1]) as f:
229 | valid = f.readline() == "1"
230 | self.__cleanUpCommandResults(files)
231 | return valid
232 |
233 |
234 | def parseArguments():
235 | parser = argparse.ArgumentParser(description='Run this program with "python -u %s" if you want to disable buffering of its output.' % sys.argv[0])
236 | parser.add_argument('-path', type=ensureDir, required=True, help='Path to Framsticks CLI without trailing slash.')
237 | parser.add_argument('-exe', required=False, help='Executable name. If not given, "frams.exe" or "frams.linux" is assumed.')
238 | parser.add_argument('-genformat', required=False, help='Genetic format for the demo run, for example 4, 9, or B. If not given, f1 is assumed.')
239 | return parser.parse_args()
240 |
241 |
242 | def ensureDir(string):
243 | if os.path.isdir(string):
244 | return string
245 | else:
246 | raise NotADirectoryError(string)
247 |
248 |
249 | if __name__ == "__main__":
250 | # A demo run.
251 | parsed_args = parseArguments()
252 | framsCLI = FramsticksCLI(parsed_args.path, parsed_args.exe)
253 |
254 | simplest = framsCLI.getSimplest('1' if parsed_args.genformat is None else parsed_args.genformat)
255 | print("\tSimplest genotype:", simplest)
256 | parent1 = framsCLI.mutate(simplest)
257 | parent2 = parent1
258 | MUTATE_COUNT = 10
259 | for x in range(MUTATE_COUNT): # example of a chain of 20 mutations
260 | parent2 = framsCLI.mutate(parent2)
261 | print("\tParent1 (mutated simplest):", parent1)
262 | print("\tParent2 (Parent1 mutated %d times):" % MUTATE_COUNT, parent2)
263 | offspring = framsCLI.crossOver(parent1, parent2)
264 | print("\tCrossover (Offspring):", offspring)
265 | print('\tDissimilarity of Parent1 and Offspring:', framsCLI.dissimilarity(offspring, parent1))
266 | print('\tPerformance of Offspring:', framsCLI.evaluate(offspring))
267 | print('\tValidity of Offspring:', framsCLI.isValid(offspring))
268 |
269 | framsCLI.closeFramsticksCLI()