source: java/main/src/main/java/com/framsticks/experiment/WorkPackageLogic.java @ 105

Last change on this file since 105 was 105, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • import refactoring: move Tree, Path, etc. from core to structure package

  • initial serialization implementation
  • improve PrimeExperiment? test
  • many organizational changes and convenience improvements

CHANGELOG:
Make registry in AbstractTree? final.

Move most classes from core to structure package.

Minor changes.

Switch names of Future and FutureHandler?.

Rename ExceptionResultHandler? to ExceptionHandler?.

Rename ExceptionHandler? to ExceptionDispatcherHandler?.

Fix bug in ParamCandidate? cache.

Add missing synchronization to the BufferedDispatcher?.

Develop @Serialized support.

Rework serialization further.

Add serialization/deserialization interface to ValueParam?.

Move getStorageType and isNumeric from Param down to params hierarchy.

Minor changes.

Improve param type induction.

Add TestSerializedClass? for testing new serialization.

Add info files for GenePool and Population.

Add standard.expt exemplary netfile.

Add type name field to PropertiesObject?.

Use PropertiesObject? for PropertiesAccess? instead of ordinary map.

Hide getFramsClass in several more places.

More unification across FramsClass, Access and Path.

Add ParamCollection?.

Simplify interface for getting params from FramsClass?, Access
or Path.

Make Access.call() interface variadic.

Add arguments(args) convenience wrapper around new Object[] {args}.

Upgrade to apache.commons.lang version 3.1

Minor improvement with Response constructors.

Develop proper result printing in ClientAtServer?.

Add experimentNetsave to PrimeExperiment?.

