Changeset 103 for java/main/src/main/java/com/framsticks/experiment
- Timestamp:
- 07/18/13 23:52:25 (11 years ago)
- Location:
- java/main/src/main/java/com/framsticks/experiment
- Files:
-
- 2 added
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/experiment/Experiment.java
r102 r103 45 45 protected String expdef; 46 46 47 47 48 /** 48 49 * … … 53 54 54 55 Dispatching.dispatchLog(this, log, Level.DEBUG, "first task"); 55 56 57 simulators.addListener(new EventListener<ListChange>() {58 @Override59 public void action(ListChange argument) {60 if (argument.getAction() == ListChange.Action.Add) {61 simulators.fireChildrenChange(argument, ListChange.Action.Modify, "ready");62 }63 }64 });65 66 56 } 67 57 … … 114 104 this.expdef = expdef; 115 105 } 106 116 107 117 108 @ParamAnnotation(id = "simulators_changed") … … 129 120 simulators.add(simulator); 130 121 simulatorAsJoinables.add(simulator); 131 132 133 134 122 simulators.fireChildrenChange(simulator, ListChange.Action.Modify, "ready"); 135 123 136 124 } -
java/main/src/main/java/com/framsticks/experiment/NetLoadSaveLogic.java
r102 r103 5 5 import org.apache.logging.log4j.LogManager; 6 6 7 import com.framsticks.communication.File;8 7 import com.framsticks.core.ListChange; 8 import com.framsticks.core.Message; 9 9 import com.framsticks.params.EventListener; 10 import com.framsticks.params.MessageLogger; 10 11 import com.framsticks.params.annotations.FramsClassAnnotation; 11 12 import com.framsticks.params.annotations.ParamAnnotation; … … 14 15 15 16 @FramsClassAnnotation 16 public abstract class NetLoadSaveLogic extends AbstractExperimentLogic {17 public abstract class NetLoadSaveLogic<NF extends NetFile> extends AbstractExperimentLogic { 17 18 private static final Logger log = LogManager.getLogger(NetLoadSaveLogic.class); 18 19 20 protected String option = "an option"; 19 21 20 protected String option = "an option"; 22 protected final Class<NF> netJavaClass; 23 24 protected final MessageLogger messages = new MessageLogger(NetLoadSaveLogic.class); 21 25 22 26 /** 23 27 * @param experiment 24 28 */ 25 public NetLoadSaveLogic(Experiment parentExperiment ) {29 public NetLoadSaveLogic(Experiment parentExperiment, Class<NF> netJavaClassArg) { 26 30 super(parentExperiment); 31 this.netJavaClass = netJavaClassArg; 27 32 28 33 experiment.addSimulatorsListener(new EventListener<ListChange>() { 29 34 30 35 @Override 31 public void action( ListChange argument) {36 public void action(final ListChange change) { 32 37 assert experiment.isActive(); 38 final Simulator simulator = experiment.getSimulators().get(change.getIdentifier()); 39 log.debug("processing list change: {}", change); 33 40 34 if (argument.hasHint("ready")) { 35 final Simulator simulator = experiment.getSimulators().get(argument.getIdentifier()); 36 log.debug("simulator is ready: {}", simulator); 41 if (change.getAction() == ListChange.Action.Add) { 42 log.debug("registering in {}", simulator); 43 simulator.getRemoteTree().getRegistry().registerAndBuild(netJavaClass); 44 } 37 45 38 netload(simulator, new FutureHandler<File>(simulator) { 46 if (!change.hasHint("stoped")) { 47 issueNetloadIfReady(change, simulator); 48 return; 49 } 39 50 40 @Override 41 protected void result(final File file) { 42 simulator.uploadNet(file, new FutureHandler<Object>(this) { 51 log.debug("issuing netsave to: {}", simulator); 52 simulator.netsave(netJavaClass, new FutureHandler<NF>(simulator) { 43 53 44 @Override 45 protected void result(Object result) { 46 log.debug("netload of {} done", file); 47 } 48 }); 49 } 50 }); 51 } 54 @Override 55 protected void result(NF net) { 56 log.debug("netsave of {} done: {}", simulator, net.getShortDescription()); 57 netsave(simulator, net); 58 issueNetloadIfReady(change, simulator); 59 } 60 }); 52 61 } 53 62 }); 54 63 } 55 64 56 public abstract void netload(Simulator simulator, Future<File> net); 65 protected void issueNetloadIfReady(ListChange change, final Simulator simulator) { 66 if (!change.hasHint("ready")) { 67 return; 68 } 69 log.debug("issuing netload to: {}", simulator); 70 netload(simulator, new FutureHandler<NF>(simulator) { 57 71 58 public abstract void netsave(Simulator simulator, File net); 72 @Override 73 protected void result(final NF net) { 74 if (net == null) { 75 log.debug("no file for upload provided - leaving simulator idle"); 76 return; 77 } 78 79 simulator.netload(net, new FutureHandler<Object>(this) { 80 81 @Override 82 protected void result(Object result) { 83 NetLoadSaveLogic.this.messages.info("netload", "done " + net.getShortDescription()); 84 log.debug("netload of {} done", net.getShortDescription()); 85 simulator.start(); 86 } 87 }); 88 } 89 }); 90 } 91 92 public abstract void netload(Simulator simulator, Future<NF> net); 93 94 public abstract void netsave(Simulator simulator, NF net); 59 95 60 96 /** … … 74 110 } 75 111 112 /** 113 * @return the netJavaClass 114 */ 115 @ParamAnnotation(name = "Java class representing netfile") 116 public String getNetJavaClassName() { 117 return netJavaClass.getCanonicalName(); 118 } 119 120 @ParamAnnotation(id = "messages") 121 public void addMessageListener(EventListener<Message> listener) { 122 messages.add(listener); 123 } 124 125 @ParamAnnotation(id = "messages") 126 public void removeMessageListener(EventListener<Message> listener) { 127 messages.remove(listener); 128 } 129 76 130 } -
java/main/src/main/java/com/framsticks/experiment/Simulator.java
r102 r103 4 4 import com.framsticks.communication.queries.NeedFile; 5 5 import com.framsticks.communication.queries.NeedFileAcceptor; 6 import com.framsticks.core.ListChange; 6 7 import com.framsticks.core.Path; 7 8 import com.framsticks.core.Tree; 8 9 import com.framsticks.core.ValueChange; 10 import com.framsticks.params.AccessOperations; 11 import com.framsticks.params.CastFailure; 9 12 import com.framsticks.params.EventListener; 10 13 import com.framsticks.params.FramsClass; … … 12 15 import com.framsticks.params.annotations.FramsClassAnnotation; 13 16 import com.framsticks.params.annotations.ParamAnnotation; 17 import com.framsticks.params.types.BooleanParam; 14 18 import com.framsticks.params.types.EventParam; 15 19 import com.framsticks.params.types.ProcedureParam; … … 59 63 assert experiment.isActive(); 60 64 61 log. debug("simulator ready {}", this);65 log.info("simulator ready {}", this); 62 66 63 67 runningListener = new EventListener<ValueChange>() { 64 68 @Override 65 69 public void action(ValueChange argument) { 66 log.debug("running state of {} changed: {}", this, argument); 70 try { 71 boolean running = simulatorClass.getParamEntry("running", BooleanParam.class).reassign(argument.value, null).getValue(); 72 log.debug("running state of {} changed: {}", Simulator.this, running); 73 if (!running) { 74 Simulator.this.experiment.simulators.fireChildrenChange(Simulator.this, ListChange.Action.Modify, "ready", "stoped"); 75 } 76 } catch (CastFailure e) { 77 log.error("failure: ", e); 78 } 67 79 } 68 80 }; … … 147 159 @ParamAnnotation(paramType = ProcedureParam.class) 148 160 public void start() { 161 log.debug("starting simulator {}", this); 162 call(simulatorPath, "start", new Object[] {}, Object.class, FutureHandler.doNothing(Object.class, this)); 149 163 } 150 164 151 165 @ParamAnnotation(paramType = ProcedureParam.class) 152 166 public void stop() { 167 log.debug("stoping simulator {}", this); 153 168 } 154 169 … … 156 171 public void abort() { 157 172 assert isActive(); 158 log. debug("explicitly aborting {}", this);173 log.info("explicitly aborting {}", this); 159 174 experiment.removeSimulator(this); 160 175 interruptJoinable(); … … 184 199 protected final AtomicInteger netloadIdCounter = new AtomicInteger(); 185 200 186 public void uploadNet(final File file, final Future<Object> future) {201 public <N> void netload(final N net, final Future<Object> future) { 187 202 final String netloadId = "NetLoadSaveLogic" + netloadIdCounter.getAndIncrement(); 188 203 204 final File file = AccessOperations.convert(File.class, net, getRemoteTree().getRegistry()); 189 205 log.debug("uploading file {} to {} identified by {}", file, simulatorPath, netloadId); 190 206 … … 215 231 simulatorPath.getTree().addNeedFileAcceptor(Integer.MIN_VALUE, acceptor.get()); 216 232 217 call(simulatorPath, getFramsClass(simulatorPath).getParamEntry("netload_id", ProcedureParam.class), new Object[] { netloadId }, new FutureHandler<Object>(future) {233 call(simulatorPath, getFramsClass(simulatorPath).getParamEntry("netload_id", ProcedureParam.class), new Object[] { netloadId }, Object.class, new FutureHandler<Object>(future) { 218 234 219 235 @Override … … 225 241 226 242 } 243 244 public <N> void netsave(Class<N> netJavaClass, final Future<N> futureNet) { 245 call(simulatorPath, getFramsClass(simulatorPath).getParamEntry("netsave", ProcedureParam.class), new Object[] { }, netJavaClass, new FutureHandler<N>(futureNet) { 246 247 @Override 248 protected void result(N net) { 249 log.debug("download of {} done", net); 250 futureNet.pass(net); 251 } 252 }); 253 254 } 227 255 } -
java/main/src/main/java/com/framsticks/experiment/WorkPackage.java
r102 r103 1 1 package com.framsticks.experiment; 2 2 3 public interface WorkPackage { 3 public interface WorkPackage<S extends WorkPackage<S>> extends NetFile { 4 5 public String sumUpTask(); 6 public String sumUpResult(); 7 8 S getRemainder(S result); 4 9 5 10 } -
java/main/src/main/java/com/framsticks/experiment/WorkPackageLogic.java
r102 r103 1 1 package com.framsticks.experiment; 2 3 import java.util.IdentityHashMap; 4 import java.util.LinkedList; 5 import java.util.Map; 2 6 3 7 import org.apache.logging.log4j.Logger; 4 8 import org.apache.logging.log4j.LogManager; 5 9 6 import com.framsticks.communication.File; 7 import com.framsticks.params.ListSource; 10 import com.framsticks.core.Message; 11 import com.framsticks.core.ValueChange; 12 import com.framsticks.params.EventListener; 13 import com.framsticks.params.MessageLogger; 14 import com.framsticks.params.SimplePrimitive; 8 15 import com.framsticks.params.annotations.FramsClassAnnotation; 9 16 import com.framsticks.params.annotations.ParamAnnotation; … … 11 18 import com.framsticks.util.dispatching.FutureHandler; 12 19 13 @FramsClassAnnotation 14 public abstract class WorkPackageLogic<WP extends WorkPackage > extends AbstractExperimentLogic {20 @FramsClassAnnotation(order = {"netLoadSaveLogic", "newestTaskScheduled", "newestResultReceived"}) 21 public abstract class WorkPackageLogic<WP extends WorkPackage<WP>> extends AbstractExperimentLogic { 15 22 16 23 private static final Logger log = LogManager.getLogger(WorkPackageLogic.class); 17 24 18 25 @ParamAnnotation 19 public final NetLoadSaveLogic netLoadSaveLogic; 26 public final NetLoadSaveLogic<WP> netLoadSaveLogic; 27 28 protected final Class<WP> packageJavaClass; 29 30 protected final MessageLogger messages = new MessageLogger(WorkPackageLogic.class); 31 32 protected final Map<Simulator, WP> sentPackages = new IdentityHashMap<>(); 33 protected final LinkedList<WP> queuedPackages = new LinkedList<>(); 34 35 protected final SimplePrimitive<String> newestTaskSent = new SimplePrimitive<>(); 36 protected final SimplePrimitive<String> newestResultReceived = new SimplePrimitive<>(); 37 20 38 21 39 /** 22 40 * @param experiment 23 41 */ 24 public WorkPackageLogic(Experiment experiment ) {42 public WorkPackageLogic(Experiment experiment, Class<WP> packageJavaClass) { 25 43 super(experiment); 44 this.packageJavaClass = packageJavaClass; 26 45 27 netLoadSaveLogic = new NetLoadSaveLogic (experiment) {46 netLoadSaveLogic = new NetLoadSaveLogic<WP>(experiment, packageJavaClass) { 28 47 29 48 @Override 30 public void netload( Simulator simulator, final Future<File> net) {49 public void netload(final Simulator simulator, final Future<WP> netFuture) { 31 50 assert experiment.isActive(); 32 51 log.debug("providing netload file for {}", simulator); 33 52 34 generateNextPackage(new FutureHandler<WP> (net) {53 findNextPackage(new FutureHandler<WP> (netFuture) { 35 54 36 55 @Override 37 protected void result(WP result) { 38 log.debug("sending package: {}", result); 39 File file = new File("test-file", ListSource.createFrom("# a test")); 56 protected void result(WP net) { 57 assert experiment.isActive(); 40 58 41 net.pass(file); 59 if (net == null) { 60 log.debug("no more packages left"); 61 return; 62 } 63 WorkPackageLogic.this.messages.info("netload", net.sumUpTask()); 64 log.debug("sending package: {}", net); 65 newestTaskSent.set(net.sumUpTask()); 66 sentPackages.put(simulator, net); 67 68 netFuture.pass(net); 42 69 } 43 70 }); … … 46 73 47 74 @Override 48 public void netsave(Simulator simulator, File net) { 75 public void netsave(Simulator simulator, WP netResult) { 76 assert experiment.isActive(); 77 78 log.debug("received package from {}: {}", simulator, netResult); 79 WorkPackageLogic.this.messages.info("netsave", netResult.toString()); 80 81 WP netSent = sentPackages.get(simulator); 82 if (netSent == null) { 83 log.error("no task found in {} for received result {}", simulator, netResult); 84 return; 85 } 86 sentPackages.remove(simulator); 87 88 try { 89 WP netRemainder = netSent.getRemainder(netResult); 90 if (netRemainder != null) { 91 log.warn("queueing remainder: {}", netRemainder); 92 queuedPackages.add(netRemainder); 93 } 94 } catch (InvalidWorkPackage e) { 95 log.error("in simulator {}, result {} is in no relation to task {} ({}), rescheduling", simulator, netResult, netSent, e.getShortMessage(new StringBuilder())); 96 queuedPackages.add(netSent); 97 return; 98 } 99 100 returnPackage(netResult); 49 101 // processFile(); 50 102 } … … 53 105 54 106 107 108 protected void findNextPackage(final Future<WP> future) { 109 if (!queuedPackages.isEmpty()) { 110 WP workPackage = queuedPackages.pollFirst(); 111 future.pass(workPackage); 112 return; 113 } 114 115 generateNextPackage(new FutureHandler<WP>(experiment) { 116 117 @Override 118 protected void result(WP result) { 119 if (result == null) { 120 log.info("no more packages left"); 121 future.pass(null); 122 return; 123 } 124 future.pass(result); 125 } 126 }); 127 } 128 55 129 protected abstract void generateNextPackage(Future<WP> future); 130 protected abstract void returnPackage(WP workPackage); 131 132 /** 133 * @return the newestTaskScheduled 134 */ 135 @ParamAnnotation(id = "newest_task_sent") 136 public String getNewestTaskSent() { 137 return newestTaskSent.get(); 138 } 139 140 /** 141 * @return the newestResultReceived 142 */ 143 @ParamAnnotation(id = "newest_result_received") 144 public String getNewestResultReceived() { 145 return newestResultReceived.get(); 146 } 147 148 @ParamAnnotation(id = "newest_task_sent_changed") 149 public void addNewestTaskSentListener(EventListener<ValueChange> listener) { 150 newestTaskSent.addListener(listener); 151 } 152 153 @ParamAnnotation(id = "newest_task_sent_changed") 154 public void removeNewestTaskSentListener(EventListener<ValueChange> listener) { 155 newestTaskSent.removeListener(listener); 156 } 157 158 @ParamAnnotation(id = "newest_result_received_changed") 159 public void addNewestResultReceivedListener(EventListener<ValueChange> listener) { 160 newestResultReceived.addListener(listener); 161 } 162 163 @ParamAnnotation(id = "newest_result_received_changed") 164 public void removeNewestResultReceivedListener(EventListener<ValueChange> listener) { 165 newestResultReceived.removeListener(listener); 166 } 167 168 @ParamAnnotation(id = "messages") 169 public void addMessageListener(EventListener<Message> listener) { 170 messages.add(listener); 171 } 172 173 @ParamAnnotation(id = "messages") 174 public void removeMessageListener(EventListener<Message> listener) { 175 messages.remove(listener); 176 } 177 178 56 179 57 180 }
Note: See TracChangeset
for help on using the changeset viewer.