source: java/main/src/main/java/com/framsticks/remote/RemoteTree.java @ 107

Last change on this file since 107 was 107, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • add SimultorProviders? hierarchy
  • start Framsticks server over SSH
  • FJF compatible with Framsticks 4.0rc3
  • reading and writing of standard.expt
  • a proof-of-concept implementation of StandardExperiment?

CHANGELOG:
Optionally return FreeAccess? from registry.

Add SimulatorRange?.

StandardExperiment? with genotypes circulation.

Automate registration around StandardState?.

More improvements to StandardExperiment?.

Skeleton version of StandardExperiment?.

Test saving of StandardState?.

Standard experiment state is being loaded.

More development towards StandardState? reading.

Work on reading standard experiment state.

Add classes for standard experiment.

Update example standard.expt

Add FreeAccess? and FreeObject?.

Made compatible with version 4.0rc3

Change deserialization policy.

Improve SSH support.

Working running simulator over SSH.

Fix joining bug in Experiment.

Working version of SimulatorRunner?.

Add more SimulatorProviders?.

Working PrimeExperimentTest? with 4.0rc3

Add references to deserialization.

Add OpaqueObject? and it's serialization.

Add deserialization of dictionaries.

Partial implementation of deserialization.

Add more tests for deserialization.

Prepare tests for deserialization.

Add proper result to prime experiment test.

Minor fixes to simulators providers.

Draft version of SimulatorProvider?.

Add SimulatorProvider? interface.