File size: 5.2 KB
Line 
1package com.framsticks.experiment;
2
3import java.util.IdentityHashMap;
4import java.util.LinkedList;
5import java.util.Map;
6
7import org.apache.logging.log4j.Logger;
8import org.apache.logging.log4j.LogManager;
9
10import com.framsticks.params.EventListener;
11import com.framsticks.params.MessageLogger;
12import com.framsticks.params.SimplePrimitive;
13import com.framsticks.params.annotations.FramsClassAnnotation;
14import com.framsticks.params.annotations.ParamAnnotation;
15import com.framsticks.structure.messages.Message;
16import com.framsticks.structure.messages.ValueChange;
17import com.framsticks.util.dispatching.FutureHandler;
18import com.framsticks.util.dispatching.Future;
19
20@FramsClassAnnotation(order = {"netLoadSaveLogic", "newestTaskScheduled", "newestResultReceived"})
21public abstract class WorkPackageLogic<WP extends WorkPackage<WP>> extends AbstractExperimentLogic {
22
23        private static final Logger log = LogManager.getLogger(WorkPackageLogic.class);
24
25        @ParamAnnotation
26        public final NetLoadSaveLogic<WP> netLoadSaveLogic;
27
28        protected final Class<WP> packageJavaClass;
29
30        protected final MessageLogger messages = new MessageLogger(WorkPackageLogic.class);
31
32        protected final Map<Simulator, WP> sentPackages = new IdentityHashMap<>();
33        protected final LinkedList<WP> queuedPackages = new LinkedList<>();
34
35        protected final SimplePrimitive<String> newestTaskSent = new SimplePrimitive<>();
36        protected final SimplePrimitive<String> newestResultReceived = new SimplePrimitive<>();
37
38
39        /**
40         * @param experiment
41         */
42        public WorkPackageLogic(Experiment experiment, Class<WP> packageJavaClass) {
43                super(experiment);
44                this.packageJavaClass = packageJavaClass;
45
46                netLoadSaveLogic = new NetLoadSaveLogic<WP>(experiment, packageJavaClass) {
47
48                        @Override
49                        public void netload(final Simulator simulator, final FutureHandler<WP> netFuture) {
50                                assert experiment.isActive();
51                                log.debug("providing netload file for {}", simulator);
52
53                                findNextPackage(new Future<WP> (netFuture) {
54
55                                        @Override
56                                        protected void result(WP net) {
57                                                assert experiment.isActive();
58
59                                                if (net == null) {
60                                                        log.debug("no more packages left");
61                                                        return;
62                                                }
63                                                WorkPackageLogic.this.messages.info("netload", net.sumUpTask());
64                                                log.debug("sending package: {}", net);
65                                                newestTaskSent.set(net.sumUpTask());
66                                                sentPackages.put(simulator, net);
67
68                                                netFuture.pass(net);
69                                        }
70                                });
71
72                        }
73
74                        @Override
75                        public void netsave(Simulator simulator, WP netResult) {
76                                assert experiment.isActive();
77
78                                log.debug("received package from {}: {}", simulator, netResult);
79                                WorkPackageLogic.this.messages.info("netsave", netResult.toString());
80
81                                WP netSent = sentPackages.get(simulator);
82                                if (netSent == null) {
83                                        log.error("no task found in {} for received result {}", simulator, netResult);
84                                        return;
85                                }
86                                sentPackages.remove(simulator);
87
88                                try {
89                                        WP netRemainder = netSent.getRemainder(netResult);
90                                        if (netRemainder != null) {
91                                                log.warn("queueing remainder: {}", netRemainder);
92                                                queuedPackages.add(netRemainder);
93                                        }
94                                } catch (InvalidWorkPackage e) {
95                                        log.error("in simulator {}, result {} is in no relation to task {} ({}), rescheduling", simulator, netResult, netSent, e.getShortMessage(new StringBuilder()));
96                                        queuedPackages.add(netSent);
97                                        return;
98                                }
99
100                                returnPackage(netResult);
101                                // processFile();
102                        }
103                };
104        }
105
106
107
108        protected void findNextPackage(final FutureHandler<WP> future) {
109                if (!queuedPackages.isEmpty()) {
110                        WP workPackage = queuedPackages.pollFirst();
111                        future.pass(workPackage);
112                        return;
113                }
114
115                generateNextPackage(new Future<WP>(experiment) {
116
117                        @Override
118                        protected void result(WP result) {
119                                if (result == null) {
120                                        log.info("no more packages left");
121                                        future.pass(null);
122                                        return;
123                                }
124                                future.pass(result);
125                        }
126                });
127        }
128
129        protected abstract void generateNextPackage(FutureHandler<WP> future);
130        protected abstract void returnPackage(WP workPackage);
131
132        /**
133         * @return the newestTaskScheduled
134         */
135        @ParamAnnotation(id = "newest_task_sent")
136        public String getNewestTaskSent() {
137                return newestTaskSent.get();
138        }
139
140        /**
141         * @return the newestResultReceived
142         */
143        @ParamAnnotation(id = "newest_result_received")
144        public String getNewestResultReceived() {
145                return newestResultReceived.get();
146        }
147
148        @ParamAnnotation(id = "newest_task_sent_changed")
149        public void addNewestTaskSentListener(EventListener<ValueChange> listener) {
150                newestTaskSent.addListener(listener);
151        }
152
153        @ParamAnnotation(id = "newest_task_sent_changed")
154        public void removeNewestTaskSentListener(EventListener<ValueChange> listener) {
155                newestTaskSent.removeListener(listener);
156        }
157
158        @ParamAnnotation(id = "newest_result_received_changed")
159        public void addNewestResultReceivedListener(EventListener<ValueChange> listener) {
160                newestResultReceived.addListener(listener);
161        }
162
163        @ParamAnnotation(id = "newest_result_received_changed")
164        public void removeNewestResultReceivedListener(EventListener<ValueChange> listener) {
165                newestResultReceived.removeListener(listener);
166        }
167
168        @ParamAnnotation(id = "messages")
169        public void addMessageListener(EventListener<Message> listener) {
170                messages.add(listener);
171        }
172
173        @ParamAnnotation(id = "messages")
174        public void removeMessageListener(EventListener<Message> listener) {
175                messages.remove(listener);
176        }
177
178
179
180}
Note: See TracBrowser for help on using the repository browser.