source: java/main/src/main/java/com/framsticks/remote/RemoteTree.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 10.8 KB
Line 
1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.GetRequest;
6import com.framsticks.communication.queries.InfoRequest;
7import com.framsticks.communication.queries.SetRequest;
8import com.framsticks.params.*;
9import com.framsticks.params.EventListener;
10import com.framsticks.params.annotations.FramsClassAnnotation;
11import com.framsticks.params.annotations.ParamAnnotation;
12import com.framsticks.params.types.EventParam;
13import com.framsticks.params.types.ProcedureParam;
14import com.framsticks.parsers.Loaders;
15import com.framsticks.structure.AbstractTree;
16import com.framsticks.structure.Path;
17import com.framsticks.structure.messages.ListChange;
18import com.framsticks.structure.messages.ValueChange;
19import com.framsticks.util.*;
20import com.framsticks.util.dispatching.AtOnceDispatcher;
21import com.framsticks.util.dispatching.Dispatching;
22import com.framsticks.util.dispatching.DispatchingFuture;
23import com.framsticks.util.dispatching.FutureHandler;
24import com.framsticks.util.dispatching.Future;
25import com.framsticks.util.dispatching.Joinable;
26import com.framsticks.util.dispatching.JoinableParent;
27import com.framsticks.util.dispatching.JoinableState;
28import com.framsticks.util.dispatching.ThrowExceptionHandler;
29import com.framsticks.util.dispatching.RunAt;
30
31import static com.framsticks.structure.TreeOperations.*;
32
33import java.util.*;
34
35import javax.annotation.Nonnull;
36
37import org.apache.logging.log4j.Logger;
38import org.apache.logging.log4j.LogManager;
39
40/**
41 * @author Piotr Sniegowski
42 */
43@FramsClassAnnotation
44public final class RemoteTree extends AbstractTree implements JoinableParent {
45
46        private final static Logger log = LogManager.getLogger(RemoteTree.class);
47
48        protected ClientSideManagedConnection connection;
49
50        public RemoteTree() {
51                bufferedDispatcher.setBuffer(true);
52                registry.registerAndBuild(ListChange.class);
53                registry.registerAndBuild(ValueChange.class);
54        }
55
56        public void setAddress(Address address) {
57                this.connection = Connection.to(new ClientSideManagedConnection(), address);
58                this.connection.setExceptionHandler(this);
59                this.connection.setNeedFileAcceptor(this);
60        }
61
62        @ParamAnnotation
63        public void setAddress(String address) {
64                setAddress(new Address(address));
65        }
66
67        @ParamAnnotation(flags = ParamFlags.USERREADONLY)
68        public String getAddress() {
69                return connection == null ? "<disconnected>" : connection.getAddress().toString();
70        }
71
72        public final ClientSideManagedConnection getConnection() {
73                return connection;
74        }
75
76
77        @Override
78        public String toString() {
79                return super.toString() + "[" + getAddress() + "]";
80        }
81
82        protected ExceptionHandler pathRemoveHandler(final Path path, final ExceptionHandler handler) {
83                return new ExceptionHandler() {
84
85                        @Override
86                        public void handle(final FramsticksException exception) {
87                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
88
89                                        @Override
90                                        protected void runAt() {
91                                                assert path.getTree().isActive();
92                                                log.info("path is invalid (removing): {}", path);
93                                                bindAccess(path.getUnder()).set(path.getTop().getParam(), null);
94                                                handler.handle(exception);
95                                        }
96                                });
97                        }
98                };
99        }
100
101        // @Override
102        // public void get(final Path path, final ValueParam param, final Future<Object> future) {
103        //      assert isActive();
104        //      assert param != null;
105        //      // assert path.isResolved();
106        //      connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(pathRemoveHandler(path, future)) {
107        //              @Override
108        //              protected void processOk(Response response) {
109        //                      assert isActive();
110        //                      processFetchedValues(path, response.getFiles());
111        //                      future.pass(bindAccess(path.tryResolveIfNeeded()).get(param, Object.class));
112        //              }
113        //      });
114        // }
115
116        protected final Map<String, Set<FutureHandler<FramsClass>>> infoRequests = new HashMap<String, Set<FutureHandler<FramsClass>>>();
117
118
119        /** Fetch information.
120         *
121         * This method does not check whether the info is already in the cache, it will always issue a request or join
122         * with other already issued but returned info request. Too issue request only when needed, use TreeOperations.findInfo()
123         *
124         * */
125        @Override
126        public void info(final Path path, final FutureHandler<FramsClass> future) {
127
128                final String name = path.getTop().getParam().getContainedTypeName();
129
130                if (infoRequests.containsKey(name)) {
131                        infoRequests.get(name).add(future);
132                        return;
133                }
134
135                log.debug("issuing info request for {}", name);
136                final Set<FutureHandler<FramsClass>> futures = new HashSet<FutureHandler<FramsClass>>();
137                futures.add(future);
138                infoRequests.put(name, futures);
139
140                final FutureHandler<FramsClass> compositeFuture = DispatchingFuture.create(this, new FutureHandler<FramsClass>() {
141
142                        @Override
143                        public void handle(FramsticksException exception) {
144                                assert isActive();
145                                infoRequests.remove(name);
146                                for (FutureHandler<FramsClass> f : futures) {
147                                        f.handle(exception);
148                                }
149                        }
150
151                        @Override
152                        protected void result(FramsClass framsClass) {
153                                assert isActive();
154                                putInfoIntoCache(framsClass);
155                                infoRequests.remove(name);
156                                for (FutureHandler<FramsClass> f : futures) {
157                                        f.pass(framsClass);
158                                }
159                        }
160                });
161
162                connection.send(new InfoRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(compositeFuture) {
163                        @Override
164                        protected void processOk(Response response) {
165                                assert connection.getReceiverDispatcher().isActive();
166
167                                if (response.getFiles().size() != 1) {
168                                        throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size());
169                                }
170                                if (!path.isTheSame(response.getFiles().get(0).getPath())) {
171                                        throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath());
172                                }
173                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
174
175                                CompositeParam thisParam = path.getTop().getParam();
176                                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
177                                        throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId());
178                                }
179                                compositeFuture.pass(framsClass);
180                        }
181                });
182        }
183
184        @Override
185        public void get(final Path path, final FutureHandler<Path> future) {
186                assert isActive();
187                final ExceptionHandler remover = pathRemoveHandler(path, future);
188
189                log.trace("fetching values for {}", path);
190                findInfo(path, new Future<FramsClass>(remover) {
191                        @Override
192                        protected void result(FramsClass result) {
193
194                                final Access access = registry.prepareAccess(path.getTop().getParam(), false);
195                                connection.send(new GetRequest().path(path.getTextual()), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(remover) {
196                                        @Override
197                                        protected void processOk(Response response) {
198                                                processFetchedValues(path, response.getFiles(), access, future);
199                                        }
200                                });
201                        }
202                });
203        }
204
205        @Override
206        public void set(final Path path, final PrimitiveParam<?> param, final Object value, final FutureHandler<Integer> future) {
207                assert isActive();
208                final Integer flag = bindAccess(path).set(param, value);
209
210                log.trace("storing value {} for {}", param, path);
211
212                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
213                        @Override
214                        protected void processOk(Response response) {
215                                future.pass(flag);
216                        }
217                });
218        }
219
220        @Override
221        protected void joinableStart() {
222                Dispatching.use(connection, this);
223                super.joinableStart();
224
225                bufferedDispatcher.getTargetDispatcher().dispatch(new RunAt<RemoteTree>(ThrowExceptionHandler.getInstance()) {
226                        @Override
227                        protected void runAt() {
228
229                                connection.send(new InfoRequest().path("/"), bufferedDispatcher.getTargetDispatcher(), new ClientSideResponseFuture(this) {
230                                        @Override
231                                        protected void processOk(Response response) {
232                                                assert bufferedDispatcher.getTargetDispatcher().isActive();
233                                                FramsClass framsClass = Loaders.loadFramsClass(response.getFiles().get(0).getContent());
234                                                putInfoIntoCache(framsClass);
235
236                                                assignRootParam(Param.build().name("Tree").id(RemoteTree.this.getName()).type("o " + framsClass.getId()).finish(CompositeParam.class));
237                                                bufferedDispatcher.setBuffer(false);
238                                        }
239                                });
240
241                        }
242                });
243        }
244
245        @Override
246        protected void joinableInterrupt() {
247                Dispatching.drop(connection, this);
248                super.joinableInterrupt();
249        }
250
251        @Override
252        protected void joinableFinish() {
253                super.joinableFinish();
254
255        }
256
257        @Override
258        public void joinableJoin() throws InterruptedException {
259                Dispatching.join(connection);
260                super.joinableJoin();
261        }
262
263        @Override
264        public void childChangedState(Joinable joinable, JoinableState state) {
265                proceedToState(state);
266        }
267
268        @Override
269        public <R> void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Class<R> resultType, final FutureHandler<R> future) {
270                assert isActive();
271                assert path.isResolved();
272
273                //TODO validate arguments type using params
274                connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ClientSideResponseFuture(future) {
275                        @Override
276                        protected void processOk(Response response) {
277                                assert isActive();
278
279                                if (response.getFiles().size() == 0) {
280                                        future.pass(null);
281                                        return;
282                                }
283                                if (response.getFiles().size() == 1) {
284                                        future.pass(AccessOperations.convert(resultType, response.getFiles().get(0), registry));
285                                        return;
286                                }
287                                throw new FramsticksUnsupportedOperationException().msg("call result with multiple files");
288
289                        }
290                });
291
292        }
293
294        protected final Map<EventListener<?>, EventListener<File>> proxyListeners = new IdentityHashMap<>();
295
296        public <A> void addListener(Path path, EventParam param, final EventListener<A> listener, final Class<A> argumentType, final FutureHandler<Void> future) {
297                assert isActive();
298                assert path.isResolved();
299                if ((!argumentType.equals(Object.class)) && (null == registry.getFramsClassForJavaClass(argumentType))) {
300                        registry.registerAndBuild(argumentType);
301                }
302
303                final EventListener<File> proxyListener = new EventListener<File>() {
304
305                        @Override
306                        public void action(final File file) {
307                                Dispatching.dispatchIfNotActive(RemoteTree.this, new RunAt<RemoteTree>(RemoteTree.this) {
308
309                                        @Override
310                                        protected void runAt() {
311                                                assert isActive();
312
313                                                listener.action(AccessOperations.convert(argumentType, file, registry));
314                                        }
315                                });
316                        }
317                };
318
319                proxyListeners.put(listener, proxyListener);
320
321                connection.addListener(Path.appendString(path.getTextual(), param.getId()), proxyListener, this, future);
322        }
323
324        public void removeListener(Path path, EventParam param, EventListener<?> listener, FutureHandler<Void> future) {
325                assert isActive();
326                EventListener<File> proxyListener = proxyListeners.get(listener);
327                connection.removeListener(proxyListener, this, future);
328        }
329
330}
Note: See TracBrowser for help on using the repository browser.