Ignore:
Timestamp:
07/16/13 23:31:35 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic? and classes representing prime experiment state.

Add classes for PrimeExperiment? state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment?.

More development with NetLoadSaveLogic? and PrimeExperiment?.

Improvement around prime.

Improve BufferedDispatcher?.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector? against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher?.

Make BufferedDispatcher? a Dispatcher (and Joinable).

Minor improvements.

Done StackedJoinable?, improve Experiment.

Develop StackedJoinable?.

Add StackedJoinable? utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange? on Simulators.

Improve hints in ListChange?.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException?.

Add SimulatorConnector?.

Work out conception of experiment composing of logics building blocks.

Rename SinkInterface? to Sink.

Move saving of Accesses into AccessOperations?.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

Location:
java/main/src/main/java/com/framsticks/util/dispatching
Files:
2 added
7 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/util/dispatching/AbstractJoinable.java

    r101 r102  
    1818
    1919        private static final Logger log = LogManager.getLogger(AbstractJoinable.class);
     20        private static final Logger reportLog = LogManager.getLogger(AbstractJoinable.class.getName() + ".Report");
    2021
    2122        protected final Set<JoinableParent> owners = new HashSet<JoinableParent>();
     
    3536        }
    3637
     38
     39
    3740        public static void report() {
     41                if (!reportLog.isDebugEnabled()) {
     42                        return;
     43                }
    3844                StringBuilder b = new StringBuilder();
    3945                synchronized (joinablesRegistry) {
     
    4248                        }
    4349                }
    44                 log.debug("state: {}", b);
     50                reportLog.debug("state: {}", b);
    4551        }
    4652
     
    5359                }
    5460                this.state = state;
    55 
    56                 log.debug("{} is notifying {} parents", this, joinableListeners.size());
     61                log.debug("{} changed state to {}", this, state);
     62
    5763
    5864                List<JoinableParent> parents = new LinkedList<>();
     
    6167                for (JoinableParent p : parents) {
    6268                        if (p != null) {
     69                                log.debug("{} is notifying parent {} about change to {}", this, p, state);
    6370                                Dispatching.childChangedState(p, this, state);
    6471                        }
     
    110117                }
    111118                joinableJoin();
    112                 synchronized (this) {
    113                         this.state = JoinableState.JOINED;
    114                 }
     119                changeState(JoinableState.JOINED);
     120                // synchronized (this) {
     121                //      this.state = JoinableState.JOINED;
     122                // }
    115123        }
    116124
     
    158166                synchronized (this) {
    159167                        if (!owners.contains(owner)) {
    160                                 return false;
    161                                 // throw new FramsticksException().msg("object is not owning that joinable").arg("joinable", this).arg("object", owner);
     168                                // log.error("owner {} is not owning that joinable {}", owner, this);
     169                                // return false;
     170                                throw new FramsticksException().msg("object is not owning that joinable").arg("joinable", this).arg("object", owner);
    162171                        }
    163172                        // assert owners.containsKey(owner);
    164                         log.debug("{} is droping {}", owner, this);
    165173                        owners.remove(owner);
    166174                        stop = owners.isEmpty();
     175                        log.debug("{} is droping {} (users remaining: {})", owner, this, owners);
    167176                }
    168177                if (stop) {
     
    195204        @Override
    196205        public String toString() {
    197                 return getName();
     206                return getClass().getSimpleName() + "(" + getName() + ")";
    198207        }
    199208
     
    203212        }
    204213
    205 
    206214}
  • java/main/src/main/java/com/framsticks/util/dispatching/BufferedDispatcher.java

    r101 r102  
    22
    33
    4 public class BufferedDispatcher<C> {
     4import org.apache.logging.log4j.Logger;
     5import org.apache.logging.log4j.LogManager;
     6
     7import com.framsticks.util.FramsticksException;
     8import com.framsticks.util.lang.Strings;
     9// import static com.framsticks.util.dispatching.AbstractJoinable.wrap;
     10
     11public class BufferedDispatcher<C> extends AbstractJoinable implements Dispatcher<C>, JoinableParent {
     12        private static final Logger log = LogManager.getLogger(BufferedDispatcher.class);
    513
    614        protected final RunnableQueue<C> queue = new RunnableQueue<>();
     
    816        protected Dispatcher<C> targetDispatcher;
    917
    10         protected boolean buffer = false;
     18        protected boolean buffer = true;
     19        protected Dispatcher<C> parent;
     20
     21        /**
     22         * @param parent
     23         */
     24        public BufferedDispatcher(Dispatcher<C> parent) {
     25                this.parent = parent;
     26        }
    1127
    1228        /**
     
    2137         */
    2238        public synchronized void setTargetDispatcher(Dispatcher<C> targetDispatcher) {
     39                if (this.targetDispatcher != null) {
     40                        throw new FramsticksException().msg("dispatcher is already set").arg("dispatcher", this.targetDispatcher).arg("in", this);
     41                }
     42                if (targetDispatcher == null) {
     43                        throw new FramsticksException().msg("trying to set empty target dispatcher").arg("in", this);
     44                }
     45                log.debug("setting {} to {}", this, targetDispatcher);
    2346                this.targetDispatcher = targetDispatcher;
     47                flushQueue();
    2448        }
    2549
    2650        public synchronized boolean isActive() {
    27                 if (targetDispatcher == null) {
     51                if (targetDispatcher == null || buffer) {
    2852                        return false;
    2953                        // throw new FramsticksException().msg("no dispatcher is set for tree yet").arg("tree", this);
     
    4064        }
    4165
     66        protected void flushQueue() {
     67                if (this.buffer || targetDispatcher == null) {
     68                        return;
     69                }
     70                log.debug("flushing {} tasks in {}", queue.size(), this);
     71                while (!queue.isEmpty()) {
     72                        targetDispatcher.dispatch(queue.pollFirst());
     73                }
     74        }
     75
    4276        public synchronized void setBuffer(final boolean buffer) {
    4377                if (this.buffer == buffer) {
     
    4983                }
    5084                this.buffer = false;
    51                 while (!queue.isEmpty()) {
    52                         targetDispatcher.dispatch(queue.pollFirst());
     85                flushQueue();
     86        }
     87
     88        @Override
     89        public String toString() {
     90                return parent + " -> " + Strings.toStringNullProof(targetDispatcher, "<null>");
     91        }
     92
     93        public void createThreadIfNeeded() {
     94                if (targetDispatcher != null) {
     95                        return;
    5396                }
     97                this.setTargetDispatcher(new Thread<C>().setName(parent.getName()));
     98        }
     99
     100        @Override
     101        public String getName() {
     102                return parent + " buffered dispatcher";
     103        }
     104
     105        @Override
     106        protected void joinableStart() {
     107                Dispatching.use(targetDispatcher, this);
     108        }
     109
     110        @Override
     111        protected void joinableInterrupt() {
     112                Dispatching.drop(targetDispatcher, this);
     113
     114                finishJoinable();
     115        }
     116
     117        @Override
     118        protected void joinableFinish() {
    54119
    55120        }
    56121
     122        @Override
     123        protected void joinableJoin() throws InterruptedException {
     124                Dispatching.join(targetDispatcher);
     125        }
    57126
     127        @Override
     128        public void childChangedState(Joinable joinable, JoinableState state) {
     129                proceedToState(state);
     130        }
    58131}
  • java/main/src/main/java/com/framsticks/util/dispatching/Dispatching.java

    r101 r102  
    33import java.util.Timer;
    44
     5import org.apache.logging.log4j.Level;
    56import org.apache.logging.log4j.Logger;
    67import org.apache.logging.log4j.LogManager;
    78
    89import com.framsticks.util.FramsticksException;
     10import com.framsticks.util.Misc;
    911
    1012/**
     
    3234        }
    3335
     36        public static <C> void dispatch(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
     37                dispatcher.dispatch(runnable);
     38        }
     39
    3440        // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) {
    3541        //      dispatcher.invokeLater(runnable);
     
    6672
    6773        public static void use(final Joinable joinable, final JoinableParent owner) {
     74                Misc.throwIfNull(joinable);
    6875                log.debug("using {} by {}", joinable, owner);
    6976                if (joinable.use(owner)) {
     
    7582
    7683        public static void drop(final Joinable joinable, final JoinableParent owner) {
     84                Misc.throwIfNull(joinable);
    7785                log.debug("droping {} by {}", joinable, owner);
    7886                if (joinable.drop(owner)) {
     
    8492
    8593        public static void join(Joinable joinable) throws InterruptedException {
     94                Misc.throwIfNull(joinable);
    8695                log.debug("joining {}", joinable);
    8796                try {
     
    95104
    96105        public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) {
    97                 if (state.ordinal() <= JoinableState.RUNNING.ordinal()) {
     106                if (state.ordinal() < JoinableState.RUNNING.ordinal()) {
    98107                        return;
    99108                }
     
    113122                        }
    114123                } catch (InterruptedException e) {
     124                        log.debug("failed to wait on {} because of: {}", object, e.getMessage());
    115125                }
    116126        }
     
    123133                                return;
    124134                        } catch (InterruptedException e) {
     135                                log.debug("failed to join {} because of: {}", joinable, e.getMessage());
    125136                                // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e);
    126137                        }
     
    192203
    193204        // public static class DispatcherWaiter<C, T extends Dispatcher<C> & Joinable> implements Dispatcher<C> {
    194         //      // protected boolean done = false;
    195         //      protected final T dispatcher;
    196         //      protected RunAt<? extends C> runnable;
    197 
    198         //      /**
    199         //       * @param joinable
    200         //       */
    201         //      public DispatcherWaiter(T dispatcher) {
    202         //              this.dispatcher = dispatcher;
    203         //      }
    204 
    205         //      public synchronized void waitFor() {
    206         //              while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) {
    207         //                      try {
    208         //                              this.wait();
    209         //                      } catch (InterruptedException e) {
    210         //                      }
    211         //              }
    212         //              if (runnable != null) {
    213         //                      runnable.run();
    214         //              }
    215 
    216         //      }
    217 
    218         //      @Override
    219         //      public boolean isActive() {
    220         //              return dispatcher.isActive();
    221         //      }
    222 
    223         //      @Override
    224         //      public synchronized void dispatch(RunAt<? extends C> runnable) {
    225         //              this.runnable = runnable;
    226         //              this.notifyAll();
    227         //      }
     205        //      // protected boolean done = false;
     206        //      protected final T dispatcher;
     207        //      protected RunAt<? extends C> runnable;
     208
     209        //      /**
     210        //       * @param joinable
     211        //       */
     212        //      public DispatcherWaiter(T dispatcher) {
     213        //              this.dispatcher = dispatcher;
     214        //      }
     215
     216        //      public synchronized void waitFor() {
     217        //              while ((runnable == null) && (dispatcher.getState().ordinal() <= JoinableState.RUNNING.ordinal())) {
     218        //                      try {
     219        //                              this.wait();
     220        //                      } catch (InterruptedException e) {
     221        //                      }
     222        //              }
     223        //              if (runnable != null) {
     224        //                      runnable.run();
     225        //              }
     226
     227        //      }
     228
     229        //      @Override
     230        //      public boolean isActive() {
     231        //              return dispatcher.isActive();
     232        //      }
     233
     234        //      @Override
     235        //      public synchronized void dispatch(RunAt<? extends C> runnable) {
     236        //              this.runnable = runnable;
     237        //              this.notifyAll();
     238        //      }
    228239
    229240        // }
     
    284295        }
    285296
     297        public static <C> void dispatchLog(final Dispatcher<C> dispatcher, final Logger logger, final Level level, final Object text) {
     298                dispatcher.dispatch(new RunAt<C>(ThrowExceptionHandler.getInstance()) {
     299
     300                        @Override
     301                        protected void runAt() {
     302                                logger.log(level, "message dispatched into {}: {}", dispatcher, text);
     303                        }
     304                });
     305
     306        }
     307
    286308}
  • java/main/src/main/java/com/framsticks/util/dispatching/JoinableCollection.java

    r101 r102  
    2121        protected final Set<T> joinables = new HashSet<T>();
    2222
    23         protected boolean finishIfOne;
     23        public static enum FinishPolicy {
     24                Never,
     25                OnFirst,
     26                OnAll
     27        };
     28
     29        protected final FinishPolicy finishPolicy;
    2430
    2531        protected String observableName;
    2632
    2733        public JoinableCollection() {
    28                 this(false);
    29         }
    30 
    31         public JoinableCollection(boolean finishIfOne) {
    32                 this.finishIfOne = finishIfOne;
     34                this(FinishPolicy.OnAll);
     35        }
     36
     37
     38        /**
     39         * @param finishPolicy
     40         */
     41        public JoinableCollection(FinishPolicy finishPolicy) {
     42                this.finishPolicy = finishPolicy;
    3343        }
    3444
     
    95105
    96106        protected JoinableState getNextState() {
    97                 if (joinables.isEmpty()) {
     107                if ((finishPolicy == FinishPolicy.Never && state == JoinableState.RUNNING) || joinables.isEmpty()) {
    98108                        return state;
    99109                }
    100                 JoinableState result = finishIfOne ? JoinableState.INITILIAZED : JoinableState.JOINED;
     110                boolean oneIsEnough = (finishPolicy == FinishPolicy.OnFirst);
     111                JoinableState result = oneIsEnough ? JoinableState.INITILIAZED : JoinableState.JOINED;
    101112                for (Joinable j : joinables) {
    102113                        JoinableState s = j.getState();
    103                         if (finishIfOne) {
     114                        if (oneIsEnough) {
    104115                                if (s.ordinal() > result.ordinal()) {
    105116                                        result = s;
  • java/main/src/main/java/com/framsticks/util/dispatching/Monitor.java

    r100 r102  
    55import com.framsticks.util.dispatching.Dispatching;
    66import java.lang.Thread;
     7import java.util.concurrent.atomic.AtomicBoolean;
    78
    89public class Monitor implements JoinableParent {
     
    1213        protected final Joinable joinable;
    1314        protected final Thread shutdownHook;
     15        protected final AtomicBoolean dropped = new AtomicBoolean(false);
     16        protected final AtomicBoolean joining = new AtomicBoolean(false);
    1417
    1518        /**
     
    2225                        @Override
    2326                        public void run() {
    24                                 log.debug("running shutdown hook");
     27                                log.debug("droping and joining");
    2528                                Monitor.this.drop().join();
     29                                log.debug("droped and joined");
     30                                AbstractJoinable.report();
    2631                        }
    2732                });
     
    5459
    5560        public Monitor drop() {
     61                if (!dropped.compareAndSet(false, true)) {
     62                        return this;
     63                }
    5664                log.debug("{} is droping", this);
    5765                Dispatching.drop(joinable, this);
     
    6068
    6169        public Monitor join() {
     70                if (!joining.compareAndSet(false, true)) {
     71                        log.debug("not joining");
     72                        return this;
     73                }
     74
    6275                log.debug("{} is joining", this);
    6376                Dispatching.joinAbsolutely(joinable);
     
    6982                        /** In case IllegalStateException is caught, it means that JVM is in finalization stage */
    7083                }
    71 
    7284                return this;
    7385        }
  • java/main/src/main/java/com/framsticks/util/dispatching/RunnableQueue.java

    r101 r102  
    1919        }
    2020
     21        public int size() {
     22                return queue.size();
     23        }
     24
    2125}
  • java/main/src/main/java/com/framsticks/util/dispatching/Thread.java

    r101 r102  
    106106
    107107        @ParamAnnotation
    108         public void setName(String name) {
     108        public Thread<C> setName(String name) {
    109109                thread.setName(name);
     110                return this;
    110111        }
    111112
     
    119120        }
    120121
    121         @Override
    122         public String toString() {
    123                 return getName();
    124         }
     122        // @Override
     123        // public String toString() {
     124        //      return getName();
     125        // }
    125126
    126127        @Override
Note: See TracChangeset for help on using the changeset viewer.