source: java/main/src/main/java/com/framsticks/util/dispatching/Dispatching.java @ 96

Last change on this file since 96 was 96, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supersede other application exceptions being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronous tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithms to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File size: 4.9 KB
Line 
1package com.framsticks.util.dispatching;
2
3import org.apache.log4j.Logger;
4
5import com.framsticks.util.FramsticksException;
6import com.framsticks.util.StateFunctor;
7
8/**
9 * @author Piotr Sniegowski
10 */
11public abstract class Dispatching {
12        private static final Logger log = Logger.getLogger(Dispatching.class);
13
14        public static boolean isThreadSafe() {
15                return true;
16        }
17
18        public static <C> void dispatchIfNotActive(Dispatcher<C> dispatcher, RunAt<? extends C> runnable) {
19                if (dispatcher.isActive()) {
20                        runnable.run();
21                        return;
22                }
23                dispatcher.dispatch(runnable);
24        }
25
26        //TODO RunAt StateFunctor
27        public static <C> void dispatchOk(Dispatcher<C> dispatcher, final StateFunctor stateFunctor) {
28                dispatcher.dispatch(new RunAt<C>() {
29                        @Override
30                        public void run() {
31                                stateFunctor.call();
32                        }
33                });
34        }
35
36        // public static boolean assertInvokeLater(Dispatcher dispatcher, RunAt runnable) {
37        //      dispatcher.invokeLater(runnable);
38        //      return true;
39        // }
40
41        public static <P, C> void invokeDispatch(Dispatcher<P> dispatcher, final Dispatcher<C> finalDispatcher, final RunAt<C> runnable) {
42                dispatcher.dispatch(new RunAt<P>() {
43                        @Override
44                        public void run() {
45                                finalDispatcher.dispatch(runnable);
46                        }
47                });
48        }
49
50        public static void sleep(double seconds) {
51                try {
52                        java.lang.Thread.sleep((long) (seconds * 1000));
53                } catch (InterruptedException e) {
54
55                }
56        }
57
58        @SuppressWarnings("unchecked")
59        public static void dispatcherGuardedInvoke(Joinable joinable, RunAt<?> runnable) {
60                if (joinable instanceof Dispatcher) {
61                        dispatchIfNotActive(Dispatcher.class.cast(joinable), runnable);
62                        return;
63                }
64                runnable.run();
65        }
66
67        public static void use(final Joinable joinable, final JoinableParent owner) {
68                log.debug("using " + joinable + " by " + owner);
69                if (joinable.use(owner)) {
70                        log.debug("started " + joinable);
71                } else {
72                        log.debug("start of " + joinable + " already happened");
73                }
74        }
75
76        public static void drop(final Joinable joinable, final JoinableParent owner) {
77                log.debug("droping " + joinable + " by " + owner);
78                if (joinable.drop(owner)) {
79                        log.debug("stoped " + joinable);
80                } else {
81                        log.debug("stop of " + joinable + " deferred");
82                }
83        }
84
85        public static void join(Joinable joinable) throws InterruptedException {
86                log.debug("joining " + joinable);
87                try {
88                        joinable.join();
89                } catch (InterruptedException e) {
90                        log.debug("failed to join " + joinable);
91                        throw e;
92                }
93                log.debug("joined " + joinable);
94        }
95
96        public static void childChangedState(final JoinableParent parent, final Joinable joinable, final JoinableState state) {
97                dispatcherGuardedInvoke(joinable, new RunAt<Object>() {
98                        @Override
99                        public void run() {
100                                log.debug("joinable " + joinable + " is notifying parent " + parent + " about change to " + state);
101                                parent.childChangedState(joinable, state);
102                        }
103                });
104        }
105
106        public static void wait(Object object, long millis) {
107                try {
108                        synchronized (object) {
109                                object.wait(millis);
110                        }
111                } catch (InterruptedException e) {
112                }
113        }
114
115        public static void joinAbsolutely(Joinable joinable) {
116                log.debug("joining absolutely " + joinable);
117                while (true) {
118                        try {
119                                Dispatching.join(joinable);
120                                return;
121                        } catch (InterruptedException e) {
122                                // throw new FramsticksException().msg("failed to join").arg("dispatcher", dispatcher).cause(e);
123                        }
124                        log.debug("waiting for " + joinable);
125                        wait(joinable, 500);
126                }
127        }
128
129        public interface Query<T> {
130                T get();
131        }
132
133        public static class QueryRunner<T, C> extends RunAt<C> {
134                protected final Query<T> query;
135                T result;
136                boolean ready = false;
137
138                /**
139                 * @param query
140                 */
141                public QueryRunner(Query<T> query) {
142                        this.query = query;
143                }
144
145                @Override
146                public void run() {
147                        result = query.get();
148                        synchronized (this) {
149                                ready = true;
150                                this.notifyAll();
151                        }
152                }
153
154                public T get() {
155                        synchronized (this) {
156                                while (!ready) {
157                                        try {
158                                                this.wait(100);
159                                        } catch (InterruptedException e) {
160                                        }
161                                }
162                        }
163                        return result;
164                }
165        }
166
167        public static <T, C> T get(Dispatcher<C> dispatcher, Query<T> query) {
168                QueryRunner<T, C> runner = new QueryRunner<T, C>(query);
169                dispatcher.dispatch(runner);
170                return runner.get();
171        }
172
173        public static class Waiter {
174                protected boolean done = false;
175
176                protected final double timeOut;
177
178                /**
179                 * @param timeOut
180                 */
181                public Waiter(double timeOut) {
182                        this.timeOut = timeOut;
183                }
184
185                public synchronized void pass() {
186                        done = true;
187                        this.notify();
188                }
189
190                public synchronized void waitFor() {
191                        long end = System.currentTimeMillis() + (int)(timeOut * 1000);
192                        while ((!done) && System.currentTimeMillis() < end) {
193                                try {
194                                        this.wait(end - System.currentTimeMillis());
195                                } catch (InterruptedException e) {
196                                        break;
197                                }
198                        }
199                        if (!done) {
200                                throw new FramsticksException().msg("waiter timed out");
201                        }
202                }
203        }
204
205
206        public static <C> void synchronize(Dispatcher<C> dispatcher, double seconds) {
207                final Waiter waiter = new Waiter(seconds);
208                dispatcher.dispatch(new RunAt<C>() {
209                        @Override
210                        public void run() {
211                                waiter.pass();
212                        }
213                });
214                waiter.waitFor();
215        }
216
217}
Note: See TracBrowser for help on using the repository browser.