- Timestamp:
- 07/18/13 23:52:25 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/experiment/WorkPackageLogic.java
r102 r103 1 1 package com.framsticks.experiment; 2 3 import java.util.IdentityHashMap; 4 import java.util.LinkedList; 5 import java.util.Map; 2 6 3 7 import org.apache.logging.log4j.Logger; 4 8 import org.apache.logging.log4j.LogManager; 5 9 6 import com.framsticks.communication.File; 7 import com.framsticks.params.ListSource; 10 import com.framsticks.core.Message; 11 import com.framsticks.core.ValueChange; 12 import com.framsticks.params.EventListener; 13 import com.framsticks.params.MessageLogger; 14 import com.framsticks.params.SimplePrimitive; 8 15 import com.framsticks.params.annotations.FramsClassAnnotation; 9 16 import com.framsticks.params.annotations.ParamAnnotation; … … 11 18 import com.framsticks.util.dispatching.FutureHandler; 12 19 13 @FramsClassAnnotation 14 public abstract class WorkPackageLogic<WP extends WorkPackage > extends AbstractExperimentLogic {20 @FramsClassAnnotation(order = {"netLoadSaveLogic", "newestTaskScheduled", "newestResultReceived"}) 21 public abstract class WorkPackageLogic<WP extends WorkPackage<WP>> extends AbstractExperimentLogic { 15 22 16 23 private static final Logger log = LogManager.getLogger(WorkPackageLogic.class); 17 24 18 25 @ParamAnnotation 19 public final NetLoadSaveLogic netLoadSaveLogic; 26 public final NetLoadSaveLogic<WP> netLoadSaveLogic; 27 28 protected final Class<WP> packageJavaClass; 29 30 protected final MessageLogger messages = new MessageLogger(WorkPackageLogic.class); 31 32 protected final Map<Simulator, WP> sentPackages = new IdentityHashMap<>(); 33 protected final LinkedList<WP> queuedPackages = new LinkedList<>(); 34 35 protected final SimplePrimitive<String> newestTaskSent = new SimplePrimitive<>(); 36 protected final SimplePrimitive<String> newestResultReceived = new SimplePrimitive<>(); 37 20 38 21 39 /** 22 40 * @param experiment 23 41 */ 24 public WorkPackageLogic(Experiment experiment ) {42 public WorkPackageLogic(Experiment experiment, Class<WP> packageJavaClass) { 25 43 super(experiment); 44 this.packageJavaClass = packageJavaClass; 26 45 27 netLoadSaveLogic = new NetLoadSaveLogic (experiment) {46 netLoadSaveLogic = new NetLoadSaveLogic<WP>(experiment, packageJavaClass) { 28 47 29 48 @Override 30 public void netload( Simulator simulator, final Future<File> net) {49 public void netload(final Simulator simulator, final Future<WP> netFuture) { 31 50 assert experiment.isActive(); 32 51 log.debug("providing netload file for {}", simulator); 33 52 34 generateNextPackage(new FutureHandler<WP> (net) {53 findNextPackage(new FutureHandler<WP> (netFuture) { 35 54 36 55 @Override 37 protected void result(WP result) { 38 log.debug("sending package: {}", result); 39 File file = new File("test-file", ListSource.createFrom("# a test")); 56 protected void result(WP net) { 57 assert experiment.isActive(); 40 58 41 net.pass(file); 59 if (net == null) { 60 log.debug("no more packages left"); 61 return; 62 } 63 WorkPackageLogic.this.messages.info("netload", net.sumUpTask()); 64 log.debug("sending package: {}", net); 65 newestTaskSent.set(net.sumUpTask()); 66 sentPackages.put(simulator, net); 67 68 netFuture.pass(net); 42 69 } 43 70 }); … … 46 73 47 74 @Override 48 public void netsave(Simulator simulator, File net) { 75 public void netsave(Simulator simulator, WP netResult) { 76 assert experiment.isActive(); 77 78 log.debug("received package from {}: {}", simulator, netResult); 79 WorkPackageLogic.this.messages.info("netsave", netResult.toString()); 80 81 WP netSent = sentPackages.get(simulator); 82 if (netSent == null) { 83 log.error("no task found in {} for received result {}", simulator, netResult); 84 return; 85 } 86 sentPackages.remove(simulator); 87 88 try { 89 WP netRemainder = netSent.getRemainder(netResult); 90 if (netRemainder != null) { 91 log.warn("queueing remainder: {}", netRemainder); 92 queuedPackages.add(netRemainder); 93 } 94 } catch (InvalidWorkPackage e) { 95 log.error("in simulator {}, result {} is in no relation to task {} ({}), rescheduling", simulator, netResult, netSent, e.getShortMessage(new StringBuilder())); 96 queuedPackages.add(netSent); 97 return; 98 } 99 100 returnPackage(netResult); 49 101 // processFile(); 50 102 } … … 53 105 54 106 107 108 protected void findNextPackage(final Future<WP> future) { 109 if (!queuedPackages.isEmpty()) { 110 WP workPackage = queuedPackages.pollFirst(); 111 future.pass(workPackage); 112 return; 113 } 114 115 generateNextPackage(new FutureHandler<WP>(experiment) { 116 117 @Override 118 protected void result(WP result) { 119 if (result == null) { 120 log.info("no more packages left"); 121 future.pass(null); 122 return; 123 } 124 future.pass(result); 125 } 126 }); 127 } 128 55 129 protected abstract void generateNextPackage(Future<WP> future); 130 protected abstract void returnPackage(WP workPackage); 131 132 /** 133 * @return the newestTaskScheduled 134 */ 135 @ParamAnnotation(id = "newest_task_sent") 136 public String getNewestTaskSent() { 137 return newestTaskSent.get(); 138 } 139 140 /** 141 * @return the newestResultReceived 142 */ 143 @ParamAnnotation(id = "newest_result_received") 144 public String getNewestResultReceived() { 145 return newestResultReceived.get(); 146 } 147 148 @ParamAnnotation(id = "newest_task_sent_changed") 149 public void addNewestTaskSentListener(EventListener<ValueChange> listener) { 150 newestTaskSent.addListener(listener); 151 } 152 153 @ParamAnnotation(id = "newest_task_sent_changed") 154 public void removeNewestTaskSentListener(EventListener<ValueChange> listener) { 155 newestTaskSent.removeListener(listener); 156 } 157 158 @ParamAnnotation(id = "newest_result_received_changed") 159 public void addNewestResultReceivedListener(EventListener<ValueChange> listener) { 160 newestResultReceived.addListener(listener); 161 } 162 163 @ParamAnnotation(id = "newest_result_received_changed") 164 public void removeNewestResultReceivedListener(EventListener<ValueChange> listener) { 165 newestResultReceived.removeListener(listener); 166 } 167 168 @ParamAnnotation(id = "messages") 169 public void addMessageListener(EventListener<Message> listener) { 170 messages.add(listener); 171 } 172 173 @ParamAnnotation(id = "messages") 174 public void removeMessageListener(EventListener<Message> listener) { 175 messages.remove(listener); 176 } 177 178 56 179 57 180 }
Note: See TracChangeset
for help on using the changeset viewer.