File size: 10.8 KB
Line 
1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.GetRequest;
6import com.framsticks.communication.queries.InfoRequest;
7import com.framsticks.communication.queries.SetRequest;
8import com.framsticks.params.*;
9import com.framsticks.params.EventListener;
10import com.framsticks.params.annotations.FramsClassAnnotation;
11import com.framsticks.params.annotations.ParamAnnotation;
12import com.framsticks.params.types.EventParam;
13import com.framsticks.params.types.ProcedureParam;
14import com.framsticks.parsers.Loaders;
15import com.framsticks.structure.AbstractTree;
16import com.framsticks.structure.Path;
17import com.framsticks.structure.messages.ListChange;
18import com.framsticks.structure.messages.ValueChange;
19import com.framsticks.util.*;
20import com.framsticks.util.dispatching.AtOnceDispatcher;
21import com.framsticks.util.dispatching.Dispatching;
22import com.framsticks.util.dispatching.DispatchingFuture;
23import com.framsticks.util.dispatching.FutureHandler;
24import com.framsticks.util.dispatching.Future;
25import com.framsticks.util.dispatching.Joinable;
26import com.framsticks.util.dispatching.JoinableParent;
27import com.framsticks.util.dispatching.JoinableState;
28import com.framsticks.util.dispatching.ThrowExceptionHandler;
29import com.framsticks.util.dispatching.RunAt;
30
31import static com.framsticks.structure.TreeOperations.*;
32
33import java.util.*;
34
35import javax.annotation.Nonnull;
36
37import org.apache.logging.log4j.Logger;
38import org.apache.logging.log4j.LogManager;
39
40/**
41 * @author Piotr Sniegowski
42 */
43@FramsClassAnnotation
44public final class RemoteTree extends AbstractTree implements JoinableParent {
45
46        private final static Logger log = LogManager.getLogger(RemoteTree.class);
47
48        protected ClientSideManagedConnection connection;
49
50        public RemoteTree() {
51                bufferedDispatcher.setBuffer(true);
52                registry.registerAndBuild(ListChange.class);
53                registry.registerAndBuild(ValueChange.class);
54        }
55
56        public void setAddress(Address address) {
57                this.connection = Connection.to(new ClientSideManagedConnection(), address);
58                this.connection.setExceptionHandler(this);
59                this.connection.setNeedFileAcceptor(this);
60        }
61
62        @ParamAnnotation
63        public void setAddress(String address) {
64                setAddress(new Address(address));
65        }
66
67        @ParamAnnotation(flags = ParamFlags.USERREADONLY)
68        public String getAddress() {
69                return connection == null ? "<disconnected>" : connection.getAddress().toString();
70        }
71
72        public final ClientSideManagedConnection getConnection() {
73                return connection;
74        }
75
76
77        @Override
78        public String toString() {
79                return super.toString() + "[" + getAddress() + "]";
80        }
81
82        protected ExceptionHandler pathRemoveHandler(final Path path, final ExceptionHandler handler) {
83                return new ExceptionHandler() {
84
85                        @Override
86                        public void handle(final FramsticksException exception) {
87                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
88
89                                        @Override
90                                        protected void runAt() {
91                                                assert path.getTree().isActive();
92                                                log.info("path is invalid (removing): {}", path);
93                                                bindAccess(path.getUnder()).set(path.getTop().getParam(), null);
94                                                handler.handle(exception);
95                                        }
96                                });
97                        }
98                };
99        }
100
101        // @Override
102        // public void get(final Path path, final ValueParam param, final Future<Object> future) {
103        //      assert isActive();
104        //      assert param != null;
105        //      // assert path.isResolved();
106        //      connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(pathRemoveHandler(path, future)) {
107        //              @Override
108        //              protected void processOk(Response response) {
109        //                      assert isActive();
110        //                      processFetchedValues(path, response.getFiles());
111        //                      future.pass(bindAccess(path.tryResolveIfNeeded()).get(param, Object.class));
112        //              }
113        //      });
114        // }
115
116        protected final Map<String, Set<FutureHandler<FramsClass>>> infoRequests = new HashMap<String, Set<FutureHandler<FramsClass>>>();
117
118
119        /** Fetch information.
120         *
121         * This method does not check whether the info is already in the cache, it will always issue a request or join
122         * with other already issued but returned info request. Too issue request only when needed, use TreeOperations.findInfo()
123         *
124         * */
125        @Override
126        public void info(final Path path, final FutureHandler<FramsClass> future) {
127
128                final String name = path.getTop().getParam().getContainedTypeName();
129
130                if (infoRequests.containsKey(name)) {
131                        infoRequests.get(name).add(future);
132                        return;
133                }
134
135                log.debug("issuing info request for {}", name);
136                final Set<FutureHandler<FramsClass>> futures = new HashSet<FutureHandler<FramsClass>>();
137                futures.add(future);
138                infoRequests.put(name, futures);
139
140                final FutureHandler<FramsClass> compositeFuture = DispatchingFuture.create(this, new FutureHandler<FramsClass>() {
141
142                        @Override
143                        public void handle(FramsticksException exception) {
144                                assert isActive();
145                                infoRequests.remove(name);
146                                for (FutureHandler<FramsClass> f : futures) {
147                                        f.handle(exception);
148                                }
149                        }
150
151                        @Override
152                        protected void result(FramsClass framsClass) {
153                                assert isActive();
154                                putInfoIntoCache(framsClass);
155                                infoRequests.remove(name);
156                                for (FutureHandler<FramsClass> f : futures) {
157                                        f.pass(framsClass);
158                                }
159                        }
160                });
161
162                connection.send(new InfoRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(compositeFuture) {
163                        @Override
164                        protected void processOk(Response response) {
165                                assert connection.getReceiverDispatcher().isActive();
166
167                                if (response.getFiles().size() != 1) {
168                                        throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size());
169                                }
170                                if (!path.isTheSame(response.getFiles().get(0).getPath())) {
171                                        throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath());
172                                }
173                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
174
175                                CompositeParam thisParam = path.getTop().getParam();
176                                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
177                                        throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId());
178                                }
179                                compositeFuture.pass(framsClass);
180                        }
181                });
182        }
183
184        @Override
185        public void get(final Path path, final FutureHandler<Path> future) {
186                assert isActive();
187                final ExceptionHandler remover = pathRemoveHandler(path, future);
188
189                log.trace("fetching values for {}", path);
190                findInfo(path, new Future<FramsClass>(remover) {
191                        @Override
192                        protected void result(FramsClass result) {
193
194                                final Access access = registry.prepareAccess(path.getTop().getParam(), false);
195                                connection.send(new GetRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(remover) {
196                                        @Override
197                                        protected void processOk(Response response) {
198                                                processFetchedValues(path, response.getFiles(), access, future);
199                                        }
200                                });
201                        }
202                });
203        }
204
205        @Override
206        public void set(final Path path, final PrimitiveParam<?> param, final Object value, final FutureHandler<Integer> future) {
207                assert isActive();
208                final Integer flag = bindAccess(path).set(param, value);
209
210                log.trace("storing value {} for {}", param, path);
211
212                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
213                        @Override
214                        protected void processOk(Response response) {
215                                future.pass(flag);
216                        }
217                });
218        }
219
220        @Override
221        protected void joinableStart() {
222                Dispatching.use(connection, this);
223                super.joinableStart();
224
225                bufferedDispatcher.getTargetDispatcher().dispatch(new RunAt<RemoteTree>(ThrowExceptionHandler.getInstance()) {
226                        @Override
227                        protected void runAt() {
228
229                                connection.send(new InfoRequest().path("/"), bufferedDispatcher.getTargetDispatcher(), new ClientSideResponseFuture(this) {
230                                        @Override
231                                        protected void processOk(Response response) {
232                                                assert bufferedDispatcher.getTargetDispatcher().isActive();
233                                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
234                                                putInfoIntoCache(framsClass);
235
236                                                assignRootParam(Param.build().name("Tree").id(RemoteTree.this.getName()).type("o " + framsClass.getId()).finish(CompositeParam.class));
237                                                bufferedDispatcher.setBuffer(false);
238                                        }
239                                });
240
241                        }
242                });
243        }
244
245        @Override
246        protected void joinableInterrupt() {
247                Dispatching.drop(connection, this);
248                super.joinableInterrupt();
249        }
250
251        @Override
252        protected void joinableFinish() {
253                super.joinableFinish();
254
255        }
256
257        @Override
258        public void joinableJoin() throws InterruptedException {
259                Dispatching.join(connection);
260                super.joinableJoin();
261        }
262
263        @Override
264        public void childChangedState(Joinable joinable, JoinableState state) {
265                proceedToState(state);
266        }
267
268        @Override
269        public <R> void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Class<R> resultType, final FutureHandler<R> future) {
270                assert isActive();
271                assert path.isResolved();
272
273                //TODO validate arguments type using params
274                connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
275                        @Override
276                        protected void processOk(Response response) {
277                                assert isActive();
278
279                                if (response.getFiles().size() == 0) {
280                                        future.pass(null);
281                                        return;
282                                }
283                                if (response.getFiles().size() == 1) {
284                                        future.pass(AccessOperations.convert(resultType, response.getFiles().get(0), registry));
285                                        return;
286                                }
287                                throw new FramsticksUnsupportedOperationException().msg("call result with multiple files");
288
289                        }
290                });
291
292        }
293
294        protected final Map<EventListener<?>, EventListener<File>> proxyListeners = new IdentityHashMap<>();
295
296        public <A> void addListener(Path path, EventParam param, final EventListener<A> listener, final Class<A> argumentType, final FutureHandler<Void> future) {
297                assert isActive();
298                assert path.isResolved();
299                if ((!argumentType.equals(Object.class)) && (null == registry.getFramsClassForJavaClass(argumentType))) {
300                        registry.registerAndBuild(argumentType);
301                }
302
303                final EventListener<File> proxyListener = new EventListener<File>() {
304
305                        @Override
306                        public void action(final File file) {
307                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
308
309                                        @Override
310                                        protected void runAt() {
311                                                assert isActive();
312
313                                                listener.action(AccessOperations.convert(argumentType, file, registry));
314                                        }
315                                });
316                        }
317                };
318
319                proxyListeners.put(listener, proxyListener);
320
321                connection.addListener(Path.appendString(path.getTextual(), param.getId()), proxyListener, this, future);
322        }
323
324        public void removeListener(Path path, EventParam param, EventListener<?> listener, FutureHandler<Void> future) {
325                assert isActive();
326                EventListener<File> proxyListener = proxyListeners.get(listener);
327                connection.removeListener(proxyListener, this, future);
328        }
329
330}
Note: See TracBrowser for help on using the repository browser.