source: java/main/src/main/java/com/framsticks/remote/RemoteTree.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 10.8 KB
RevLine 
[97]1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.GetRequest;
6import com.framsticks.communication.queries.InfoRequest;
7import com.framsticks.communication.queries.SetRequest;
8import com.framsticks.params.*;
[99]9import com.framsticks.params.EventListener;
[97]10import com.framsticks.params.annotations.FramsClassAnnotation;
11import com.framsticks.params.annotations.ParamAnnotation;
12import com.framsticks.params.types.EventParam;
13import com.framsticks.params.types.ProcedureParam;
[100]14import com.framsticks.parsers.Loaders;
[105]15import com.framsticks.structure.AbstractTree;
16import com.framsticks.structure.Path;
17import com.framsticks.structure.messages.ListChange;
18import com.framsticks.structure.messages.ValueChange;
[97]19import com.framsticks.util.*;
[100]20import com.framsticks.util.dispatching.AtOnceDispatcher;
[97]21import com.framsticks.util.dispatching.Dispatching;
[100]22import com.framsticks.util.dispatching.DispatchingFuture;
[105]23import com.framsticks.util.dispatching.FutureHandler;
[97]24import com.framsticks.util.dispatching.Future;
25import com.framsticks.util.dispatching.Joinable;
26import com.framsticks.util.dispatching.JoinableParent;
27import com.framsticks.util.dispatching.JoinableState;
[98]28import com.framsticks.util.dispatching.ThrowExceptionHandler;
[97]29import com.framsticks.util.dispatching.RunAt;
30
[105]31import static com.framsticks.structure.TreeOperations.*;
32
[97]33import java.util.*;
34
35import javax.annotation.Nonnull;
36
[100]37import org.apache.logging.log4j.Logger;
38import org.apache.logging.log4j.LogManager;
[97]39
40/**
41 * @author Piotr Sniegowski
42 */
43@FramsClassAnnotation
[99]44public final class RemoteTree extends AbstractTree implements JoinableParent {
[97]45
[100]46        private final static Logger log = LogManager.getLogger(RemoteTree.class);
[97]47
48        protected ClientSideManagedConnection connection;
49
50        public RemoteTree() {
[101]51                bufferedDispatcher.setBuffer(true);
52                registry.registerAndBuild(ListChange.class);
53                registry.registerAndBuild(ValueChange.class);
[97]54        }
55
[102]56        public void setAddress(Address address) {
57                this.connection = Connection.to(new ClientSideManagedConnection(), address);
[101]58                this.connection.setExceptionHandler(this);
59                this.connection.setNeedFileAcceptor(this);
[97]60        }
61
[102]62        @ParamAnnotation
63        public void setAddress(String address) {
64                setAddress(new Address(address));
65        }
66
[101]67        @ParamAnnotation(flags = ParamFlags.USERREADONLY)
[97]68        public String getAddress() {
69                return connection == null ? "<disconnected>" : connection.getAddress().toString();
70        }
71
[98]72        public final ClientSideManagedConnection getConnection() {
73                return connection;
[97]74        }
75
[102]76
[97]77        @Override
78        public String toString() {
[102]79                return super.toString() + "[" + getAddress() + "]";
[97]80        }
81
[105]82        protected ExceptionHandler pathRemoveHandler(final Path path, final ExceptionHandler handler) {
83                return new ExceptionHandler() {
[100]84
[97]85                        @Override
[100]86                        public void handle(final FramsticksException exception) {
87                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
88
89                                        @Override
90                                        protected void runAt() {
91                                                assert path.getTree().isActive();
92                                                log.info("path is invalid (removing): {}", path);
93                                                bindAccess(path.getUnder()).set(path.getTop().getParam(), null);
94                                                handler.handle(exception);
95                                        }
96                                });
[97]97                        }
[100]98                };
[97]99        }
100
[100]101        // @Override
102        // public void get(final Path path, final ValueParam param, final Future<Object> future) {
103        //      assert isActive();
104        //      assert param != null;
105        //      // assert path.isResolved();
106        //      connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(pathRemoveHandler(path, future)) {
107        //              @Override
108        //              protected void processOk(Response response) {
109        //                      assert isActive();
110        //                      processFetchedValues(path, response.getFiles());
111        //                      future.pass(bindAccess(path.tryResolveIfNeeded()).get(param, Object.class));
112        //              }
113        //      });
114        // }
115
[105]116        protected final Map<String, Set<FutureHandler<FramsClass>>> infoRequests = new HashMap<String, Set<FutureHandler<FramsClass>>>();
[97]117
118
[102]119        /** Fetch information.
120         *
121         * This method does not check whether the info is already in the cache, it will always issue a request or join
122         * with other already issued but returned info request. Too issue request only when needed, use TreeOperations.findInfo()
123         *
124         * */
[97]125        @Override
[105]126        public void info(final Path path, final FutureHandler<FramsClass> future) {
[97]127
128                final String name = path.getTop().getParam().getContainedTypeName();
129
130                if (infoRequests.containsKey(name)) {
131                        infoRequests.get(name).add(future);
132                        return;
133                }
134
[100]135                log.debug("issuing info request for {}", name);
[105]136                final Set<FutureHandler<FramsClass>> futures = new HashSet<FutureHandler<FramsClass>>();
[97]137                futures.add(future);
138                infoRequests.put(name, futures);
139
[105]140                final FutureHandler<FramsClass> compositeFuture = DispatchingFuture.create(this, new FutureHandler<FramsClass>() {
[97]141
142                        @Override
143                        public void handle(FramsticksException exception) {
144                                assert isActive();
145                                infoRequests.remove(name);
[105]146                                for (FutureHandler<FramsClass> f : futures) {
[97]147                                        f.handle(exception);
148                                }
149                        }
150
151                        @Override
152                        protected void result(FramsClass framsClass) {
153                                assert isActive();
[100]154                                putInfoIntoCache(framsClass);
[97]155                                infoRequests.remove(name);
[105]156                                for (FutureHandler<FramsClass> f : futures) {
[97]157                                        f.pass(framsClass);
158                                }
159                        }
[100]160                });
[97]161
[100]162                connection.send(new InfoRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(compositeFuture) {
[97]163                        @Override
164                        protected void processOk(Response response) {
[100]165                                assert connection.getReceiverDispatcher().isActive();
[97]166
167                                if (response.getFiles().size() != 1) {
168                                        throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size());
169                                }
170                                if (!path.isTheSame(response.getFiles().get(0).getPath())) {
171                                        throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath());
172                                }
[100]173                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
[97]174
175                                CompositeParam thisParam = path.getTop().getParam();
176                                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
177                                        throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId());
178                                }
179                                compositeFuture.pass(framsClass);
180                        }
181                });
182        }
183
184        @Override
[105]185        public void get(final Path path, final FutureHandler<Path> future) {
[97]186                assert isActive();
[105]187                final ExceptionHandler remover = pathRemoveHandler(path, future);
[97]188
[100]189                log.trace("fetching values for {}", path);
[105]190                findInfo(path, new Future<FramsClass>(remover) {
[97]191                        @Override
[98]192                        protected void result(FramsClass result) {
193
[107]194                                final Access access = registry.prepareAccess(path.getTop().getParam(), false);
[100]195                                connection.send(new GetRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(remover) {
[98]196                                        @Override
197                                        protected void processOk(Response response) {
[100]198                                                processFetchedValues(path, response.getFiles(), access, future);
[98]199                                        }
200                                });
[97]201                        }
202                });
203        }
204
205        @Override
[105]206        public void set(final Path path, final PrimitiveParam<?> param, final Object value, final FutureHandler<Integer> future) {
[97]207                assert isActive();
[98]208                final Integer flag = bindAccess(path).set(param, value);
[97]209
[100]210                log.trace("storing value {} for {}", param, path);
[102]211
[97]212                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
213                        @Override
214                        protected void processOk(Response response) {
215                                future.pass(flag);
216                        }
217                });
218        }
219
220        @Override
221        protected void joinableStart() {
222                Dispatching.use(connection, this);
223                super.joinableStart();
[98]224
[101]225                bufferedDispatcher.getTargetDispatcher().dispatch(new RunAt<RemoteTree>(ThrowExceptionHandler.getInstance()) {
[98]226                        @Override
227                        protected void runAt() {
228
[101]229                                connection.send(new InfoRequest().path("/"), bufferedDispatcher.getTargetDispatcher(), new ClientSideResponseFuture(this) {
[98]230                                        @Override
231                                        protected void processOk(Response response) {
[102]232                                                assert bufferedDispatcher.getTargetDispatcher().isActive();
233                                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
234                                                putInfoIntoCache(framsClass);
235
[98]236                                                assignRootParam(Param.build().name("Tree").id(RemoteTree.this.getName()).type("o " + framsClass.getId()).finish(CompositeParam.class));
[101]237                                                bufferedDispatcher.setBuffer(false);
[98]238                                        }
239                                });
240
241                        }
242                });
[97]243        }
244
245        @Override
246        protected void joinableInterrupt() {
247                Dispatching.drop(connection, this);
248                super.joinableInterrupt();
249        }
250
251        @Override
252        protected void joinableFinish() {
253                super.joinableFinish();
254
255        }
256
257        @Override
258        public void joinableJoin() throws InterruptedException {
259                Dispatching.join(connection);
260                super.joinableJoin();
261        }
262
263        @Override
264        public void childChangedState(Joinable joinable, JoinableState state) {
265                proceedToState(state);
266        }
267
268        @Override
[105]269        public <R> void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Class<R> resultType, final FutureHandler<R> future) {
[97]270                assert isActive();
271                assert path.isResolved();
272
273                //TODO validate arguments type using params
274                connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
275                        @Override
276                        protected void processOk(Response response) {
277                                assert isActive();
[103]278
279                                if (response.getFiles().size() == 0) {
280                                        future.pass(null);
281                                        return;
282                                }
283                                if (response.getFiles().size() == 1) {
284                                        future.pass(AccessOperations.convert(resultType, response.getFiles().get(0), registry));
285                                        return;
286                                }
287                                throw new FramsticksUnsupportedOperationException().msg("call result with multiple files");
288
[97]289                        }
290                });
291
292        }
293
[99]294        protected final Map<EventListener<?>, EventListener<File>> proxyListeners = new IdentityHashMap<>();
[97]295
[105]296        public <A> void addListener(Path path, EventParam param, final EventListener<A> listener, final Class<A> argumentType, final FutureHandler<Void> future) {
[99]297                assert isActive();
298                assert path.isResolved();
299                if ((!argumentType.equals(Object.class)) && (null == registry.getFramsClassForJavaClass(argumentType))) {
300                        registry.registerAndBuild(argumentType);
301                }
302
303                final EventListener<File> proxyListener = new EventListener<File>() {
304
305                        @Override
306                        public void action(final File file) {
307                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
308
309                                        @Override
310                                        protected void runAt() {
311                                                assert isActive();
312
[103]313                                                listener.action(AccessOperations.convert(argumentType, file, registry));
[99]314                                        }
315                                });
316                        }
317                };
318
319                proxyListeners.put(listener, proxyListener);
320
321                connection.addListener(Path.appendString(path.getTextual(), param.getId()), proxyListener, this, future);
322        }
323
[105]324        public void removeListener(Path path, EventParam param, EventListener<?> listener, FutureHandler<Void> future) {
[99]325                assert isActive();
326                EventListener<File> proxyListener = proxyListeners.get(listener);
327                connection.removeListener(proxyListener, this, future);
328        }
329
[97]330}
Note: See TracBrowser for help on using the repository browser.