source: java/main/src/main/java/com/framsticks/experiment/WorkPackageLogic.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 5.2 KB
Line 
1package com.framsticks.experiment;
2
3import java.util.IdentityHashMap;
4import java.util.LinkedList;
5import java.util.Map;
6
7import org.apache.logging.log4j.Logger;
8import org.apache.logging.log4j.LogManager;
9
10import com.framsticks.params.EventListener;
11import com.framsticks.params.MessageLogger;
12import com.framsticks.params.SimplePrimitive;
13import com.framsticks.params.annotations.FramsClassAnnotation;
14import com.framsticks.params.annotations.ParamAnnotation;
15import com.framsticks.structure.messages.Message;
16import com.framsticks.structure.messages.ValueChange;
17import com.framsticks.util.dispatching.FutureHandler;
18import com.framsticks.util.dispatching.Future;
19
20@FramsClassAnnotation(order = {"netLoadSaveLogic", "newestTaskScheduled", "newestResultReceived"})
21public abstract class WorkPackageLogic<WP extends WorkPackage<WP>> extends AbstractExperimentLogic {
22
23        private static final Logger log = LogManager.getLogger(WorkPackageLogic.class);
24
25        @ParamAnnotation
26        public final NetLoadSaveLogic<WP> netLoadSaveLogic;
27
28        protected final Class<WP> packageJavaClass;
29
30        protected final MessageLogger messages = new MessageLogger(WorkPackageLogic.class);
31
32        protected final Map<Simulator, WP> sentPackages = new IdentityHashMap<>();
33        protected final LinkedList<WP> queuedPackages = new LinkedList<>();
34
35        protected final SimplePrimitive<String> newestTaskSent = new SimplePrimitive<>();
36        protected final SimplePrimitive<String> newestResultReceived = new SimplePrimitive<>();
37
38
39        /**
40         * @param experiment
41         */
42        public WorkPackageLogic(Experiment experiment, Class<WP> packageJavaClass) {
43                super(experiment);
44                this.packageJavaClass = packageJavaClass;
45
46                netLoadSaveLogic = new NetLoadSaveLogic<WP>(experiment, packageJavaClass) {
47
48                        @Override
49                        public void netload(final Simulator simulator, final FutureHandler<WP> netFuture) {
50                                assert experiment.isActive();
51                                log.debug("providing netload file for {}", simulator);
52
53                                findNextPackage(new Future<WP> (netFuture) {
54
55                                        @Override
56                                        protected void result(WP net) {
57                                                assert experiment.isActive();
58
59                                                if (net == null) {
60                                                        log.debug("no more packages left");
61                                                        return;
62                                                }
63                                                WorkPackageLogic.this.messages.info("netload", net.sumUpTask());
64                                                log.debug("sending package: {}", net);
65                                                newestTaskSent.set(net.sumUpTask());
66                                                sentPackages.put(simulator, net);
67
68                                                netFuture.pass(net);
69                                        }
70                                });
71
72                        }
73
74                        @Override
75                        public void netsave(Simulator simulator, WP netResult) {
76                                assert experiment.isActive();
77
78                                log.info("received package from {}: {}", simulator, netResult);
79                                WorkPackageLogic.this.messages.info("netsave", netResult.toString());
80
81                                WP netSent = sentPackages.get(simulator);
82                                if (netSent == null) {
83                                        log.error("no task found in {} for received result {}", simulator, netResult);
84                                        return;
85                                }
86                                sentPackages.remove(simulator);
87
88                                try {
89                                        WP netRemainder = netSent.getRemainder(netResult);
90                                        if (netRemainder != null) {
91                                                log.warn("queueing remainder: {}", netRemainder);
92                                                queuedPackages.add(netRemainder);
93                                        }
94                                } catch (InvalidWorkPackage e) {
95                                        log.error("in simulator {}, result {} is in no relation to task {} ({}), rescheduling", simulator, netResult, netSent, e.getShortMessage(new StringBuilder()));
96                                        queuedPackages.add(netSent);
97                                        return;
98                                }
99
100                                returnPackage(netResult);
101                                // processFile();
102                        }
103                };
104        }
105
106
107
108        protected void findNextPackage(final FutureHandler<WP> future) {
109                if (!queuedPackages.isEmpty()) {
110                        WP workPackage = queuedPackages.pollFirst();
111                        future.pass(workPackage);
112                        return;
113                }
114
115                generateNextPackage(new Future<WP>(experiment) {
116
117                        @Override
118                        protected void result(WP result) {
119                                if (result == null) {
120                                        log.info("no more packages left");
121                                        future.pass(null);
122                                        return;
123                                }
124                                future.pass(result);
125                        }
126                });
127        }
128
129        protected abstract void generateNextPackage(FutureHandler<WP> future);
130        protected abstract void returnPackage(WP workPackage);
131
132        /**
133         * @return the newestTaskScheduled
134         */
135        @ParamAnnotation(id = "newest_task_sent")
136        public String getNewestTaskSent() {
137                return newestTaskSent.get();
138        }
139
140        /**
141         * @return the newestResultReceived
142         */
143        @ParamAnnotation(id = "newest_result_received")
144        public String getNewestResultReceived() {
145                return newestResultReceived.get();
146        }
147
148        @ParamAnnotation(id = "newest_task_sent_changed")
149        public void addNewestTaskSentListener(EventListener<ValueChange> listener) {
150                newestTaskSent.addListener(listener);
151        }
152
153        @ParamAnnotation(id = "newest_task_sent_changed")
154        public void removeNewestTaskSentListener(EventListener<ValueChange> listener) {
155                newestTaskSent.removeListener(listener);
156        }
157
158        @ParamAnnotation(id = "newest_result_received_changed")
159        public void addNewestResultReceivedListener(EventListener<ValueChange> listener) {
160                newestResultReceived.addListener(listener);
161        }
162
163        @ParamAnnotation(id = "newest_result_received_changed")
164        public void removeNewestResultReceivedListener(EventListener<ValueChange> listener) {
165                newestResultReceived.removeListener(listener);
166        }
167
168        @ParamAnnotation(id = "messages")
169        public void addMessageListener(EventListener<Message> listener) {
170                messages.add(listener);
171        }
172
173        @ParamAnnotation(id = "messages")
174        public void removeMessageListener(EventListener<Message> listener) {
175                messages.remove(listener);
176        }
177
178
179
180}
Note: See TracBrowser for help on using the repository browser.