source: java/main/src/main/java/com/framsticks/remote/RemoteInstance.java @ 88

Last change on this file since 88 was 88, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • loading f0 schema with XmlLoader?
  • use XmlLoader? to load configuration
  • introduce unified fork-join model of various entities

(Instances, Connections, GUI Frames, etc.),
all those entities clean up gracefully on
shutdown, which may be initialized by user
or by some entity

  • basing on above, simplify several organizing classes

(Observer, main class)

(to host native frams server process from Java level)

CHANGELOG:
Remove redundant Observer class.

Clean up in AbstractJoinable?.

Update ExternalProcess? class to changes in joining model.

Another sweep through code with FindBugs?.

Find bug with not joining RemoteInstance?.

Joining almost works.

Much improved joining model.

More improvement to joining model.

Add logging messages around joinable operations.

Rename methods in AbstractJoinable?.

Improve Joinable.

Rewrite of entity structure.

More simplifications with entities.

Further improve joinables.

Let Frame compose from JFrame instead of inheriting.

Add join classes.

Improvements of closing.

Add Builder interface.

Add FramsServerTest?.xml

FramsServer? may be configured through xml.

Make Framsticks main class an Observer of Entities.

Make Observer a generic type.

Remove variables regarding to removed endpoint.

Simplify observer (remove endpoints).

More changes to Observer and Endpoint.

Minor improvements.

Add OutputListener? to ExternalProcess?.

Improve testing of ExternalProcess?.

Add ExternalProcess? runner.

Rename the Program class to Framsticks.

Migrate Program to use XmlLoader? configuration.

First steps with configuration using XmlLoader?.

Fix several bugs.

Move all f0 classes to apriopriate package.

XmlLoader? is able to load Schema.

XmlLoader? is loading classes and props.

Add GroupBuilder?.

