source: java/main/src/main/java/com/framsticks/experiment/WorkPackageLogic.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 5.2 KB
RevLine 
[102]1package com.framsticks.experiment;
2
[103]3import java.util.IdentityHashMap;
4import java.util.LinkedList;
5import java.util.Map;
6
[102]7import org.apache.logging.log4j.Logger;
8import org.apache.logging.log4j.LogManager;
9
[103]10import com.framsticks.params.EventListener;
11import com.framsticks.params.MessageLogger;
12import com.framsticks.params.SimplePrimitive;
[102]13import com.framsticks.params.annotations.FramsClassAnnotation;
14import com.framsticks.params.annotations.ParamAnnotation;
[105]15import com.framsticks.structure.messages.Message;
16import com.framsticks.structure.messages.ValueChange;
17import com.framsticks.util.dispatching.FutureHandler;
[102]18import com.framsticks.util.dispatching.Future;
19
[103]20@FramsClassAnnotation(order = {"netLoadSaveLogic", "newestTaskScheduled", "newestResultReceived"})
21public abstract class WorkPackageLogic<WP extends WorkPackage<WP>> extends AbstractExperimentLogic {
[102]22
23        private static final Logger log = LogManager.getLogger(WorkPackageLogic.class);
24
25        @ParamAnnotation
[103]26        public final NetLoadSaveLogic<WP> netLoadSaveLogic;
[102]27
[103]28        protected final Class<WP> packageJavaClass;
29
30        protected final MessageLogger messages = new MessageLogger(WorkPackageLogic.class);
31
32        protected final Map<Simulator, WP> sentPackages = new IdentityHashMap<>();
33        protected final LinkedList<WP> queuedPackages = new LinkedList<>();
34
35        protected final SimplePrimitive<String> newestTaskSent = new SimplePrimitive<>();
36        protected final SimplePrimitive<String> newestResultReceived = new SimplePrimitive<>();
37
38
[102]39        /**
40         * @param experiment
41         */
[103]42        public WorkPackageLogic(Experiment experiment, Class<WP> packageJavaClass) {
[102]43                super(experiment);
[103]44                this.packageJavaClass = packageJavaClass;
[102]45
[103]46                netLoadSaveLogic = new NetLoadSaveLogic<WP>(experiment, packageJavaClass) {
[102]47
48                        @Override
[105]49                        public void netload(final Simulator simulator, final FutureHandler<WP> netFuture) {
[102]50                                assert experiment.isActive();
51                                log.debug("providing netload file for {}", simulator);
52
[105]53                                findNextPackage(new Future<WP> (netFuture) {
[102]54
55                                        @Override
[103]56                                        protected void result(WP net) {
57                                                assert experiment.isActive();
[102]58
[103]59                                                if (net == null) {
60                                                        log.debug("no more packages left");
61                                                        return;
62                                                }
63                                                WorkPackageLogic.this.messages.info("netload", net.sumUpTask());
64                                                log.debug("sending package: {}", net);
65                                                newestTaskSent.set(net.sumUpTask());
66                                                sentPackages.put(simulator, net);
67
68                                                netFuture.pass(net);
[102]69                                        }
70                                });
71
72                        }
73
74                        @Override
[103]75                        public void netsave(Simulator simulator, WP netResult) {
76                                assert experiment.isActive();
77
[107]78                                log.info("received package from {}: {}", simulator, netResult);
[103]79                                WorkPackageLogic.this.messages.info("netsave", netResult.toString());
80
81                                WP netSent = sentPackages.get(simulator);
82                                if (netSent == null) {
83                                        log.error("no task found in {} for received result {}", simulator, netResult);
84                                        return;
85                                }
86                                sentPackages.remove(simulator);
87
88                                try {
89                                        WP netRemainder = netSent.getRemainder(netResult);
90                                        if (netRemainder != null) {
91                                                log.warn("queueing remainder: {}", netRemainder);
92                                                queuedPackages.add(netRemainder);
93                                        }
94                                } catch (InvalidWorkPackage e) {
95                                        log.error("in simulator {}, result {} is in no relation to task {} ({}), rescheduling", simulator, netResult, netSent, e.getShortMessage(new StringBuilder()));
96                                        queuedPackages.add(netSent);
97                                        return;
98                                }
99
100                                returnPackage(netResult);
[102]101                                // processFile();
102                        }
103                };
104        }
105
106
[103]107
[105]108        protected void findNextPackage(final FutureHandler<WP> future) {
[103]109                if (!queuedPackages.isEmpty()) {
110                        WP workPackage = queuedPackages.pollFirst();
111                        future.pass(workPackage);
112                        return;
113                }
114
[105]115                generateNextPackage(new Future<WP>(experiment) {
[103]116
117                        @Override
118                        protected void result(WP result) {
119                                if (result == null) {
120                                        log.info("no more packages left");
121                                        future.pass(null);
122                                        return;
123                                }
124                                future.pass(result);
125                        }
126                });
127        }
128
[105]129        protected abstract void generateNextPackage(FutureHandler<WP> future);
[103]130        protected abstract void returnPackage(WP workPackage);
[102]131
[103]132        /**
133         * @return the newestTaskScheduled
134         */
135        @ParamAnnotation(id = "newest_task_sent")
136        public String getNewestTaskSent() {
137                return newestTaskSent.get();
138        }
139
140        /**
141         * @return the newestResultReceived
142         */
143        @ParamAnnotation(id = "newest_result_received")
144        public String getNewestResultReceived() {
145                return newestResultReceived.get();
146        }
147
148        @ParamAnnotation(id = "newest_task_sent_changed")
149        public void addNewestTaskSentListener(EventListener<ValueChange> listener) {
150                newestTaskSent.addListener(listener);
151        }
152
153        @ParamAnnotation(id = "newest_task_sent_changed")
154        public void removeNewestTaskSentListener(EventListener<ValueChange> listener) {
155                newestTaskSent.removeListener(listener);
156        }
157
158        @ParamAnnotation(id = "newest_result_received_changed")
159        public void addNewestResultReceivedListener(EventListener<ValueChange> listener) {
160                newestResultReceived.addListener(listener);
161        }
162
163        @ParamAnnotation(id = "newest_result_received_changed")
164        public void removeNewestResultReceivedListener(EventListener<ValueChange> listener) {
165                newestResultReceived.removeListener(listener);
166        }
167
168        @ParamAnnotation(id = "messages")
169        public void addMessageListener(EventListener<Message> listener) {
170                messages.add(listener);
171        }
172
173        @ParamAnnotation(id = "messages")
174        public void removeMessageListener(EventListener<Message> listener) {
175                messages.remove(listener);
176        }
177
178
179
[102]180}
Note: See TracBrowser for help on using the repository browser.