source: java/main/src/main/java/com/framsticks/remote/RemoteTree.java @ 102

Last change on this file since 102 was 102, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic? and classes representing prime experiment state.

Add classes for PrimeExperiment? state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment?.

More development with NetLoadSaveLogic? and PrimeExperiment?.

Improvement around prime.

Improve BufferedDispatcher?.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector? against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher?.

Make BufferedDispatcher? a Dispatcher (and Joinable).

Minor improvements.

Done StackedJoinable?, improve Experiment.

Develop StackedJoinable?.

Add StackedJoinable? utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange? on Simulators.

Improve hints in ListChange?.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException?.

Add SimulatorConnector?.

Work out conception of experiment composing of logics building blocks.

Rename SinkInterface? to Sink.

Move saving of Accesses into AccessOperations?.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

File size: 11.4 KB
Line 
1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.GetRequest;
6import com.framsticks.communication.queries.InfoRequest;
7import com.framsticks.communication.queries.SetRequest;
8import com.framsticks.core.AbstractTree;
9import com.framsticks.core.ListChange;
10import com.framsticks.core.Path;
11import com.framsticks.core.ValueChange;
12import com.framsticks.params.*;
13import com.framsticks.params.EventListener;
14import com.framsticks.params.annotations.FramsClassAnnotation;
15import com.framsticks.params.annotations.ParamAnnotation;
16import com.framsticks.params.types.EventParam;
17import com.framsticks.params.types.ProcedureParam;
18import com.framsticks.parsers.Loaders;
19import com.framsticks.parsers.MultiParamLoader;
20import com.framsticks.util.*;
21import com.framsticks.util.dispatching.AtOnceDispatcher;
22import com.framsticks.util.dispatching.Dispatching;
23import com.framsticks.util.dispatching.DispatchingFuture;
24import com.framsticks.util.dispatching.ExceptionResultHandler;
25import com.framsticks.util.dispatching.Future;
26import com.framsticks.util.dispatching.FutureHandler;
27import com.framsticks.util.dispatching.Joinable;
28import com.framsticks.util.dispatching.JoinableParent;
29import com.framsticks.util.dispatching.JoinableState;
30import com.framsticks.util.dispatching.ThrowExceptionHandler;
31import com.framsticks.util.lang.Casting;
32import com.framsticks.util.dispatching.RunAt;
33import static com.framsticks.core.TreeOperations.*;
34
35import java.util.*;
36
37import javax.annotation.Nonnull;
38
39import org.apache.logging.log4j.Logger;
40import org.apache.logging.log4j.LogManager;
41
42/**
43 * @author Piotr Sniegowski
44 */
45@FramsClassAnnotation
46public final class RemoteTree extends AbstractTree implements JoinableParent {
47
48        private final static Logger log = LogManager.getLogger(RemoteTree.class);
49
50        protected ClientSideManagedConnection connection;
51
52        public RemoteTree() {
53                bufferedDispatcher.setBuffer(true);
54                registry.registerAndBuild(ListChange.class);
55                registry.registerAndBuild(ValueChange.class);
56        }
57
58        public void setAddress(Address address) {
59                this.connection = Connection.to(new ClientSideManagedConnection(), address);
60                this.connection.setExceptionHandler(this);
61                this.connection.setNeedFileAcceptor(this);
62        }
63
64        @ParamAnnotation
65        public void setAddress(String address) {
66                setAddress(new Address(address));
67        }
68
69        @ParamAnnotation(flags = ParamFlags.USERREADONLY)
70        public String getAddress() {
71                return connection == null ? "<disconnected>" : connection.getAddress().toString();
72        }
73
74        public final ClientSideManagedConnection getConnection() {
75                return connection;
76        }
77
78
79        @Override
80        public String toString() {
81                return super.toString() + "[" + getAddress() + "]";
82        }
83
84        protected ExceptionResultHandler pathRemoveHandler(final Path path, final ExceptionResultHandler handler) {
85                return new ExceptionResultHandler() {
86
87                        @Override
88                        public void handle(final FramsticksException exception) {
89                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
90
91                                        @Override
92                                        protected void runAt() {
93                                                assert path.getTree().isActive();
94                                                log.info("path is invalid (removing): {}", path);
95                                                bindAccess(path.getUnder()).set(path.getTop().getParam(), null);
96                                                handler.handle(exception);
97                                        }
98                                });
99                        }
100                };
101        }
102
103        // @Override
104        // public void get(final Path path, final ValueParam param, final Future<Object> future) {
105        //      assert isActive();
106        //      assert param != null;
107        //      // assert path.isResolved();
108        //      connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(pathRemoveHandler(path, future)) {
109        //              @Override
110        //              protected void processOk(Response response) {
111        //                      assert isActive();
112        //                      processFetchedValues(path, response.getFiles());
113        //                      future.pass(bindAccess(path.tryResolveIfNeeded()).get(param, Object.class));
114        //              }
115        //      });
116        // }
117
118        protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>();
119
120
121        /** Fetch information.
122         *
123         * This method does not check whether the info is already in the cache, it will always issue a request or join
124         * with other already issued but returned info request. Too issue request only when needed, use TreeOperations.findInfo()
125         *
126         * */
127        @Override
128        public void info(final Path path, final Future<FramsClass> future) {
129
130                final String name = path.getTop().getParam().getContainedTypeName();
131
132                if (infoRequests.containsKey(name)) {
133                        infoRequests.get(name).add(future);
134                        return;
135                }
136
137                log.debug("issuing info request for {}", name);
138                final Set<Future<FramsClass>> futures = new HashSet<Future<FramsClass>>();
139                futures.add(future);
140                infoRequests.put(name, futures);
141
142                final Future<FramsClass> compositeFuture = DispatchingFuture.create(this, new Future<FramsClass>() {
143
144                        @Override
145                        public void handle(FramsticksException exception) {
146                                assert isActive();
147                                infoRequests.remove(name);
148                                for (Future<FramsClass> f : futures) {
149                                        f.handle(exception);
150                                }
151                        }
152
153                        @Override
154                        protected void result(FramsClass framsClass) {
155                                assert isActive();
156                                putInfoIntoCache(framsClass);
157                                infoRequests.remove(name);
158                                for (Future<FramsClass> f : futures) {
159                                        f.pass(framsClass);
160                                }
161                        }
162                });
163
164                connection.send(new InfoRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(compositeFuture) {
165                        @Override
166                        protected void processOk(Response response) {
167                                assert connection.getReceiverDispatcher().isActive();
168
169                                if (response.getFiles().size() != 1) {
170                                        throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size());
171                                }
172                                if (!path.isTheSame(response.getFiles().get(0).getPath())) {
173                                        throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath());
174                                }
175                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
176
177                                CompositeParam thisParam = path.getTop().getParam();
178                                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
179                                        throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId());
180                                }
181                                compositeFuture.pass(framsClass);
182                        }
183                });
184        }
185
186        @Override
187        public void get(final Path path, final Future<Path> future) {
188                assert isActive();
189                final ExceptionResultHandler remover = pathRemoveHandler(path, future);
190
191                log.trace("fetching values for {}", path);
192                findInfo(path, new FutureHandler<FramsClass>(remover) {
193                        @Override
194                        protected void result(FramsClass result) {
195
196                                final Access access = registry.prepareAccess(path.getTop().getParam());
197                                connection.send(new GetRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(remover) {
198                                        @Override
199                                        protected void processOk(Response response) {
200                                                processFetchedValues(path, response.getFiles(), access, future);
201                                        }
202                                });
203                        }
204                });
205        }
206
207        @Override
208        public void set(final Path path, final PrimitiveParam<?> param, final Object value, final Future<Integer> future) {
209                assert isActive();
210                final Integer flag = bindAccess(path).set(param, value);
211
212                log.trace("storing value {} for {}", param, path);
213
214                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
215                        @Override
216                        protected void processOk(Response response) {
217                                future.pass(flag);
218                        }
219                });
220        }
221
222        @Override
223        protected void joinableStart() {
224                Dispatching.use(connection, this);
225                super.joinableStart();
226
227                bufferedDispatcher.getTargetDispatcher().dispatch(new RunAt<RemoteTree>(ThrowExceptionHandler.getInstance()) {
228                        @Override
229                        protected void runAt() {
230
231                                connection.send(new InfoRequest().path("/"), bufferedDispatcher.getTargetDispatcher(), new ClientSideResponseFuture(this) {
232                                        @Override
233                                        protected void processOk(Response response) {
234                                                assert bufferedDispatcher.getTargetDispatcher().isActive();
235                                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
236                                                putInfoIntoCache(framsClass);
237
238                                                assignRootParam(Param.build().name("Tree").id(RemoteTree.this.getName()).type("o " + framsClass.getId()).finish(CompositeParam.class));
239                                                bufferedDispatcher.setBuffer(false);
240                                        }
241                                });
242
243                        }
244                });
245        }
246
247        @Override
248        protected void joinableInterrupt() {
249                Dispatching.drop(connection, this);
250                super.joinableInterrupt();
251        }
252
253        @Override
254        protected void joinableFinish() {
255                super.joinableFinish();
256
257        }
258
259        @Override
260        public void joinableJoin() throws InterruptedException {
261                Dispatching.join(connection);
262                super.joinableJoin();
263        }
264
265        @Override
266        public void childChangedState(Joinable joinable, JoinableState state) {
267                proceedToState(state);
268        }
269
270        @Override
271        public void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Future<Object> future) {
272                assert isActive();
273                assert path.isResolved();
274
275                //TODO validate arguments type using params
276                connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
277                        @Override
278                        protected void processOk(Response response) {
279                                assert isActive();
280                                // InstanceUtils.processFetchedValues(path, response.getFiles());
281                                future.pass(null);
282                        }
283                });
284
285        }
286
287        protected final Map<EventListener<?>, EventListener<File>> proxyListeners = new IdentityHashMap<>();
288
289        public <A> void addListener(Path path, EventParam param, final EventListener<A> listener, final Class<A> argumentType, final Future<Void> future) {
290                assert isActive();
291                assert path.isResolved();
292                if ((!argumentType.equals(Object.class)) && (null == registry.getFramsClassForJavaClass(argumentType))) {
293                        registry.registerAndBuild(argumentType);
294                }
295
296                final EventListener<File> proxyListener = new EventListener<File>() {
297
298                        @Override
299                        public void action(final File file) {
300                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
301
302                                        @Override
303                                        protected void runAt() {
304                                                assert isActive();
305                                                if (argumentType.equals(File.class)) {
306                                                        listener.action(Casting.tryCast(argumentType, file));
307                                                        return;
308                                                }
309                                                // Access access = registry.createAccess(argumentType);
310
311                                                // log.info("executing event with argument {}", argumentType);
312                                                MultiParamLoader loader = new MultiParamLoader();
313                                                loader.setNewSource(file.getContent());
314                                                loader.addBreakCondition(MultiParamLoader.Status.AfterObject);
315                                                loader.setAccessProvider(registry);
316                                                // loader.addAccess(access);
317                                                loader.go();
318                                                Object argument = loader.getLastAccess().getSelected();
319                                                // Object argument = access.getSelected();
320                                                if (argument == null) {
321                                                        listener.action(null);
322                                                }
323                                                if (!argumentType.isInstance(argument)) {
324                                                        throw new FramsticksException().msg("created argument is of wrong type").arg("expected", argumentType).arg("created", argument.getClass());
325                                                }
326                                                listener.action(argumentType.cast(argument));
327                                        }
328                                });
329                        }
330                };
331
332                proxyListeners.put(listener, proxyListener);
333
334                connection.addListener(Path.appendString(path.getTextual(), param.getId()), proxyListener, this, future);
335        }
336
337        public void removeListener(Path path, EventParam param, EventListener<?> listener, Future<Void> future) {
338                assert isActive();
339                EventListener<File> proxyListener = proxyListeners.get(listener);
340                connection.removeListener(proxyListener, this, future);
341        }
342
343}
Note: See TracBrowser for help on using the repository browser.