package com.framsticks.util.dispatching; import java.util.Timer; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import com.framsticks.util.ExceptionHandler; import com.framsticks.util.FramsticksException; import com.framsticks.util.Misc; /** * @author Piotr Sniegowski */ public abstract class Dispatching { private static final Logger log = LogManager.getLogger(Dispatching.class); protected static final Timer timer = new Timer(); public static Timer getTimer() { return timer; } public static boolean isThreadSafe() { return true; } public static void dispatchIfNotActive(Dispatcher dispatcher, RunAt runnable) { if (dispatcher.isActive()) { runnable.runAt(); return; } dispatcher.dispatch(runnable); } public static void dispatch(Dispatcher dispatcher, RunAt runnable) { dispatcher.dispatch(runnable); } // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) { // dispatcher.invokeLater(runnable); // return true; // } public static void invokeDispatch(Dispatcher

dispatcher, final Dispatcher finalDispatcher, final RunAt runnable) { dispatcher.dispatch(new RunAt

(runnable) { @Override protected void runAt() { finalDispatcher.dispatch(runnable); } }); } public static void sleep(double seconds) { log.debug("sleeping"); try { java.lang.Thread.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { } log.debug("slept"); } @SuppressWarnings("unchecked") public static void dispatcherGuardedInvoke(Joinable joinable, RunAt runnable) { if (joinable instanceof Dispatcher) { dispatchIfNotActive(Dispatcher.class.cast(joinable), runnable); return; } runnable.runAt(); } public static void use(final Joinable joinable, final JoinableParent owner) { Misc.throwIfNull(joinable); log.debug("using {} by {}", joinable, owner); if (joinable.use(owner)) { log.debug("started {}", joinable); } else { log.debug("start of {} already happened", joinable); } } public static void drop(final Joinable joinable, final JoinableParent owner) { Misc.throwIfNull(joinable); log.debug("droping {} by {}", joinable, owner); if (joinable.drop(owner)) { log.debug("stoped {}", joinable); } else { log.debug("stop of {} deferred", joinable); } } public static void join(Joinable joinable) throws InterruptedException { Misc.throwIfNull(joinable); log.debug("joining {}", joinable); try { joinable.join(); } catch (InterruptedException e) { log.debug("failed to join {}", joinable); throw e; } log.debug("joined {}", joinable); } public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) { if (state.ordinal() < JoinableState.RUNNING.ordinal()) { return; } dispatcherGuardedInvoke(joinable, new RunAt(ThrowExceptionHandler.getInstance()) { @Override protected void runAt() { log.debug("joinable {} is notifying parent {} about change to {}", joinable, parent, state); parent.childChangedState(joinable, state); } }); } public static void wait(Object object, long millis) { try { synchronized (object) { object.wait(millis); } } catch (InterruptedException e) { log.debug("failed to wait on {} because of: {}", object, e.getMessage()); } } public static void joinAbsolutely(Joinable joinable) { log.debug("joining absolutely {}", joinable); while (true) { try { Dispatching.join(joinable); return; } catch (InterruptedException e) { log.debug("failed to join {} because of: {}", joinable, e.getMessage()); // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e); } log.debug("waiting for {}", joinable); wait(joinable, 500); } } public interface Query extends ExceptionHandler { T get(); } public static abstract class QueryHandler implements Query { ExceptionHandler handler; /** * @param handler */ public QueryHandler(ExceptionHandler handler) { this.handler = handler; } @Override public void handle(FramsticksException exception) { handler.handle(exception); } } public static class QueryRunner extends RunAt { protected final Query query; T result; boolean ready = false; /** * @param query */ public QueryRunner(Query query) { super(query); this.query = query; } @Override protected void runAt() { result = query.get(); synchronized (this) { ready = true; this.notifyAll(); } } public T get() { synchronized (this) { while (!ready) { try { this.wait(100); } catch (InterruptedException e) { } } } return result; } } public static T get(Dispatcher dispatcher, Query query) { QueryRunner runner = new QueryRunner(query); dispatcher.dispatch(runner); return runner.get(); } // public static class DispatcherWaiter & Joinable> implements Dispatcher { // // protected boolean done = false; // protected final T dispatcher; // protected RunAt runnable; // /** // * @param joinable // */ // public DispatcherWaiter(T dispatcher) { // this.dispatcher = dispatcher; // } // public synchronized void waitFor() { // while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) { // try { // this.wait(); // } catch (InterruptedException e) { // } // } // if (runnable != null) { // runnable.run(); // } // } // @Override // public boolean isActive() { // return dispatcher.isActive(); // } // @Override // public synchronized void dispatch(RunAt runnable) { // this.runnable = runnable; // this.notifyAll(); // } // } public static class Waiter { protected boolean done = false; protected final double timeOut; protected final ExceptionHandler handler; /** * @param timeOut */ public Waiter(double timeOut, ExceptionHandler handler) { this.timeOut = timeOut; this.handler = handler; } public synchronized void pass() { done = true; this.notify(); } public synchronized void waitFor() { long end = System.currentTimeMillis() + (int)(timeOut * 1000); while ((!done) && System.currentTimeMillis() < end) { try { this.wait(end - System.currentTimeMillis()); } catch (InterruptedException e) { break; } } if (!done) { handler.handle(new FramsticksException().msg("waiter timed out")); } } public FutureHandler passInFuture(Class type) { return new Future(handler) { @Override protected void result(T result) { Waiter.this.pass(); } }; } } public static void synchronize(Dispatcher dispatcher, double seconds) { final Waiter waiter = new Waiter(seconds, ThrowExceptionHandler.getInstance()); dispatcher.dispatch(new RunAt(ThrowExceptionHandler.getInstance()) { @Override protected void runAt() { waiter.pass(); } }); waiter.waitFor(); } public static void dispatchLog(final Dispatcher dispatcher, final Logger logger, final Level level, final Object text) { dispatcher.dispatch(new RunAt(ThrowExceptionHandler.getInstance()) { @Override protected void runAt() { logger.log(level, "message dispatched into {}: {}", dispatcher, text); } }); } }