[84] | 1 | package com.framsticks.util.dispatching; |
---|
| 2 | |
---|
[101] | 3 | import java.util.Timer; |
---|
| 4 | |
---|
[102] | 5 | import org.apache.logging.log4j.Level; |
---|
[100] | 6 | import org.apache.logging.log4j.Logger; |
---|
| 7 | import org.apache.logging.log4j.LogManager; |
---|
[88] | 8 | |
---|
[105] | 9 | import com.framsticks.util.ExceptionHandler; |
---|
[96] | 10 | import com.framsticks.util.FramsticksException; |
---|
[102] | 11 | import com.framsticks.util.Misc; |
---|
[84] | 12 | |
---|
| 13 | /** |
---|
| 14 | * @author Piotr Sniegowski |
---|
| 15 | */ |
---|
| 16 | public abstract class Dispatching { |
---|
[100] | 17 | private static final Logger log = LogManager.getLogger(Dispatching.class); |
---|
[84] | 18 | |
---|
[101] | 19 | protected static final Timer timer = new Timer(); |
---|
| 20 | |
---|
| 21 | public static Timer getTimer() { |
---|
| 22 | return timer; |
---|
| 23 | } |
---|
| 24 | |
---|
[85] | 25 | public static boolean isThreadSafe() { |
---|
| 26 | return true; |
---|
| 27 | } |
---|
[84] | 28 | |
---|
[90] | 29 | public static <C> void dispatchIfNotActive(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) { |
---|
[85] | 30 | if (dispatcher.isActive()) { |
---|
[97] | 31 | runnable.runAt(); |
---|
[85] | 32 | return; |
---|
| 33 | } |
---|
[90] | 34 | dispatcher.dispatch(runnable); |
---|
[85] | 35 | } |
---|
[84] | 36 | |
---|
[102] | 37 | public static <C> void dispatch(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) { |
---|
| 38 | dispatcher.dispatch(runnable); |
---|
| 39 | } |
---|
| 40 | |
---|
[85] | 41 | // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) { |
---|
[88] | 42 | // dispatcher.invokeLater(runnable); |
---|
| 43 | // return true; |
---|
[85] | 44 | // } |
---|
[84] | 45 | |
---|
[85] | 46 | public static <P, C> void invokeDispatch(Dispatcher<P> dispatcher, final Dispatcher<C> finalDispatcher, final RunAt<C> runnable) { |
---|
[97] | 47 | dispatcher.dispatch(new RunAt<P>(runnable) { |
---|
[85] | 48 | @Override |
---|
[97] | 49 | protected void runAt() { |
---|
[90] | 50 | finalDispatcher.dispatch(runnable); |
---|
[85] | 51 | } |
---|
| 52 | }); |
---|
| 53 | } |
---|
[84] | 54 | |
---|
[90] | 55 | public static void sleep(double seconds) { |
---|
[97] | 56 | log.debug("sleeping"); |
---|
[88] | 57 | try { |
---|
[90] | 58 | java.lang.Thread.sleep((long) (seconds * 1000)); |
---|
[88] | 59 | } catch (InterruptedException e) { |
---|
| 60 | |
---|
| 61 | } |
---|
[97] | 62 | log.debug("slept"); |
---|
[88] | 63 | } |
---|
| 64 | |
---|
| 65 | @SuppressWarnings("unchecked") |
---|
| 66 | public static void dispatcherGuardedInvoke(Joinable joinable, RunAt<?> runnable) { |
---|
| 67 | if (joinable instanceof Dispatcher) { |
---|
[90] | 68 | dispatchIfNotActive(Dispatcher.class.cast(joinable), runnable); |
---|
[88] | 69 | return; |
---|
| 70 | } |
---|
[97] | 71 | runnable.runAt(); |
---|
[88] | 72 | } |
---|
| 73 | |
---|
| 74 | public static void use(final Joinable joinable, final JoinableParent owner) { |
---|
[102] | 75 | Misc.throwIfNull(joinable); |
---|
[100] | 76 | log.debug("using {} by {}", joinable, owner); |
---|
[88] | 77 | if (joinable.use(owner)) { |
---|
[100] | 78 | log.debug("started {}", joinable); |
---|
[88] | 79 | } else { |
---|
[100] | 80 | log.debug("start of {} already happened", joinable); |
---|
[88] | 81 | } |
---|
| 82 | } |
---|
| 83 | |
---|
| 84 | public static void drop(final Joinable joinable, final JoinableParent owner) { |
---|
[102] | 85 | Misc.throwIfNull(joinable); |
---|
[100] | 86 | log.debug("droping {} by {}", joinable, owner); |
---|
[88] | 87 | if (joinable.drop(owner)) { |
---|
[100] | 88 | log.debug("stoped {}", joinable); |
---|
[88] | 89 | } else { |
---|
[100] | 90 | log.debug("stop of {} deferred", joinable); |
---|
[88] | 91 | } |
---|
| 92 | } |
---|
| 93 | |
---|
| 94 | public static void join(Joinable joinable) throws InterruptedException { |
---|
[102] | 95 | Misc.throwIfNull(joinable); |
---|
[100] | 96 | log.debug("joining {}", joinable); |
---|
[88] | 97 | try { |
---|
| 98 | joinable.join(); |
---|
| 99 | } catch (InterruptedException e) { |
---|
[100] | 100 | log.debug("failed to join {}", joinable); |
---|
[88] | 101 | throw e; |
---|
| 102 | } |
---|
[100] | 103 | log.debug("joined {}", joinable); |
---|
[88] | 104 | } |
---|
| 105 | |
---|
| 106 | public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) { |
---|
[102] | 107 | if (state.ordinal() < JoinableState.RUNNING.ordinal()) { |
---|
[98] | 108 | return; |
---|
| 109 | } |
---|
[97] | 110 | dispatcherGuardedInvoke(joinable, new RunAt<Object>(ThrowExceptionHandler.getInstance()) { |
---|
[88] | 111 | @Override |
---|
[97] | 112 | protected void runAt() { |
---|
[100] | 113 | log.debug("joinable {} is notifying parent {} about change to {}", joinable, parent, state); |
---|
[88] | 114 | parent.childChangedState(joinable, state); |
---|
| 115 | } |
---|
| 116 | }); |
---|
| 117 | } |
---|
| 118 | |
---|
| 119 | public static void wait(Object object, long millis) { |
---|
| 120 | try { |
---|
[90] | 121 | synchronized (object) { |
---|
| 122 | object.wait(millis); |
---|
| 123 | } |
---|
[88] | 124 | } catch (InterruptedException e) { |
---|
[102] | 125 | log.debug("failed to wait on {} because of: {}", object, e.getMessage()); |
---|
[88] | 126 | } |
---|
| 127 | } |
---|
| 128 | |
---|
| 129 | public static void joinAbsolutely(Joinable joinable) { |
---|
[100] | 130 | log.debug("joining absolutely {}", joinable); |
---|
[88] | 131 | while (true) { |
---|
| 132 | try { |
---|
| 133 | Dispatching.join(joinable); |
---|
| 134 | return; |
---|
| 135 | } catch (InterruptedException e) { |
---|
[102] | 136 | log.debug("failed to join {} because of: {}", joinable, e.getMessage()); |
---|
[88] | 137 | // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e); |
---|
| 138 | } |
---|
[100] | 139 | log.debug("waiting for {}", joinable); |
---|
[90] | 140 | wait(joinable, 500); |
---|
[88] | 141 | } |
---|
| 142 | } |
---|
| 143 | |
---|
[105] | 144 | public interface Query<T> extends ExceptionHandler { |
---|
[90] | 145 | T get(); |
---|
| 146 | } |
---|
[88] | 147 | |
---|
[98] | 148 | public static abstract class QueryHandler<T> implements Query<T> { |
---|
[105] | 149 | ExceptionHandler handler; |
---|
[98] | 150 | |
---|
| 151 | /** |
---|
| 152 | * @param handler |
---|
| 153 | */ |
---|
[105] | 154 | public QueryHandler(ExceptionHandler handler) { |
---|
[98] | 155 | this.handler = handler; |
---|
| 156 | } |
---|
| 157 | |
---|
| 158 | @Override |
---|
| 159 | public void handle(FramsticksException exception) { |
---|
| 160 | handler.handle(exception); |
---|
| 161 | } |
---|
| 162 | } |
---|
| 163 | |
---|
[90] | 164 | public static class QueryRunner<T, C> extends RunAt<C> { |
---|
| 165 | protected final Query<T> query; |
---|
| 166 | T result; |
---|
| 167 | boolean ready = false; |
---|
[88] | 168 | |
---|
[90] | 169 | /** |
---|
| 170 | * @param query |
---|
| 171 | */ |
---|
| 172 | public QueryRunner(Query<T> query) { |
---|
[98] | 173 | super(query); |
---|
[90] | 174 | this.query = query; |
---|
| 175 | } |
---|
[88] | 176 | |
---|
[90] | 177 | @Override |
---|
[97] | 178 | protected void runAt() { |
---|
[90] | 179 | result = query.get(); |
---|
| 180 | synchronized (this) { |
---|
| 181 | ready = true; |
---|
| 182 | this.notifyAll(); |
---|
| 183 | } |
---|
| 184 | } |
---|
| 185 | |
---|
| 186 | public T get() { |
---|
| 187 | synchronized (this) { |
---|
| 188 | while (!ready) { |
---|
| 189 | try { |
---|
| 190 | this.wait(100); |
---|
| 191 | } catch (InterruptedException e) { |
---|
| 192 | } |
---|
| 193 | } |
---|
| 194 | } |
---|
| 195 | return result; |
---|
| 196 | } |
---|
| 197 | } |
---|
| 198 | |
---|
| 199 | public static <T, C> T get(Dispatcher<C> dispatcher, Query<T> query) { |
---|
| 200 | QueryRunner<T, C> runner = new QueryRunner<T, C>(query); |
---|
| 201 | dispatcher.dispatch(runner); |
---|
| 202 | return runner.get(); |
---|
| 203 | } |
---|
| 204 | |
---|
[101] | 205 | // public static class DispatcherWaiter<C, T extends Dispatcher<C> & Joinable> implements Dispatcher<C> { |
---|
[102] | 206 | // // protected boolean done = false; |
---|
| 207 | // protected final T dispatcher; |
---|
| 208 | // protected RunAt<? extends C> runnable; |
---|
[98] | 209 | |
---|
[102] | 210 | // /** |
---|
| 211 | // * @param joinable |
---|
| 212 | // */ |
---|
| 213 | // public DispatcherWaiter(T dispatcher) { |
---|
| 214 | // this.dispatcher = dispatcher; |
---|
| 215 | // } |
---|
[98] | 216 | |
---|
[102] | 217 | // public synchronized void waitFor() { |
---|
| 218 | // while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) { |
---|
| 219 | // try { |
---|
| 220 | // this.wait(); |
---|
| 221 | // } catch (InterruptedException e) { |
---|
| 222 | // } |
---|
| 223 | // } |
---|
| 224 | // if (runnable != null) { |
---|
| 225 | // runnable.run(); |
---|
| 226 | // } |
---|
[98] | 227 | |
---|
[102] | 228 | // } |
---|
[98] | 229 | |
---|
[102] | 230 | // @Override |
---|
| 231 | // public boolean isActive() { |
---|
| 232 | // return dispatcher.isActive(); |
---|
| 233 | // } |
---|
[98] | 234 | |
---|
[102] | 235 | // @Override |
---|
| 236 | // public synchronized void dispatch(RunAt<? extends C> runnable) { |
---|
| 237 | // this.runnable = runnable; |
---|
| 238 | // this.notifyAll(); |
---|
| 239 | // } |
---|
[98] | 240 | |
---|
[101] | 241 | // } |
---|
[98] | 242 | |
---|
[96] | 243 | public static class Waiter { |
---|
| 244 | protected boolean done = false; |
---|
[90] | 245 | |
---|
[96] | 246 | protected final double timeOut; |
---|
[105] | 247 | protected final ExceptionHandler handler; |
---|
[90] | 248 | |
---|
[96] | 249 | /** |
---|
| 250 | * @param timeOut |
---|
| 251 | */ |
---|
[105] | 252 | public Waiter(double timeOut, ExceptionHandler handler) { |
---|
[96] | 253 | this.timeOut = timeOut; |
---|
[99] | 254 | this.handler = handler; |
---|
[96] | 255 | } |
---|
| 256 | |
---|
| 257 | public synchronized void pass() { |
---|
| 258 | done = true; |
---|
| 259 | this.notify(); |
---|
| 260 | } |
---|
| 261 | |
---|
| 262 | public synchronized void waitFor() { |
---|
| 263 | long end = System.currentTimeMillis() + (int)(timeOut * 1000); |
---|
| 264 | while ((!done) && System.currentTimeMillis() < end) { |
---|
| 265 | try { |
---|
| 266 | this.wait(end - System.currentTimeMillis()); |
---|
| 267 | } catch (InterruptedException e) { |
---|
| 268 | break; |
---|
| 269 | } |
---|
| 270 | } |
---|
| 271 | if (!done) { |
---|
[99] | 272 | handler.handle(new FramsticksException().msg("waiter timed out")); |
---|
[96] | 273 | } |
---|
| 274 | } |
---|
[99] | 275 | |
---|
[105] | 276 | public <T> FutureHandler<T> passInFuture(Class<T> type) { |
---|
| 277 | return new Future<T>(handler) { |
---|
[99] | 278 | @Override |
---|
| 279 | protected void result(T result) { |
---|
| 280 | Waiter.this.pass(); |
---|
| 281 | } |
---|
| 282 | }; |
---|
| 283 | } |
---|
[96] | 284 | } |
---|
| 285 | |
---|
| 286 | |
---|
| 287 | public static <C> void synchronize(Dispatcher<C> dispatcher, double seconds) { |
---|
[99] | 288 | final Waiter waiter = new Waiter(seconds, ThrowExceptionHandler.getInstance()); |
---|
[97] | 289 | dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) { |
---|
[96] | 290 | @Override |
---|
[97] | 291 | protected void runAt() { |
---|
[96] | 292 | waiter.pass(); |
---|
| 293 | } |
---|
| 294 | }); |
---|
| 295 | waiter.waitFor(); |
---|
| 296 | } |
---|
| 297 | |
---|
[102] | 298 | public static <C> void dispatchLog(final Dispatcher<C> dispatcher, final Logger logger, final Level level, final Object text) { |
---|
| 299 | dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) { |
---|
| 300 | |
---|
| 301 | @Override |
---|
| 302 | protected void runAt() { |
---|
| 303 | logger.log(level, "message dispatched into {}: {}", dispatcher, text); |
---|
| 304 | } |
---|
| 305 | }); |
---|
| 306 | |
---|
| 307 | } |
---|
| 308 | |
---|
[84] | 309 | } |
---|