source: java/main/src/main/java/com/framsticks/util/dispatching/Dispatching.java @ 102

Last change on this file since 102 was 102, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic? and classes representing prime experiment state.

Add classes for PrimeExperiment? state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment?.

More development with NetLoadSaveLogic? and PrimeExperiment?.

Improvement around prime.

Improve BufferedDispatcher?.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector? against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher?.

Make BufferedDispatcher? a Dispatcher (and Joinable).

Minor improvements.

Done StackedJoinable?, improve Experiment.

Develop StackedJoinable?.

Add StackedJoinable? utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange? on Simulators.

Improve hints in ListChange?.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException?.

Add SimulatorConnector?.

Work out conception of experiment composing of logics building blocks.

Rename SinkInterface? to Sink.

Move saving of Accesses into AccessOperations?.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

File size: 7.4 KB
Line 
1package com.framsticks.util.dispatching;
2
3import java.util.Timer;
4
5import org.apache.logging.log4j.Level;
6import org.apache.logging.log4j.Logger;
7import org.apache.logging.log4j.LogManager;
8
9import com.framsticks.util.FramsticksException;
10import com.framsticks.util.Misc;
11
12/**
13 * @author Piotr Sniegowski
14 */
15public abstract class Dispatching {
16        private static final Logger log = LogManager.getLogger(Dispatching.class);
17
18        protected static final Timer timer = new Timer();
19
20        public static Timer getTimer() {
21                return timer;
22        }
23
24        public static boolean isThreadSafe() {
25                return true;
26        }
27
28        public static <C> void dispatchIfNotActive(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
29                if (dispatcher.isActive()) {
30                        runnable.runAt();
31                        return;
32                }
33                dispatcher.dispatch(runnable);
34        }
35
36        public static <C> void dispatch(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
37                dispatcher.dispatch(runnable);
38        }
39
40        // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) {
41        //      dispatcher.invokeLater(runnable);
42        //      return true;
43        // }
44
45        public static <P, C> void invokeDispatch(Dispatcher<P> dispatcher, final Dispatcher<C> finalDispatcher, final RunAt<C> runnable) {
46                dispatcher.dispatch(new RunAt<P>(runnable) {
47                        @Override
48                        protected void runAt() {
49                                finalDispatcher.dispatch(runnable);
50                        }
51                });
52        }
53
54        public static void sleep(double seconds) {
55                log.debug("sleeping");
56                try {
57                        java.lang.Thread.sleep((long) (seconds * 1000));
58                } catch (InterruptedException e) {
59
60                }
61                log.debug("slept");
62        }
63
64        @SuppressWarnings("unchecked")
65        public static void dispatcherGuardedInvoke(Joinable joinable, RunAt<?> runnable) {
66                if (joinable instanceof Dispatcher) {
67                        dispatchIfNotActive(Dispatcher.class.cast(joinable), runnable);
68                        return;
69                }
70                runnable.runAt();
71        }
72
73        public static void use(final Joinable joinable, final JoinableParent owner) {
74                Misc.throwIfNull(joinable);
75                log.debug("using {} by {}", joinable, owner);
76                if (joinable.use(owner)) {
77                        log.debug("started {}", joinable);
78                } else {
79                        log.debug("start of {} already happened", joinable);
80                }
81        }
82
83        public static void drop(final Joinable joinable, final JoinableParent owner) {
84                Misc.throwIfNull(joinable);
85                log.debug("droping {} by {}", joinable, owner);
86                if (joinable.drop(owner)) {
87                        log.debug("stoped {}", joinable);
88                } else {
89                        log.debug("stop of {} deferred", joinable);
90                }
91        }
92
93        public static void join(Joinable joinable) throws InterruptedException {
94                Misc.throwIfNull(joinable);
95                log.debug("joining {}", joinable);
96                try {
97                        joinable.join();
98                } catch (InterruptedException e) {
99                        log.debug("failed to join {}", joinable);
100                        throw e;
101                }
102                log.debug("joined {}", joinable);
103        }
104
105        public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) {
106                if (state.ordinal() < JoinableState.RUNNING.ordinal()) {
107                        return;
108                }
109                dispatcherGuardedInvoke(joinable, new RunAt<Object>(ThrowExceptionHandler.getInstance()) {
110                        @Override
111                        protected void runAt() {
112                                log.debug("joinable {} is notifying parent {} about change to {}", joinable, parent, state);
113                                parent.childChangedState(joinable, state);
114                        }
115                });
116        }
117
118        public static void wait(Object object, long millis) {
119                try {
120                        synchronized (object) {
121                                object.wait(millis);
122                        }
123                } catch (InterruptedException e) {
124                        log.debug("failed to wait on {} because of: {}", object, e.getMessage());
125                }
126        }
127
128        public static void joinAbsolutely(Joinable joinable) {
129                log.debug("joining absolutely {}", joinable);
130                while (true) {
131                        try {
132                                Dispatching.join(joinable);
133                                return;
134                        } catch (InterruptedException e) {
135                                log.debug("failed to join {} because of: {}", joinable, e.getMessage());
136                                // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e);
137                        }
138                        log.debug("waiting for {}", joinable);
139                        wait(joinable, 500);
140                }
141        }
142
143        public interface Query<T> extends ExceptionResultHandler {
144                T get();
145        }
146
147        public static abstract class QueryHandler<T> implements Query<T> {
148                ExceptionResultHandler handler;
149
150                /**
151                 * @param handler
152                 */
153                public QueryHandler(ExceptionResultHandler handler) {
154                        this.handler = handler;
155                }
156
157                @Override
158                public void handle(FramsticksException exception) {
159                        handler.handle(exception);
160                }
161        }
162
163        public static class QueryRunner<T, C> extends RunAt<C> {
164                protected final Query<T> query;
165                T result;
166                boolean ready = false;
167
168                /**
169                 * @param query
170                 */
171                public QueryRunner(Query<T> query) {
172                        super(query);
173                        this.query = query;
174                }
175
176                @Override
177                protected void runAt() {
178                        result = query.get();
179                        synchronized (this) {
180                                ready = true;
181                                this.notifyAll();
182                        }
183                }
184
185                public T get() {
186                        synchronized (this) {
187                                while (!ready) {
188                                        try {
189                                                this.wait(100);
190                                        } catch (InterruptedException e) {
191                                        }
192                                }
193                        }
194                        return result;
195                }
196        }
197
198        public static <T, C> T get(Dispatcher<C> dispatcher, Query<T> query) {
199                QueryRunner<T, C> runner = new QueryRunner<T, C>(query);
200                dispatcher.dispatch(runner);
201                return runner.get();
202        }
203
204        // public static class DispatcherWaiter<C, T extends Dispatcher<C> & Joinable> implements Dispatcher<C> {
205        //      // protected boolean done = false;
206        //      protected final T dispatcher;
207        //      protected RunAt<? extends C> runnable;
208
209        //      /**
210        //       * @param joinable
211        //       */
212        //      public DispatcherWaiter(T dispatcher) {
213        //              this.dispatcher = dispatcher;
214        //      }
215
216        //      public synchronized void waitFor() {
217        //              while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) {
218        //                      try {
219        //                              this.wait();
220        //                      } catch (InterruptedException e) {
221        //                      }
222        //              }
223        //              if (runnable != null) {
224        //                      runnable.run();
225        //              }
226
227        //      }
228
229        //      @Override
230        //      public boolean isActive() {
231        //              return dispatcher.isActive();
232        //      }
233
234        //      @Override
235        //      public synchronized void dispatch(RunAt<? extends C> runnable) {
236        //              this.runnable = runnable;
237        //              this.notifyAll();
238        //      }
239
240        // }
241
242        public static class Waiter {
243                protected boolean done = false;
244
245                protected final double timeOut;
246                protected final ExceptionResultHandler handler;
247
248                /**
249                 * @param timeOut
250                 */
251                public Waiter(double timeOut, ExceptionResultHandler handler) {
252                        this.timeOut = timeOut;
253                        this.handler = handler;
254                }
255
256                public synchronized void pass() {
257                        done = true;
258                        this.notify();
259                }
260
261                public synchronized void waitFor() {
262                        long end = System.currentTimeMillis() + (int)(timeOut * 1000);
263                        while ((!done) && System.currentTimeMillis() < end) {
264                                try {
265                                        this.wait(end - System.currentTimeMillis());
266                                } catch (InterruptedException e) {
267                                        break;
268                                }
269                        }
270                        if (!done) {
271                                handler.handle(new FramsticksException().msg("waiter timed out"));
272                        }
273                }
274
275                public <T> Future<T> passInFuture(Class<T> type) {
276                        return new FutureHandler<T>(handler) {
277                                @Override
278                                protected void result(T result) {
279                                        Waiter.this.pass();
280                                }
281                        };
282                }
283        }
284
285
286        public static <C> void synchronize(Dispatcher<C> dispatcher, double seconds) {
287                final Waiter waiter = new Waiter(seconds, ThrowExceptionHandler.getInstance());
288                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
289                        @Override
290                        protected void runAt() {
291                                waiter.pass();
292                        }
293                });
294                waiter.waitFor();
295        }
296
297        public static <C> void dispatchLog(final Dispatcher<C> dispatcher, final Logger logger, final Level level, final Object text) {
298                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
299
300                        @Override
301                        protected void runAt() {
302                                logger.log(level, "message dispatched into {}: {}", dispatcher, text);
303                        }
304                });
305
306        }
307
308}
Note: See TracBrowser for help on using the repository browser.