File size: 14.5 KB
Line 
1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.GetRequest;
5import com.framsticks.communication.queries.InfoRequest;
6import com.framsticks.communication.queries.SetRequest;
7import com.framsticks.communication.util.LoggingSubscriptionCallback;
8import com.framsticks.core.ListChange;
9import com.framsticks.core.Path;
10import com.framsticks.params.*;
11import com.framsticks.params.annotations.FramsClassAnnotation;
12import com.framsticks.params.annotations.ParamAnnotation;
13import com.framsticks.params.types.EventParam;
14import com.framsticks.parsers.MultiParamLoader;
15import com.framsticks.core.Instance;
16import com.framsticks.util.*;
17import com.framsticks.util.dispatching.Dispatching;
18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.Joinable;
20import com.framsticks.util.dispatching.JoinableParent;
21import com.framsticks.util.dispatching.JoinableState;
22import com.framsticks.util.lang.Casting;
23import com.framsticks.util.lang.Pair;
24import com.framsticks.util.dispatching.RunAt;
25
26import java.util.*;
27
28import org.apache.log4j.Logger;
29
30/**
31 * @author Piotr Sniegowski
32 */
33@FramsClassAnnotation
34public class RemoteInstance extends Instance implements JoinableParent {
35
36        private final static Logger log = Logger.getLogger(RemoteInstance.class.getName());
37
38        protected Path simulator;
39        protected ClientConnection connection;
40
41        protected final Set<Pair<Path, Subscription<?>>> subscriptions = new HashSet<>();
42
43        public Pair<Path, Subscription<?>> getSubscription(Path path) {
44                for (Pair<Path, Subscription<?>> s : subscriptions) {
45                        if (s.first.matches(path)) {
46                                return s;
47                        }
48                }
49                return null;
50        }
51
52        public RemoteInstance() {
53        }
54
55        @ParamAnnotation
56        public void setAddress(String address) {
57                setConnection(new ClientConnection(address));
58        }
59
60        @ParamAnnotation
61        public String getAddress() {
62                return connection == null ? "<disconnected>" : connection.getAddress();
63        }
64
65        public void setConnection(final ClientConnection connection) {
66                this.connection = connection;
67                this.connection.setConnectedFunctor(new StateFunctor() {
68                        @Override
69                        public void call(Exception e) {
70                                if (e != null) {
71                                        fireRun(e);
72                                        return;
73                                }
74                                connection.negotiateProtocolVersion(new StateFunctor() {
75                                        @Override
76                                        public void call(Exception e) {
77                                                if (e != null) {
78                                                        log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion());
79                                                        Dispatching.drop(connection, RemoteInstance.this);
80                                                        fireRun(e);
81                                                        return;
82                                                }
83
84                                                invokeLater(new RunAt<Instance>() {
85                                                        @Override
86                                                        public void run() {
87                                                                resolveAndFetch("/simulator", new Future<Path>() {
88                                                                        @Override
89                                                                        public void result(Path path, Exception e) {
90                                                                                if (e != null) {
91                                                                                        log.fatal("failed to resolve simulator node");
92                                                                                        fireRun(e);
93                                                                                        return;
94                                                                                }
95                                                                                assert isActive();
96                                                                                simulator = path;
97                                                                                fireRun(null);
98                                                                                log.info("resolved simulator node");
99
100                                                                                EventParam param = getParam(simulator, "running_changed", EventParam.class);
101                                                                                assert param != null;
102                                                                                connection.subscribe(simulator.getTextual() + "/" + param.getId(), RemoteInstance.this, new LoggingSubscriptionCallback<Instance>(log, "server running state change", new EventCallback() {
103                                                                                        @Override
104                                                                                        public void call(List<File> files) {
105                                                                                                invokeLater(new RunAt<Instance>() {
106                                                                                                        @Override
107                                                                                                        public void run() {
108                                                                                                                updateSimulationRunning();
109                                                                                                        }
110                                                                                                });
111                                                                                        }
112                                                                                }));
113                                                                                new PeriodicTask<Instance>(RemoteInstance.this, 1000) {
114                                                                                        @Override
115                                                                                        public void run() {
116                                                                                                updateSimulationRunning();
117                                                                                                again();
118                                                                                        }
119                                                                                };
120                                                                        }
121                                                                });
122                                                        }
123                                                });
124                                        }
125                                });
126                        }
127                });
128
129        }
130
131        @Override
132        public String toString() {
133                assert Dispatching.isThreadSafe();
134                return "remote instance " + getName() + "(" + getAddress() + ")";
135        }
136
137        public void setRunning(final boolean running) {
138                assert isActive();
139                //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(log, (running ? "starting" : "stopping") + " server"));
140        }
141
142        protected final UnaryListenersSet<Boolean> simulationRunningListeners = new UnaryListenersSet<Boolean>();
143
144        protected void updateSimulationRunning() {
145                assert isActive();
146                /*
147                fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() {
148                        @Override
149                        public void call(Exception e) {
150                                if (e != null) {
151                                        log.fatal("failed to query simulator running status: " + e);
152                                        return;
153                                }
154
155                                invokeLater(new Runnable() {
156                                        @Override
157                                        public void run() {
158                                                boolean value = bindAccess(simulator).get("running", Boolean.class);
159                                                log.trace("server running: " + value);
160                                                simulationRunningListeners.call(value);
161                                        }
162                                });
163
164                        }
165                });
166                 */
167        }
168
169        public void addRunningStateListener(UnaryFunctor<Boolean, Boolean> listener) {
170                assert isActive();
171                simulationRunningListeners.add(listener);
172        }
173
174        // public void disconnect() {
175        //      assert isActive();
176        //      if (connection.isConnected()) {
177        //              Dispatching.stop(connection, this);
178        //      }
179        // }
180
181        public final ClientConnection getConnection() {
182                return connection;
183        }
184
185        @Override
186        public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) {
187                assert isActive();
188                assert param != null;
189                assert path.isResolved();
190                connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ResponseCallback<Instance>() {
191                        @Override
192                        public void process(Response response) {
193                                assert isActive();
194                                if (!response.getOk()) {
195                                        stateFunctor.call(new Exception(response.getComment()));
196                                        return;
197                                }
198                                try {
199                                        processFetchedValues(path, response.getFiles());
200                                        stateFunctor.call(null);
201                                } catch (Exception ex) {
202                                        stateFunctor.call(ex);
203                                }
204                        }
205                });
206        }
207
208        protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>();
209
210        protected void finishInfoRequest(String id, FramsClass result, Exception e) {
211                assert isActive();
212                Set<Future<FramsClass>> futures = infoRequests.get(id);
213                infoRequests.remove(id);
214                for (Future<FramsClass> f : futures) {
215                        f.result(result, e);
216                }
217        }
218
219        @Override
220        protected void fetchInfo(final Path path, final Future<FramsClass> future) {
221
222                final String name = path.getTop().getParam().getContainedTypeName();
223
224                if (infoRequests.containsKey(name)) {
225                        infoRequests.get(name).add(future);
226                        return;
227                }
228
229                log.debug("issuing info request for " + name);
230                Set<Future<FramsClass>> futures = new HashSet<Future<FramsClass>>();
231                futures.add(future);
232                infoRequests.put(name, futures);
233
234                //TODO: if the info is in the cache, then don't communicate
235                connection.send(new InfoRequest().path(path.getTextual()), this, new ResponseCallback<Instance>() {
236                        @Override
237                        public void process(Response response) {
238                                assert isActive();
239                                if (!response.getOk()) {
240                                        finishInfoRequest(name, null, new Exception(response.getComment()));
241                                        return;
242                                }
243
244                                assert response.getFiles().size() == 1;
245                                assert path.isTheSame(response.getFiles().get(0).getPath());
246                                FramsClass framsClass;
247                                try {
248                                        framsClass = processFetchedInfo(response.getFiles().get(0));
249                                } catch (ConstructionException e) {
250                                        log.fatal("could not read class info");
251                                        finishInfoRequest(name, null, new Exception("could not read class info"));
252                                        return;
253                                }
254
255                                CompositeParam thisParam = path.getTop().getParam();
256                                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
257                                        String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId();
258                                        log.error(mismatch);
259                                        finishInfoRequest(name, null, new Exception(mismatch));
260                                        return;
261                                }
262                                finishInfoRequest(name, framsClass, null);
263                        }
264                });
265        }
266
267        @Override
268        public void fetchValues(final Path path, final StateFunctor stateFunctor) {
269                assert isActive();
270                assert path.getTop().getObject() != null;
271
272                log.trace("fetching values for " + path);
273                connection.send(new GetRequest().path(path.getTextual()), this, new ResponseCallback<Instance>() {
274                        @Override
275                        public void process(Response response) {
276                                assert isActive();
277                                if (!response.getOk()) {
278                                        stateFunctor.call(new Exception(response.getComment()));
279                                        return;
280                                }
281                                try {
282                                        processFetchedValues(path, response.getFiles());
283                                        stateFunctor.call(null);
284                                } catch (Exception ex) {
285                                        log.error("an exception occurred while loading: " + ex);
286                                        ex.printStackTrace();
287                                        stateFunctor.call(ex);
288                                }
289                        }
290                });
291        }
292
293        @Override
294        public void resolve(final Path path, final Future<Path> future) {
295                assert isActive();
296                if (path.getTop().getObject() != null) {
297                        if (getInfoFromCache(path) != null) {
298                                future.result(path, null);
299                                return;
300                        }
301                        findInfo(path, new Future<FramsClass>() {
302                                @Override
303                                public void result(FramsClass result, Exception e) {
304                                        if (e != null) {
305                                                future.result(null, e);
306                                                return;
307                                        }
308                                        future.result(path, null);
309                                }
310                        });
311                        return;
312                }
313                findInfo(path, new Future<FramsClass>() {
314                        @Override
315                        public void result(FramsClass result, Exception e) {
316                                assert isActive();
317                                if (e != null) {
318                                        future.result(null, e);
319                                        return;
320                                }
321                                assert path.getTop().getParam().isMatchingContainedName(result.getId());
322                                Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : path.tryFindResolution());
323                                future.result(createIfNeeded(p), null);
324                        }
325                });
326        }
327
328        @Override
329        protected void tryRegisterOnChangeEvents(final Path path) {
330                assert isActive();
331                AccessInterface access = bindAccess(path);
332                if (!(access instanceof ListAccess)) {
333                        return;
334                }
335
336                assert path.size() >= 2;
337                FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName());
338
339                EventParam changedEvent;
340                try {
341                        changedEvent = underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed", EventParam.class);
342                } catch (FramsticksException e) {
343                        return;
344                }
345
346                log.debug("registering for " + changedEvent);
347                if (getSubscription(path) != null) {
348                        return;
349                }
350
351                final Pair<Path, Subscription<?>> temporary = new Pair<>(path, null);
352                subscriptions.add(temporary);
353
354                connection.subscribe(path.getTextual() + "_changed", this, new SubscriptionCallback<Instance>() {
355                        @Override
356                        public EventCallback subscribed(final Subscription<? super Instance> subscription) {
357                                if (subscription == null) {
358                                        log.error("failed to subscribe for change event for " + path);
359                                        return null;
360                                }
361                                log.debug("subscribed for change event for " + path);
362                                // subscription.setDispatcher(RemoteInstance.this);
363                                RemoteInstance.this.invokeLater(new RunAt<Instance>() {
364                                        @Override
365                                        public void run() {
366                                                subscriptions.remove(temporary);
367                                                subscriptions.add(new Pair<Path, Subscription<?>>(path, subscription));
368                                        }
369                                });
370                                return new EventCallback() {
371                                        @Override
372                                        public void call(List<File> files) {
373                                                assert isActive();
374                                                assert files.size() == 1;
375                                                MultiParamLoader loader = new MultiParamLoader();
376                                                loader.setNewSource(files.get(0).getContent());
377                                                loader.addBreakCondition(MultiParamLoader.Status.AfterObject);
378                                                ReflectionAccess access = new ReflectionAccess(ListChange.class, FramsClass.build().forClass(ListChange.class));
379                                                loader.addAccessInterface(access);
380                                                MultiParamLoader.Status status;
381                                                try {
382                                                        while ((status = loader.go()) != MultiParamLoader.Status.Finished) {
383                                                                if (status == MultiParamLoader.Status.AfterObject) {
384                                                                        AccessInterface accessInterface = loader.getLastAccessInterface();
385                                                                        reactToChange(path, (ListChange) accessInterface.getSelected());
386                                                                }
387                                                        }
388                                                } catch (Exception e) {
389                                                        e.printStackTrace();
390                                                }
391                                        }
392                                };
393                        }
394                });
395        }
396
397        protected void reactToChange(final Path path, final ListChange listChange) {
398                assert isActive();
399                log.debug("reacting to change " + listChange + " in " + path);
400                AccessInterface access = bindAccess(path);
401                assert access != null;
402
403                if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) {
404                        final String p = path.getTextual();
405                        resolveAndFetch(p, new Future<Path>() {
406                                @Override
407                                public void result(Path result, Exception e) {
408                                        if (e != null) {
409                                                log.error("failed to modify " + p + ": " + e);
410                                                return;
411                                        }
412                                        fireListChange(path, listChange);
413                                }
414                        });
415                        return;
416                }
417
418                CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier()));
419                assert childParam != null;
420                switch (listChange.getAction()) {
421                case Add: {
422                        final String p = path.getTextual() + "/" + childParam.getId();
423                        resolveAndFetch(p, new Future<Path>() {
424                                @Override
425                                public void result(Path result, Exception e) {
426                                        if (e != null) {
427                                                log.error("failed to add " + p + ": " + e);
428                                                return;
429                                        }
430                                        log.debug("added: " + result);
431                                        fireListChange(path, listChange);
432                                }
433                        });
434                        break;
435                }
436                case Remove: {
437                        access.set(childParam, null);
438                        fireListChange(path, listChange);
439                        break;
440                }
441                case Modify: {
442                        final String p = path.getTextual() + "/" + childParam.getId();
443                        resolveAndFetch(p, new Future<Path>() {
444                                @Override
445                                public void result(Path result, Exception e) {
446                                        if (e != null) {
447                                                log.error("failed to modify " + p + ": " + e);
448                                                return;
449                                        }
450                                        fireListChange(path, listChange);
451                                }
452                        });
453                        break;
454                }
455                }
456        }
457
458        //TODO ValueParam
459        @Override
460        public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) {
461                assert isActive();
462
463                log.trace("storing value " + param + " for " + path);
464                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>() {
465                        @Override
466                        public void call(Exception e) {
467                                if (e == null) {
468                                        bindAccess(path).set((ValueParam) param, value);
469                                }
470                                stateFunctor.call(e);
471                        }
472                });
473        }
474
475        @Override
476        protected void joinableStart() {
477                Dispatching.use(connection, this);
478                super.joinableStart();
479        }
480
481        @Override
482        protected void joinableInterrupt() {
483                Dispatching.drop(connection, this);
484                super.joinableInterrupt();
485        }
486
487        @Override
488        protected void joinableFinish() {
489                super.joinableFinish();
490
491        }
492
493        @Override
494        public void joinableJoin() throws InterruptedException {
495                Dispatching.join(connection);
496                super.joinableJoin();
497        }
498
499
500        @Override
501        public void childChangedState(Joinable joinable, JoinableState state) {
502                proceedToState(state);
503        }
504
505
506}
Note: See TracBrowser for help on using the repository browser.