source: java/main/src/main/java/com/framsticks/remote/RemoteInstance.java @ 96

Last change on this file since 96 was 96, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supercede other apllication exception being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronours tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithm to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File size: 13.1 KB
Line 
1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.GetRequest;
6import com.framsticks.communication.queries.InfoRequest;
7import com.framsticks.communication.queries.SetRequest;
8import com.framsticks.core.AbstractInstance;
9import com.framsticks.core.InstanceUtils;
10import com.framsticks.core.ListChange;
11import com.framsticks.core.Node;
12import com.framsticks.core.Path;
13import com.framsticks.params.*;
14import com.framsticks.params.annotations.FramsClassAnnotation;
15import com.framsticks.params.annotations.ParamAnnotation;
16import com.framsticks.params.types.EventParam;
17import com.framsticks.params.types.ProcedureParam;
18import com.framsticks.parsers.MultiParamLoader;
19import com.framsticks.core.Instance;
20import com.framsticks.util.*;
21import com.framsticks.util.dispatching.Dispatching;
22import com.framsticks.util.dispatching.ExceptionResultHandler;
23import com.framsticks.util.dispatching.Future;
24import com.framsticks.util.dispatching.Joinable;
25import com.framsticks.util.dispatching.JoinableParent;
26import com.framsticks.util.dispatching.JoinableState;
27import com.framsticks.util.lang.Casting;
28import com.framsticks.util.lang.Pair;
29import com.framsticks.util.dispatching.RunAt;
30import static com.framsticks.core.InstanceUtils.*;
31
32import java.util.*;
33
34import javax.annotation.Nonnull;
35
36import org.apache.log4j.Logger;
37
38/**
39 * @author Piotr Sniegowski
40 */
41@FramsClassAnnotation
42public class RemoteInstance extends AbstractInstance implements JoinableParent {
43
44        private final static Logger log = Logger.getLogger(RemoteInstance.class.getName());
45
46        protected ClientConnection connection;
47
48        protected final Set<Pair<Path, Subscription<?>>> subscriptions = new HashSet<>();
49
50        public Pair<Path, Subscription<?>> getSubscription(Path path) {
51                for (Pair<Path, Subscription<?>> s : subscriptions) {
52                        if (s.first.matches(path)) {
53                                return s;
54                        }
55                }
56                return null;
57        }
58
59        public RemoteInstance() {
60        }
61
62        @ParamAnnotation
63        public void setAddress(String address) {
64                setConnection(new ClientConnection(address));
65        }
66
67        @ParamAnnotation
68        public String getAddress() {
69                return connection == null ? "<disconnected>" : connection.getAddress();
70        }
71
72        protected void onProtocolVersionNegotiated() {
73        }
74
75
76        public void setConnection(final ClientConnection connection) {
77                this.connection = connection;
78                final ExceptionResultHandler failure = new ExceptionResultHandler() {
79                        @Override
80                        public void handle(FramsticksException exception) {
81                                log.fatal("failed to establish connection: ", exception);
82                                // log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion());
83                                Dispatching.drop(connection, RemoteInstance.this);
84                                fireRun(exception);
85                        }
86                };
87
88                this.connection.setConnectedFunctor(new AbstractStateFunctor(failure) {
89                        @Override
90                        public void call() {
91                                connection.negotiateProtocolVersion(new AbstractStateFunctor(failure) {
92                                        @Override
93                                        public void call() {
94                                                onProtocolVersionNegotiated();
95                                        }
96                                });
97                        }
98                });
99        }
100
101        @Override
102        public String toString() {
103                assert Dispatching.isThreadSafe();
104                return "remote instance " + getName() + "(" + getAddress() + ")";
105        }
106
107        public final ClientConnection getConnection() {
108                return connection;
109        }
110
111        @Override
112        public void fetchValue(final Path path, final ValueParam param, final StateFunctor stateFunctor) {
113                assert isActive();
114                assert param != null;
115                assert path.isResolved();
116                connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ResponseCallback<Instance>() {
117                        @Override
118                        public void process(Response response) {
119                                assert isActive();
120                                if (!response.getOk()) {
121                                        stateFunctor.handle(new FramsticksException().msg("failed to fetch value").arg("comment", response.getComment()));
122                                        return;
123                                }
124                                try {
125                                        InstanceUtils.processFetchedValues(path, response.getFiles());
126                                        stateFunctor.call();
127                                } catch (FramsticksException ex) {
128                                        stateFunctor.handle(ex);
129                                }
130                        }
131                });
132        }
133
134        protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>();
135
136        protected void finishInfoRequest(String id, FramsClass result, FramsticksException e) {
137                assert isActive();
138                Set<Future<FramsClass>> futures = infoRequests.get(id);
139                infoRequests.remove(id);
140                for (Future<FramsClass> f : futures) {
141                        Future.passOrHandle(f, result, e);
142                }
143        }
144
145        @Override
146        public void fetchInfo(final Path path, final Future<FramsClass> future) {
147
148                final String name = path.getTop().getParam().getContainedTypeName();
149
150                if (infoRequests.containsKey(name)) {
151                        infoRequests.get(name).add(future);
152                        return;
153                }
154
155                log.debug("issuing info request for " + name);
156                Set<Future<FramsClass>> futures = new HashSet<Future<FramsClass>>();
157                futures.add(future);
158                infoRequests.put(name, futures);
159
160                //TODO: if the info is in the cache, then don't communicate
161                connection.send(new InfoRequest().path(path.getTextual()), this, new ResponseCallback<Instance>() {
162                        @Override
163                        public void process(Response response) {
164                                assert isActive();
165                                try {
166                                        if (!response.getOk()) {
167                                                throw new FramsticksException().msg("invalid response").arg("comment", response.getComment());
168                                        }
169                                        if (response.getFiles().size() != 1) {
170                                                throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size());
171                                        }
172                                        if (!path.isTheSame(response.getFiles().get(0).getPath())) {
173                                                throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath());
174                                        }
175                                        FramsClass framsClass = InstanceUtils.processFetchedInfo(RemoteInstance.this, response.getFiles().get(0));
176
177                                        CompositeParam thisParam = path.getTop().getParam();
178                                        if (!thisParam.isMatchingContainedName(framsClass.getId())) {
179                                                throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId());
180                                        }
181
182                                        finishInfoRequest(name, framsClass, null);
183                                } catch (FramsticksException e) {
184                                        finishInfoRequest(name, null, e.arg("path", path));
185                                }
186                        }
187                });
188        }
189
190        @Override
191        public void fetchValues(final Path path, final StateFunctor stateFunctor) {
192                assert isActive();
193                assert path.getTop().getObject() != null;
194
195                log.trace("fetching values for " + path);
196                connection.send(new GetRequest().path(path.getTextual()), this, new ResponseCallback<Instance>() {
197                        @Override
198                        public void process(Response response) {
199                                assert isActive();
200                                try {
201                                        if (!response.getOk()) {
202                                                throw new FramsticksException().msg("failed to fetch values").arg("comment", response.getComment());
203                                        }
204                                        InstanceUtils.processFetchedValues(path, response.getFiles());
205                                        stateFunctor.call();
206                                } catch (FramsticksException e) {
207                                        stateFunctor.handle(e);
208                                }
209                        }
210                });
211        }
212
213        @Override
214        public void resolve(final Path path, final Future<Path> future) {
215                InstanceUtils.resolve(path, future);
216        }
217
218        @Override
219        protected void tryRegisterOnChangeEvents(final Path path) {
220                assert isActive();
221                AccessInterface access = InstanceUtils.bindAccess(path);
222                if (!(access instanceof ListAccess)) {
223                        return;
224                }
225
226                assert path.size() >= 2;
227                FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName());
228
229                EventParam changedEvent;
230                try {
231                        changedEvent = underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed", EventParam.class);
232                } catch (FramsticksException e) {
233                        return;
234                }
235
236                log.debug("registering for " + changedEvent);
237                if (getSubscription(path) != null) {
238                        return;
239                }
240
241                final Pair<Path, Subscription<?>> temporary = new Pair<>(path, null);
242                subscriptions.add(temporary);
243
244                connection.subscribe(path.getTextual() + "_changed", this, new SubscriptionCallback<Instance>() {
245                        @Override
246                        public EventCallback subscribed(final Subscription<? super Instance> subscription) {
247                                if (subscription == null) {
248                                        log.error("failed to subscribe for change event for " + path);
249                                        return null;
250                                }
251                                log.debug("subscribed for change event for " + path);
252                                // subscription.setDispatcher(RemoteInstance.this);
253                                RemoteInstance.this.dispatch(new RunAt<Instance>() {
254                                        @Override
255                                        public void run() {
256                                                subscriptions.remove(temporary);
257                                                subscriptions.add(new Pair<Path, Subscription<?>>(path, subscription));
258                                        }
259                                });
260                                return new EventCallback() {
261                                        @Override
262                                        public void call(List<File> files) {
263                                                assert isActive();
264                                                assert files.size() == 1;
265                                                MultiParamLoader loader = new MultiParamLoader();
266                                                loader.setNewSource(files.get(0).getContent());
267                                                loader.addBreakCondition(MultiParamLoader.Status.AfterObject);
268                                                ReflectionAccess access = new ReflectionAccess(ListChange.class, FramsClass.build().forClass(ListChange.class));
269                                                loader.addAccessInterface(access);
270                                                MultiParamLoader.Status status;
271                                                try {
272                                                        while ((status = loader.go()) != MultiParamLoader.Status.Finished) {
273                                                                if (status == MultiParamLoader.Status.AfterObject) {
274                                                                        AccessInterface accessInterface = loader.getLastAccessInterface();
275                                                                        reactToChange(path, (ListChange) accessInterface.getSelected());
276                                                                }
277                                                        }
278                                                } catch (Exception e) {
279                                                        e.printStackTrace();
280                                                }
281                                        }
282                                };
283                        }
284                });
285        }
286
287        protected Future<Path> futureListChanger(final ListChange listChange, final String path) {
288                return new Future<Path>(Logging.logger(log, "failed to " + listChange, path)) {
289                        @Override
290                        protected void result(Path result) {
291                                log.debug(listChange + ": " + result);
292                                fireListChange(result, listChange);
293                        }
294                };
295        }
296
297        protected void reactToChange(final Path path, final ListChange listChange) {
298                assert isActive();
299                log.debug("reacting to change " + listChange + " in " + path);
300                AccessInterface access = InstanceUtils.bindAccess(path);
301                assert access != null;
302
303                if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) {
304                        final String p = path.getTextual();
305                        InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p));
306                        return;
307                }
308
309                CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier()));
310                assert childParam != null;
311                switch (listChange.getAction()) {
312                        case Add: {
313                                final String p = path.getTextual() + "/" + childParam.getId();
314                                InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p));
315                                break;
316                        }
317                        case Remove: {
318                                access.set(childParam, null);
319                                fireListChange(path, listChange);
320                                break;
321                        }
322                        case Modify: {
323                                final String p = path.getTextual() + "/" + childParam.getId();
324                                InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p));
325                                break;
326                        }
327                }
328        }
329
330        @Override
331        public void storeValue(final Path path, final ValueParam param, final Object value, StateFunctor stateFunctor) {
332                assert isActive();
333
334                log.trace("storing value " + param + " for " + path);
335                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>(stateFunctor) {
336                        @Override
337                        protected void callImpl() {
338                                InstanceUtils.bindAccess(path).set((ValueParam) param, value);
339                        }
340                });
341        }
342
343        @Override
344        protected void joinableStart() {
345                Dispatching.use(connection, this);
346                super.joinableStart();
347        }
348
349        @Override
350        protected void joinableInterrupt() {
351                Dispatching.drop(connection, this);
352                super.joinableInterrupt();
353        }
354
355        @Override
356        protected void joinableFinish() {
357                super.joinableFinish();
358
359        }
360
361        @Override
362        public void joinableJoin() throws InterruptedException {
363                Dispatching.join(connection);
364                super.joinableJoin();
365        }
366
367        @Override
368        public void childChangedState(Joinable joinable, JoinableState state) {
369                proceedToState(state);
370        }
371
372        @Override
373        public void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Future<Object> future) {
374                assert isActive();
375                assert path.isResolved();
376
377                //TODO validate arguments type using params
378                connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ResponseCallback<Instance>() {
379                        @Override
380                        public void process(Response response) {
381                                assert isActive();
382                                try {
383                                        if (!response.getOk()) {
384                                                throw new FramsticksException().msg("failed to call procedure").arg("procedure", procedure).arg("comment", response.getComment());
385                                        }
386                                        // InstanceUtils.processFetchedValues(path, response.getFiles());
387                                        future.pass(null);
388                                } catch (FramsticksException ex) {
389                                        future.handle(ex);
390                                }
391                        }
392                });
393
394        }
395
396        @Override
397        public Path create(Path path) {
398                assert isActive();
399                assert !path.isResolved();
400                Path resolved = path.tryFindResolution();
401                if (!resolved.isResolved()) {
402                        log.debug("creating: " + path);
403                        AccessInterface access = registry.prepareAccess(path.getTop().getParam());
404                        assert access != null;
405                        Object child = access.createAccessee();
406                        assert child != null;
407                        if (path.size() == 1) {
408                                setRoot(new Node(getRoot().getParam(), child));
409                        } else {
410                                bindAccess(this, path.getUnder()).set(path.getTop().getParam(), child);
411                        }
412                        resolved = path.appendResolution(child);
413                }
414                tryRegisterOnChangeEvents(resolved);
415                return resolved;
416        }
417
418}
Note: See TracBrowser for help on using the repository browser.