source: java/main/src/main/java/com/framsticks/util/dispatching/Dispatching.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 7.4 KB
Line 
1package com.framsticks.util.dispatching;
2
3import java.util.Timer;
4
5import org.apache.logging.log4j.Level;
6import org.apache.logging.log4j.Logger;
7import org.apache.logging.log4j.LogManager;
8
9import com.framsticks.util.ExceptionHandler;
10import com.framsticks.util.FramsticksException;
11import com.framsticks.util.Misc;
12
13/**
14 * @author Piotr Sniegowski
15 */
16public abstract class Dispatching {
17        private static final Logger log = LogManager.getLogger(Dispatching.class);
18
19        protected static final Timer timer = new Timer();
20
21        public static Timer getTimer() {
22                return timer;
23        }
24
25        public static boolean isThreadSafe() {
26                return true;
27        }
28
29        public static <C> void dispatchIfNotActive(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
30                if (dispatcher.isActive()) {
31                        runnable.runAt();
32                        return;
33                }
34                dispatcher.dispatch(runnable);
35        }
36
37        public static <C> void dispatch(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
38                dispatcher.dispatch(runnable);
39        }
40
41        // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) {
42        //      dispatcher.invokeLater(runnable);
43        //      return true;
44        // }
45
46        public static <P, C> void invokeDispatch(Dispatcher<P> dispatcher, final Dispatcher<C> finalDispatcher, final RunAt<C> runnable) {
47                dispatcher.dispatch(new RunAt<P>(runnable) {
48                        @Override
49                        protected void runAt() {
50                                finalDispatcher.dispatch(runnable);
51                        }
52                });
53        }
54
55        public static void sleep(double seconds) {
56                log.debug("sleeping");
57                try {
58                        java.lang.Thread.sleep((long) (seconds * 1000));
59                } catch (InterruptedException e) {
60
61                }
62                log.debug("slept");
63        }
64
65        @SuppressWarnings("unchecked")
66        public static void dispatcherGuardedInvoke(Joinable joinable, RunAt<?> runnable) {
67                if (joinable instanceof Dispatcher) {
68                        dispatchIfNotActive(Dispatcher.class.cast(joinable), runnable);
69                        return;
70                }
71                runnable.runAt();
72        }
73
74        public static void use(final Joinable joinable, final JoinableParent owner) {
75                Misc.throwIfNull(joinable);
76                log.debug("using {} by {}", joinable, owner);
77                if (joinable.use(owner)) {
78                        log.debug("started {}", joinable);
79                } else {
80                        log.debug("start of {} already happened", joinable);
81                }
82        }
83
84        public static void drop(final Joinable joinable, final JoinableParent owner) {
85                Misc.throwIfNull(joinable);
86                log.debug("droping {} by {}", joinable, owner);
87                if (joinable.drop(owner)) {
88                        log.debug("stoped {}", joinable);
89                } else {
90                        log.debug("stop of {} deferred", joinable);
91                }
92        }
93
94        public static void join(Joinable joinable) throws InterruptedException {
95                Misc.throwIfNull(joinable);
96                log.debug("joining {}", joinable);
97                try {
98                        joinable.join();
99                } catch (InterruptedException e) {
100                        log.debug("failed to join {}", joinable);
101                        throw e;
102                }
103                log.debug("joined {}", joinable);
104        }
105
106        public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) {
107                if (state.ordinal() < JoinableState.RUNNING.ordinal()) {
108                        return;
109                }
110                dispatcherGuardedInvoke(joinable, new RunAt<Object>(ThrowExceptionHandler.getInstance()) {
111                        @Override
112                        protected void runAt() {
113                                log.debug("joinable {} is notifying parent {} about change to {}", joinable, parent, state);
114                                parent.childChangedState(joinable, state);
115                        }
116                });
117        }
118
119        public static void wait(Object object, long millis) {
120                try {
121                        synchronized (object) {
122                                object.wait(millis);
123                        }
124                } catch (InterruptedException e) {
125                        log.debug("failed to wait on {} because of: {}", object, e.getMessage());
126                }
127        }
128
129        public static void joinAbsolutely(Joinable joinable) {
130                log.debug("joining absolutely {}", joinable);
131                while (true) {
132                        try {
133                                Dispatching.join(joinable);
134                                return;
135                        } catch (InterruptedException e) {
136                                log.debug("failed to join {} because of: {}", joinable, e.getMessage());
137                                // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e);
138                        }
139                        log.debug("waiting for {}", joinable);
140                        wait(joinable, 500);
141                }
142        }
143
144        public interface Query<T> extends ExceptionHandler {
145                T get();
146        }
147
148        public static abstract class QueryHandler<T> implements Query<T> {
149                ExceptionHandler handler;
150
151                /**
152                 * @param handler
153                 */
154                public QueryHandler(ExceptionHandler handler) {
155                        this.handler = handler;
156                }
157
158                @Override
159                public void handle(FramsticksException exception) {
160                        handler.handle(exception);
161                }
162        }
163
164        public static class QueryRunner<T, C> extends RunAt<C> {
165                protected final Query<T> query;
166                T result;
167                boolean ready = false;
168
169                /**
170                 * @param query
171                 */
172                public QueryRunner(Query<T> query) {
173                        super(query);
174                        this.query = query;
175                }
176
177                @Override
178                protected void runAt() {
179                        result = query.get();
180                        synchronized (this) {
181                                ready = true;
182                                this.notifyAll();
183                        }
184                }
185
186                public T get() {
187                        synchronized (this) {
188                                while (!ready) {
189                                        try {
190                                                this.wait(100);
191                                        } catch (InterruptedException e) {
192                                        }
193                                }
194                        }
195                        return result;
196                }
197        }
198
199        public static <T, C> T get(Dispatcher<C> dispatcher, Query<T> query) {
200                QueryRunner<T, C> runner = new QueryRunner<T, C>(query);
201                dispatcher.dispatch(runner);
202                return runner.get();
203        }
204
205        // public static class DispatcherWaiter<C, T extends Dispatcher<C> & Joinable> implements Dispatcher<C> {
206        //      // protected boolean done = false;
207        //      protected final T dispatcher;
208        //      protected RunAt<? extends C> runnable;
209
210        //      /**
211        //       * @param joinable
212        //       */
213        //      public DispatcherWaiter(T dispatcher) {
214        //              this.dispatcher = dispatcher;
215        //      }
216
217        //      public synchronized void waitFor() {
218        //              while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) {
219        //                      try {
220        //                              this.wait();
221        //                      } catch (InterruptedException e) {
222        //                      }
223        //              }
224        //              if (runnable != null) {
225        //                      runnable.run();
226        //              }
227
228        //      }
229
230        //      @Override
231        //      public boolean isActive() {
232        //              return dispatcher.isActive();
233        //      }
234
235        //      @Override
236        //      public synchronized void dispatch(RunAt<? extends C> runnable) {
237        //              this.runnable = runnable;
238        //              this.notifyAll();
239        //      }
240
241        // }
242
243        public static class Waiter {
244                protected boolean done = false;
245
246                protected final double timeOut;
247                protected final ExceptionHandler handler;
248
249                /**
250                 * @param timeOut
251                 */
252                public Waiter(double timeOut, ExceptionHandler handler) {
253                        this.timeOut = timeOut;
254                        this.handler = handler;
255                }
256
257                public synchronized void pass() {
258                        done = true;
259                        this.notify();
260                }
261
262                public synchronized void waitFor() {
263                        long end = System.currentTimeMillis() + (int)(timeOut * 1000);
264                        while ((!done) && System.currentTimeMillis() < end) {
265                                try {
266                                        this.wait(end - System.currentTimeMillis());
267                                } catch (InterruptedException e) {
268                                        break;
269                                }
270                        }
271                        if (!done) {
272                                handler.handle(new FramsticksException().msg("waiter timed out"));
273                        }
274                }
275
276                public <T> FutureHandler<T> passInFuture(Class<T> type) {
277                        return new Future<T>(handler) {
278                                @Override
279                                protected void result(T result) {
280                                        Waiter.this.pass();
281                                }
282                        };
283                }
284        }
285
286
287        public static <C> void synchronize(Dispatcher<C> dispatcher, double seconds) {
288                final Waiter waiter = new Waiter(seconds, ThrowExceptionHandler.getInstance());
289                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
290                        @Override
291                        protected void runAt() {
292                                waiter.pass();
293                        }
294                });
295                waiter.waitFor();
296        }
297
298        public static <C> void dispatchLog(final Dispatcher<C> dispatcher, final Logger logger, final Level level, final Object text) {
299                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
300
301                        @Override
302                        protected void runAt() {
303                                logger.log(level, "message dispatched into {}: {}", dispatcher, text);
304                        }
305                });
306
307        }
308
309}
Note: See TracBrowser for help on using the repository browser.