Changeset 102 for java/main/src/main/java/com/framsticks/util/dispatching
- Timestamp:
- 07/16/13 23:31:35 (11 years ago)
- Location:
- java/main/src/main/java/com/framsticks/util/dispatching
- Files:
-
- 2 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/util/dispatching/AbstractJoinable.java
r101 r102 18 18 19 19 private static final Logger log = LogManager.getLogger(AbstractJoinable.class); 20 private static final Logger reportLog = LogManager.getLogger(AbstractJoinable.class.getName() + ".Report"); 20 21 21 22 protected final Set<JoinableParent> owners = new HashSet<JoinableParent>(); … … 35 36 } 36 37 38 39 37 40 public static void report() { 41 if (!reportLog.isDebugEnabled()) { 42 return; 43 } 38 44 StringBuilder b = new StringBuilder(); 39 45 synchronized (joinablesRegistry) { … … 42 48 } 43 49 } 44 log.debug("state: {}", b);50 reportLog.debug("state: {}", b); 45 51 } 46 52 … … 53 59 } 54 60 this.state = state; 55 56 log.debug("{} is notifying {} parents", this, joinableListeners.size()); 61 log.debug("{} changed state to {}", this, state); 62 57 63 58 64 List<JoinableParent> parents = new LinkedList<>(); … … 61 67 for (JoinableParent p : parents) { 62 68 if (p != null) { 69 log.debug("{} is notifying parent {} about change to {}", this, p, state); 63 70 Dispatching.childChangedState(p, this, state); 64 71 } … … 110 117 } 111 118 joinableJoin(); 112 synchronized (this) { 113 this.state = JoinableState.JOINED; 114 } 119 changeState(JoinableState.JOINED); 120 // synchronized (this) { 121 // this.state = JoinableState.JOINED; 122 // } 115 123 } 116 124 … … 158 166 synchronized (this) { 159 167 if (!owners.contains(owner)) { 160 return false; 161 // throw new FramsticksException().msg("object is not owning that joinable").arg("joinable", this).arg("object", owner); 168 // log.error("owner {} is not owning that joinable {}", owner, this); 169 // return false; 170 throw new FramsticksException().msg("object is not owning that joinable").arg("joinable", this).arg("object", owner); 162 171 } 163 172 // assert owners.containsKey(owner); 164 log.debug("{} is droping {}", owner, this);165 173 owners.remove(owner); 166 174 stop = owners.isEmpty(); 175 log.debug("{} is droping {} (users remaining: {})", owner, this, owners); 167 176 } 168 177 if (stop) { … … 195 204 @Override 196 205 public String toString() { 197 return get Name();206 return getClass().getSimpleName() + "(" + getName() + ")"; 198 207 } 199 208 … … 203 212 } 204 213 205 206 214 } -
java/main/src/main/java/com/framsticks/util/dispatching/BufferedDispatcher.java
r101 r102 2 2 3 3 4 public class BufferedDispatcher<C> { 4 import org.apache.logging.log4j.Logger; 5 import org.apache.logging.log4j.LogManager; 6 7 import com.framsticks.util.FramsticksException; 8 import com.framsticks.util.lang.Strings; 9 // import static com.framsticks.util.dispatching.AbstractJoinable.wrap; 10 11 public class BufferedDispatcher<C> extends AbstractJoinable implements Dispatcher<C>, JoinableParent { 12 private static final Logger log = LogManager.getLogger(BufferedDispatcher.class); 5 13 6 14 protected final RunnableQueue<C> queue = new RunnableQueue<>(); … … 8 16 protected Dispatcher<C> targetDispatcher; 9 17 10 protected boolean buffer = false; 18 protected boolean buffer = true; 19 protected Dispatcher<C> parent; 20 21 /** 22 * @param parent 23 */ 24 public BufferedDispatcher(Dispatcher<C> parent) { 25 this.parent = parent; 26 } 11 27 12 28 /** … … 21 37 */ 22 38 public synchronized void setTargetDispatcher(Dispatcher<C> targetDispatcher) { 39 if (this.targetDispatcher != null) { 40 throw new FramsticksException().msg("dispatcher is already set").arg("dispatcher", this.targetDispatcher).arg("in", this); 41 } 42 if (targetDispatcher == null) { 43 throw new FramsticksException().msg("trying to set empty target dispatcher").arg("in", this); 44 } 45 log.debug("setting {} to {}", this, targetDispatcher); 23 46 this.targetDispatcher = targetDispatcher; 47 flushQueue(); 24 48 } 25 49 26 50 public synchronized boolean isActive() { 27 if (targetDispatcher == null ) {51 if (targetDispatcher == null || buffer) { 28 52 return false; 29 53 // throw new FramsticksException().msg("no dispatcher is set for tree yet").arg("tree", this); … … 40 64 } 41 65 66 protected void flushQueue() { 67 if (this.buffer || targetDispatcher == null) { 68 return; 69 } 70 log.debug("flushing {} tasks in {}", queue.size(), this); 71 while (!queue.isEmpty()) { 72 targetDispatcher.dispatch(queue.pollFirst()); 73 } 74 } 75 42 76 public synchronized void setBuffer(final boolean buffer) { 43 77 if (this.buffer == buffer) { … … 49 83 } 50 84 this.buffer = false; 51 while (!queue.isEmpty()) { 52 targetDispatcher.dispatch(queue.pollFirst()); 85 flushQueue(); 86 } 87 88 @Override 89 public String toString() { 90 return parent + " -> " + Strings.toStringNullProof(targetDispatcher, "<null>"); 91 } 92 93 public void createThreadIfNeeded() { 94 if (targetDispatcher != null) { 95 return; 53 96 } 97 this.setTargetDispatcher(new Thread<C>().setName(parent.getName())); 98 } 99 100 @Override 101 public String getName() { 102 return parent + " buffered dispatcher"; 103 } 104 105 @Override 106 protected void joinableStart() { 107 Dispatching.use(targetDispatcher, this); 108 } 109 110 @Override 111 protected void joinableInterrupt() { 112 Dispatching.drop(targetDispatcher, this); 113 114 finishJoinable(); 115 } 116 117 @Override 118 protected void joinableFinish() { 54 119 55 120 } 56 121 122 @Override 123 protected void joinableJoin() throws InterruptedException { 124 Dispatching.join(targetDispatcher); 125 } 57 126 127 @Override 128 public void childChangedState(Joinable joinable, JoinableState state) { 129 proceedToState(state); 130 } 58 131 } -
java/main/src/main/java/com/framsticks/util/dispatching/Dispatching.java
r101 r102 3 3 import java.util.Timer; 4 4 5 import org.apache.logging.log4j.Level; 5 6 import org.apache.logging.log4j.Logger; 6 7 import org.apache.logging.log4j.LogManager; 7 8 8 9 import com.framsticks.util.FramsticksException; 10 import com.framsticks.util.Misc; 9 11 10 12 /** … … 32 34 } 33 35 36 public static <C> void dispatch(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) { 37 dispatcher.dispatch(runnable); 38 } 39 34 40 // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) { 35 41 // dispatcher.invokeLater(runnable); … … 66 72 67 73 public static void use(final Joinable joinable, final JoinableParent owner) { 74 Misc.throwIfNull(joinable); 68 75 log.debug("using {} by {}", joinable, owner); 69 76 if (joinable.use(owner)) { … … 75 82 76 83 public static void drop(final Joinable joinable, final JoinableParent owner) { 84 Misc.throwIfNull(joinable); 77 85 log.debug("droping {} by {}", joinable, owner); 78 86 if (joinable.drop(owner)) { … … 84 92 85 93 public static void join(Joinable joinable) throws InterruptedException { 94 Misc.throwIfNull(joinable); 86 95 log.debug("joining {}", joinable); 87 96 try { … … 95 104 96 105 public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) { 97 if (state.ordinal() < =JoinableState.RUNNING.ordinal()) {106 if (state.ordinal() < JoinableState.RUNNING.ordinal()) { 98 107 return; 99 108 } … … 113 122 } 114 123 } catch (InterruptedException e) { 124 log.debug("failed to wait on {} because of: {}", object, e.getMessage()); 115 125 } 116 126 } … … 123 133 return; 124 134 } catch (InterruptedException e) { 135 log.debug("failed to join {} because of: {}", joinable, e.getMessage()); 125 136 // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e); 126 137 } … … 192 203 193 204 // public static class DispatcherWaiter<C, T extends Dispatcher<C> & Joinable> implements Dispatcher<C> { 194 // 195 // 196 // 197 198 // 199 // 200 // 201 // 202 // 203 // 204 205 // 206 // 207 // 208 // 209 // 210 // 211 // 212 // 213 // 214 // 215 216 // 217 218 // 219 // 220 // 221 // 222 223 // 224 // 225 // 226 // 227 // 205 // // protected boolean done = false; 206 // protected final T dispatcher; 207 // protected RunAt<? extends C> runnable; 208 209 // /** 210 // * @param joinable 211 // */ 212 // public DispatcherWaiter(T dispatcher) { 213 // this.dispatcher = dispatcher; 214 // } 215 216 // public synchronized void waitFor() { 217 // while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) { 218 // try { 219 // this.wait(); 220 // } catch (InterruptedException e) { 221 // } 222 // } 223 // if (runnable != null) { 224 // runnable.run(); 225 // } 226 227 // } 228 229 // @Override 230 // public boolean isActive() { 231 // return dispatcher.isActive(); 232 // } 233 234 // @Override 235 // public synchronized void dispatch(RunAt<? extends C> runnable) { 236 // this.runnable = runnable; 237 // this.notifyAll(); 238 // } 228 239 229 240 // } … … 284 295 } 285 296 297 public static <C> void dispatchLog(final Dispatcher<C> dispatcher, final Logger logger, final Level level, final Object text) { 298 dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) { 299 300 @Override 301 protected void runAt() { 302 logger.log(level, "message dispatched into {}: {}", dispatcher, text); 303 } 304 }); 305 306 } 307 286 308 } -
java/main/src/main/java/com/framsticks/util/dispatching/JoinableCollection.java
r101 r102 21 21 protected final Set<T> joinables = new HashSet<T>(); 22 22 23 protected boolean finishIfOne; 23 public static enum FinishPolicy { 24 Never, 25 OnFirst, 26 OnAll 27 }; 28 29 protected final FinishPolicy finishPolicy; 24 30 25 31 protected String observableName; 26 32 27 33 public JoinableCollection() { 28 this(false); 29 } 30 31 public JoinableCollection(boolean finishIfOne) { 32 this.finishIfOne = finishIfOne; 34 this(FinishPolicy.OnAll); 35 } 36 37 38 /** 39 * @param finishPolicy 40 */ 41 public JoinableCollection(FinishPolicy finishPolicy) { 42 this.finishPolicy = finishPolicy; 33 43 } 34 44 … … 95 105 96 106 protected JoinableState getNextState() { 97 if ( joinables.isEmpty()) {107 if ((finishPolicy == FinishPolicy.Never && state == JoinableState.RUNNING) || joinables.isEmpty()) { 98 108 return state; 99 109 } 100 JoinableState result = finishIfOne ? JoinableState.INITILIAZED : JoinableState.JOINED; 110 boolean oneIsEnough = (finishPolicy == FinishPolicy.OnFirst); 111 JoinableState result = oneIsEnough ? JoinableState.INITILIAZED : JoinableState.JOINED; 101 112 for (Joinable j : joinables) { 102 113 JoinableState s = j.getState(); 103 if ( finishIfOne) {114 if (oneIsEnough) { 104 115 if (s.ordinal() > result.ordinal()) { 105 116 result = s; -
java/main/src/main/java/com/framsticks/util/dispatching/Monitor.java
r100 r102 5 5 import com.framsticks.util.dispatching.Dispatching; 6 6 import java.lang.Thread; 7 import java.util.concurrent.atomic.AtomicBoolean; 7 8 8 9 public class Monitor implements JoinableParent { … … 12 13 protected final Joinable joinable; 13 14 protected final Thread shutdownHook; 15 protected final AtomicBoolean dropped = new AtomicBoolean(false); 16 protected final AtomicBoolean joining = new AtomicBoolean(false); 14 17 15 18 /** … … 22 25 @Override 23 26 public void run() { 24 log.debug(" running shutdown hook");27 log.debug("droping and joining"); 25 28 Monitor.this.drop().join(); 29 log.debug("droped and joined"); 30 AbstractJoinable.report(); 26 31 } 27 32 }); … … 54 59 55 60 public Monitor drop() { 61 if (!dropped.compareAndSet(false, true)) { 62 return this; 63 } 56 64 log.debug("{} is droping", this); 57 65 Dispatching.drop(joinable, this); … … 60 68 61 69 public Monitor join() { 70 if (!joining.compareAndSet(false, true)) { 71 log.debug("not joining"); 72 return this; 73 } 74 62 75 log.debug("{} is joining", this); 63 76 Dispatching.joinAbsolutely(joinable); … … 69 82 /** In case IllegalStateException is caught, it means that JVM is in finalization stage */ 70 83 } 71 72 84 return this; 73 85 } -
java/main/src/main/java/com/framsticks/util/dispatching/RunnableQueue.java
r101 r102 19 19 } 20 20 21 public int size() { 22 return queue.size(); 23 } 24 21 25 } -
java/main/src/main/java/com/framsticks/util/dispatching/Thread.java
r101 r102 106 106 107 107 @ParamAnnotation 108 public voidsetName(String name) {108 public Thread<C> setName(String name) { 109 109 thread.setName(name); 110 return this; 110 111 } 111 112 … … 119 120 } 120 121 121 @Override122 public String toString() {123 return getName();124 }122 // @Override 123 // public String toString() { 124 // return getName(); 125 // } 125 126 126 127 @Override
Note: See TracChangeset
for help on using the changeset viewer.