source: java/main/src/main/java/com/framsticks/util/dispatching/Dispatching.java @ 105

Last change on this file since 105 was 105, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • import refactorization: move Tree, Path, etc.

from core to structure package

  • initial serialization implementation
  • improve PrimeExperiment? test
  • many organizational changes and convenience improvements

CHANGELOG:
Make registry in AbstractTree? final.

Move most classes from core to structure package.

Minor changes.

Switch names of Future and FutureHandler?.

Rename ExceptionResultHandler? to ExceptionHandler?.

Rename ExceptionHandler? to ExceptionDispatcherHandler?.

Fix bug in ParamCandidate? cache.

Add missing synchronization to the BufferedDispatcher?.

Develop @Serialized support.

Rework serialization further.

Add serialization/deserialization interface to ValueParam?.

Move getStorageType and isNumeric from Param down to params hierarchy.

Minor changes.

Improve param type induction.

Add TestSerializedClass? for testing new serialization.

Add info files gor GenePool? and Population.

Add standard.expt exemplary netfile.

Add type name field to PropertiesObject?.

Use PropertiesObject? for PropertiesAccess? instead of ordinary map.

Hide getFramsClass is several more places.

More unification accross FramsClass?, Access and Path.

Add ParamCollection?.

Simplify interface for getting params from FramsClass?, Access
or Path.

Make Access.call() interface variadic.

Add arguments(args) convenience wrapper around new Object[] {args}.

Upgrade to apache.commons.lang version 3.1

Minor improvement with Response constructors.

Develop proper result printing in ClientAtServer?.

Add experimentNetsave to PrimeExperiment?.

File size: 7.4 KB
Line 
1package com.framsticks.util.dispatching;
2
3import java.util.Timer;
4
5import org.apache.logging.log4j.Level;
6import org.apache.logging.log4j.Logger;
7import org.apache.logging.log4j.LogManager;
8
9import com.framsticks.util.ExceptionHandler;
10import com.framsticks.util.FramsticksException;
11import com.framsticks.util.Misc;
12
13/**
14 * @author Piotr Sniegowski
15 */
16public abstract class Dispatching {
17        private static final Logger log = LogManager.getLogger(Dispatching.class);
18
19        protected static final Timer timer = new Timer();
20
21        public static Timer getTimer() {
22                return timer;
23        }
24
25        public static boolean isThreadSafe() {
26                return true;
27        }
28
29        public static <C> void dispatchIfNotActive(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
30                if (dispatcher.isActive()) {
31                        runnable.runAt();
32                        return;
33                }
34                dispatcher.dispatch(runnable);
35        }
36
37        public static <C> void dispatch(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
38                dispatcher.dispatch(runnable);
39        }
40
41        // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) {
42        //      dispatcher.invokeLater(runnable);
43        //      return true;
44        // }
45
46        public static <P, C> void invokeDispatch(Dispatcher<P> dispatcher, final Dispatcher<C> finalDispatcher, final RunAt<C> runnable) {
47                dispatcher.dispatch(new RunAt<P>(runnable) {
48                        @Override
49                        protected void runAt() {
50                                finalDispatcher.dispatch(runnable);
51                        }
52                });
53        }
54
55        public static void sleep(double seconds) {
56                log.debug("sleeping");
57                try {
58                        java.lang.Thread.sleep((long) (seconds * 1000));
59                } catch (InterruptedException e) {
60
61                }
62                log.debug("slept");
63        }
64
65        @SuppressWarnings("unchecked")
66        public static void dispatcherGuardedInvoke(Joinable joinable, RunAt<?> runnable) {
67                if (joinable instanceof Dispatcher) {
68                        dispatchIfNotActive(Dispatcher.class.cast(joinable), runnable);
69                        return;
70                }
71                runnable.runAt();
72        }
73
74        public static void use(final Joinable joinable, final JoinableParent owner) {
75                Misc.throwIfNull(joinable);
76                log.debug("using {} by {}", joinable, owner);
77                if (joinable.use(owner)) {
78                        log.debug("started {}", joinable);
79                } else {
80                        log.debug("start of {} already happened", joinable);
81                }
82        }
83
84        public static void drop(final Joinable joinable, final JoinableParent owner) {
85                Misc.throwIfNull(joinable);
86                log.debug("droping {} by {}", joinable, owner);
87                if (joinable.drop(owner)) {
88                        log.debug("stoped {}", joinable);
89                } else {
90                        log.debug("stop of {} deferred", joinable);
91                }
92        }
93
94        public static void join(Joinable joinable) throws InterruptedException {
95                Misc.throwIfNull(joinable);
96                log.debug("joining {}", joinable);
97                try {
98                        joinable.join();
99                } catch (InterruptedException e) {
100                        log.debug("failed to join {}", joinable);
101                        throw e;
102                }
103                log.debug("joined {}", joinable);
104        }
105
106        public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) {
107                if (state.ordinal() < JoinableState.RUNNING.ordinal()) {
108                        return;
109                }
110                dispatcherGuardedInvoke(joinable, new RunAt<Object>(ThrowExceptionHandler.getInstance()) {
111                        @Override
112                        protected void runAt() {
113                                log.debug("joinable {} is notifying parent {} about change to {}", joinable, parent, state);
114                                parent.childChangedState(joinable, state);
115                        }
116                });
117        }
118
119        public static void wait(Object object, long millis) {
120                try {
121                        synchronized (object) {
122                                object.wait(millis);
123                        }
124                } catch (InterruptedException e) {
125                        log.debug("failed to wait on {} because of: {}", object, e.getMessage());
126                }
127        }
128
129        public static void joinAbsolutely(Joinable joinable) {
130                log.debug("joining absolutely {}", joinable);
131                while (true) {
132                        try {
133                                Dispatching.join(joinable);
134                                return;
135                        } catch (InterruptedException e) {
136                                log.debug("failed to join {} because of: {}", joinable, e.getMessage());
137                                // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e);
138                        }
139                        log.debug("waiting for {}", joinable);
140                        wait(joinable, 500);
141                }
142        }
143
144        public interface Query<T> extends ExceptionHandler {
145                T get();
146        }
147
148        public static abstract class QueryHandler<T> implements Query<T> {
149                ExceptionHandler handler;
150
151                /**
152                 * @param handler
153                 */
154                public QueryHandler(ExceptionHandler handler) {
155                        this.handler = handler;
156                }
157
158                @Override
159                public void handle(FramsticksException exception) {
160                        handler.handle(exception);
161                }
162        }
163
164        public static class QueryRunner<T, C> extends RunAt<C> {
165                protected final Query<T> query;
166                T result;
167                boolean ready = false;
168
169                /**
170                 * @param query
171                 */
172                public QueryRunner(Query<T> query) {
173                        super(query);
174                        this.query = query;
175                }
176
177                @Override
178                protected void runAt() {
179                        result = query.get();
180                        synchronized (this) {
181                                ready = true;
182                                this.notifyAll();
183                        }
184                }
185
186                public T get() {
187                        synchronized (this) {
188                                while (!ready) {
189                                        try {
190                                                this.wait(100);
191                                        } catch (InterruptedException e) {
192                                        }
193                                }
194                        }
195                        return result;
196                }
197        }
198
199        public static <T, C> T get(Dispatcher<C> dispatcher, Query<T> query) {
200                QueryRunner<T, C> runner = new QueryRunner<T, C>(query);
201                dispatcher.dispatch(runner);
202                return runner.get();
203        }
204
205        // public static class DispatcherWaiter<C, T extends Dispatcher<C> & Joinable> implements Dispatcher<C> {
206        //      // protected boolean done = false;
207        //      protected final T dispatcher;
208        //      protected RunAt<? extends C> runnable;
209
210        //      /**
211        //       * @param joinable
212        //       */
213        //      public DispatcherWaiter(T dispatcher) {
214        //              this.dispatcher = dispatcher;
215        //      }
216
217        //      public synchronized void waitFor() {
218        //              while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) {
219        //                      try {
220        //                              this.wait();
221        //                      } catch (InterruptedException e) {
222        //                      }
223        //              }
224        //              if (runnable != null) {
225        //                      runnable.run();
226        //              }
227
228        //      }
229
230        //      @Override
231        //      public boolean isActive() {
232        //              return dispatcher.isActive();
233        //      }
234
235        //      @Override
236        //      public synchronized void dispatch(RunAt<? extends C> runnable) {
237        //              this.runnable = runnable;
238        //              this.notifyAll();
239        //      }
240
241        // }
242
243        public static class Waiter {
244                protected boolean done = false;
245
246                protected final double timeOut;
247                protected final ExceptionHandler handler;
248
249                /**
250                 * @param timeOut
251                 */
252                public Waiter(double timeOut, ExceptionHandler handler) {
253                        this.timeOut = timeOut;
254                        this.handler = handler;
255                }
256
257                public synchronized void pass() {
258                        done = true;
259                        this.notify();
260                }
261
262                public synchronized void waitFor() {
263                        long end = System.currentTimeMillis() + (int)(timeOut * 1000);
264                        while ((!done) && System.currentTimeMillis() < end) {
265                                try {
266                                        this.wait(end - System.currentTimeMillis());
267                                } catch (InterruptedException e) {
268                                        break;
269                                }
270                        }
271                        if (!done) {
272                                handler.handle(new FramsticksException().msg("waiter timed out"));
273                        }
274                }
275
276                public <T> FutureHandler<T> passInFuture(Class<T> type) {
277                        return new Future<T>(handler) {
278                                @Override
279                                protected void result(T result) {
280                                        Waiter.this.pass();
281                                }
282                        };
283                }
284        }
285
286
287        public static <C> void synchronize(Dispatcher<C> dispatcher, double seconds) {
288                final Waiter waiter = new Waiter(seconds, ThrowExceptionHandler.getInstance());
289                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
290                        @Override
291                        protected void runAt() {
292                                waiter.pass();
293                        }
294                });
295                waiter.waitFor();
296        }
297
298        public static <C> void dispatchLog(final Dispatcher<C> dispatcher, final Logger logger, final Level level, final Object text) {
299                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
300
301                        @Override
302                        protected void runAt() {
303                                logger.log(level, "message dispatched into {}: {}", dispatcher, text);
304                        }
305                });
306
307        }
308
309}
Note: See TracBrowser for help on using the repository browser.