source: java/main/src/main/java/com/framsticks/remote/RemoteInstance.java @ 85

Last change on this file since 85 was 85, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • upgrade to Java 7
    • use try-multi-catch clauses
    • use try-with-resources were appropriate
  • configure FindBugs? (use mvn site and then navigate in browser to the report)
    • remove most bugs found
  • parametrize Dispatching environment (Dispatcher, RunAt?) to enforce more control on the place of closures actual call

CHANGELOG:
Rework FavouritesXMLFactory.

FindBugs?. Thread start.

FindBugs?. Minor change.

FindBugs?. Iterate over entrySet.

FindBugs?. Various.

FindBug?.

FindBug?. Encoding.

FindBug?. Final fields.

FindBug?.

Remove synchronization bug in ClientConnection?.

Experiments with findbugs.

Finish parametrization.

Make RunAt? an abstract class.

More changes in parametrization.

More changes in parametrizing dispatching.

Several changes to parametrize tasks.

Rename Runnable to RunAt?.

Add specific framsticks Runnable.

Add JSR305 (annotations).

Add findbugs reporting.

More improvements to ParamBuilder? wording.

Make FramsClass? accept also ParamBuilder?.

Change wording of ParamBuilder?.

Change wording of Request creation.

Use Java 7 exception catch syntax.

Add ScopeEnd? class.

Upgrade to Java 7.

File size: 13.5 KB
RevLine 
[77]1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.GetRequest;
5import com.framsticks.communication.queries.InfoRequest;
6import com.framsticks.communication.queries.SetRequest;
7import com.framsticks.communication.util.LoggingSubscriptionCallback;
8import com.framsticks.core.ListChange;
9import com.framsticks.core.Path;
10import com.framsticks.params.*;
11import com.framsticks.params.types.EventParam;
12import com.framsticks.parsers.MultiParamLoader;
13import com.framsticks.core.Instance;
14import com.framsticks.util.*;
[84]15import com.framsticks.util.dispatching.Dispatching;
16import com.framsticks.util.dispatching.Future;
17import com.framsticks.util.lang.Casting;
18import com.framsticks.util.lang.Pair;
[85]19import com.framsticks.util.dispatching.RunAt;
[84]20
21import org.apache.commons.configuration.Configuration;
[77]22import org.apache.log4j.Logger;
23
24import java.util.*;
25
26/**
27 * @author Piotr Sniegowski
28 */
29public class RemoteInstance extends Instance {
30
[84]31        private final static Logger log = Logger.getLogger(RemoteInstance.class.getName());
[77]32
[84]33        protected Path simulator;
34        protected ClientConnection connection;
[77]35
[85]36        protected final Set<Pair<Path, Subscription<?>>> subscriptions = new HashSet<>();
[77]37
[85]38        public Pair<Path, Subscription<?>> getSubscription(Path path) {
39                for (Pair<Path, Subscription<?>> s : subscriptions) {
[84]40                        if (s.first.matches(path)) {
41                                return s;
42                        }
43                }
44                return null;
45        }
[77]46
[84]47        @Override
48        public void run() {
49                assert isActive();
50                super.run();
51                connection.connect(new StateFunctor() {
[77]52                        @Override
53                        public void call(Exception e) {
[84]54                                if (e != null) {
55                                        fireRun(e);
56                                        return;
57                                }
[77]58                                connection.negotiateProtocolVersion(new StateFunctor() {
59                                        @Override
60                                        public void call(Exception e) {
61                                                if (e != null) {
[84]62                                                        log.fatal("unsupported protocol version!\n minimal version is: "
[77]63                                                                        + "nmanager protocol is: "
64                                                                        + connection.getProtocolVersion());
65                                                        connection.close();
[84]66                                                        fireRun(e);
[77]67                                                        return;
68                                                }
69
[85]70                                                invokeLater(new RunAt<Instance>() {
[84]71                                                        @Override
72                                                        public void run() {
73                                                                resolveAndFetch("/simulator", new Future<Path>() {
74                                                                        @Override
75                                                                        public void result(Path path, Exception e) {
76                                                                                if (e != null) {
77                                                                                        log.fatal("failed to resolve simulator node");
78                                                                                        fireRun(e);
79                                                                                        return;
80                                                                                }
81                                                                                assert isActive();
82                                                                                simulator = path;
83                                                                                fireRun(null);
84                                                                                log.info("resolved simulator node");
[77]85
[84]86                                                                                EventParam param = getParam(simulator, "running_changed", EventParam.class);
87                                                                                assert param != null;
[85]88                                                                                connection.subscribe(simulator.getTextual() + "/" + param.getId(), RemoteInstance.this, new LoggingSubscriptionCallback<Instance>(log, "server running state change", new EventCallback() {
[84]89                                                                                        @Override
90                                                                                        public void call(List<File> files) {
[85]91                                                                                                invokeLater(new RunAt<Instance>() {
[84]92                                                                                                        @Override
93                                                                                                        public void run() {
94                                                                                                                updateSimulationRunning();
95                                                                                                        }
96                                                                                                });
97                                                                                        }
98                                                                                }));
[85]99                                                                                new PeriodicTask<Instance>(RemoteInstance.this, 1000) {
[84]100                                                                                        @Override
101                                                                                        public void run() {
102                                                                                                updateSimulationRunning();
103                                                                                                again();
104                                                                                        }
105                                                                                };
106                                                                        }
107                                                                });
108                                                        }
109                                                });
[77]110                                        }
111                                });
112                        }
113                });
114        }
115
[84]116        public RemoteInstance() {
117        }
[77]118
[84]119
[77]120        @Override
[84]121        public void configure(Configuration config) {
122                super.configure(config);
123                connection = new ClientConnection(config.getString("address"));
124        }
125
126
127        public void setConnection(ClientConnection connection) {
128                this.connection = connection;
129        }
130
131        @Override
[77]132        public String toString() {
[84]133                assert Dispatching.isThreadSafe();
[77]134                return getConnection().getAddress();
135        }
136
137        public void setRunning(final boolean running) {
[84]138                assert isActive();
139                //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(log, (running ? "starting" : "stopping") + " server"));
[77]140        }
141
142        protected final UnaryListenersSet<Boolean> simulationRunningListeners = new UnaryListenersSet<Boolean>();
143
144        protected void updateSimulationRunning() {
[84]145                assert isActive();
146                /*
147                fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() {
148                        @Override
149                        public void call(Exception e) {
150                                if (e != null) {
151                                        log.fatal("failed to query simulator running status: " + e);
152                                        return;
153                                }
[77]154
[84]155                                invokeLater(new Runnable() {
156                                        @Override
157                                        public void run() {
158                                                boolean value = bindAccess(simulator).get("running", Boolean.class);
159                                                log.trace("server running: " + value);
160                                                simulationRunningListeners.call(value);
161                                        }
162                                });
[77]163
[84]164                        }
165                });
166                */
[77]167        }
168
169        public void addRunningStateListener(UnaryFunctor<Boolean, Boolean> listener) {
[84]170                assert isActive();
171                simulationRunningListeners.add(listener);
[77]172        }
173
174        public void disconnect() {
[84]175                assert isActive();
176                if (connection.isConnected()) {
[77]177                        connection.close();
178                }
179        }
180
[84]181        public final ClientConnection getConnection() {
182                return connection;
183        }
[77]184
[84]185        @Override
186        public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) {
187                assert isActive();
188                assert param != null;
189                assert path.isResolved();
[85]190                connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ResponseCallback<Instance>() {
[84]191                        @Override
192                        public void process(Response response) {
193                                assert isActive();
194                                if (!response.getOk()) {
195                                        stateFunctor.call(new Exception(response.getComment()));
196                                        return;
197                                }
198                                try {
199                                        processFetchedValues(path, response.getFiles());
200                                        stateFunctor.call(null);
201                                } catch (Exception ex) {
202                                        stateFunctor.call(ex);
203                                }
204                        }
205                });
206        }
[77]207
[84]208        protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>();
[77]209
[84]210        protected void finishInfoRequest(String id, FramsClass result, Exception e) {
211                assert isActive();
212                Set<Future<FramsClass>> futures = infoRequests.get(id);
213                infoRequests.remove(id);
214                for (Future<FramsClass> f : futures) {
215                        f.result(result, e);
216                }
217        }
[77]218
[84]219        @Override
220        protected void fetchInfo(final Path path, final Future<FramsClass> future) {
[77]221
[84]222                final String name = path.getTop().getParam().getContainedTypeName();
[77]223
[84]224                if (infoRequests.containsKey(name)) {
225                        infoRequests.get(name).add(future);
226                        return;
227                }
[77]228
[84]229                log.debug("issuing info request for " + name);
230                Set<Future<FramsClass>> futures = new HashSet<Future<FramsClass>>();
231                futures.add(future);
232                infoRequests.put(name, futures);
[77]233
[84]234                //TODO: if the info is in the cache, then don't communicate
[85]235                connection.send(new InfoRequest().path(path.getTextual()), this, new ResponseCallback<Instance>() {
[84]236                        @Override
237                        public void process(Response response) {
238                                assert isActive();
239                                if (!response.getOk()) {
240                                        finishInfoRequest(name, null, new Exception(response.getComment()));
241                                        return;
242                                }
[77]243
[84]244                                assert response.getFiles().size() == 1;
245                                assert path.isTheSame(response.getFiles().get(0).getPath());
246                                FramsClass framsClass = processFetchedInfo(response.getFiles().get(0));
[77]247
[84]248                                if (framsClass == null) {
249                                        log.fatal("could not read class info");
250                                        finishInfoRequest(name, null, new Exception("could not read class info"));
251                                        return;
252                                }
253                                CompositeParam thisParam = path.getTop().getParam();
254                                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
255                                        String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId();
256                                        log.error(mismatch);
257                                        finishInfoRequest(name, null, new Exception(mismatch));
258                                        return;
259                                }
260                                finishInfoRequest(name, framsClass, null);
261                        }
262                });
263        }
[77]264
[84]265        @Override
266        public void fetchValues(final Path path, final StateFunctor stateFunctor) {
267                assert isActive();
268                assert path.getTop().getObject() != null;
[77]269
[84]270                log.trace("fetching values for " + path);
[85]271                connection.send(new GetRequest().path(path.getTextual()), this, new ResponseCallback<Instance>() {
[84]272                        @Override
273                        public void process(Response response) {
274                                assert isActive();
275                                if (!response.getOk()) {
276                                        stateFunctor.call(new Exception(response.getComment()));
277                                        return;
278                                }
279                                try {
280                                        processFetchedValues(path, response.getFiles());
281                                        stateFunctor.call(null);
282                                } catch (Exception ex) {
283                                        log.error("an exception occurred while loading: " + ex);
284                                        ex.printStackTrace();
285                                        stateFunctor.call(ex);
286                                }
287                        }
288                });
289        }
[77]290
[84]291        @Override
292        public void resolve(final Path path, final Future<Path> future) {
293                assert isActive();
294                if (path.getTop().getObject() != null) {
295                        if (getInfoFromCache(path) != null) {
296                                future.result(path, null);
297                                return;
298                        }
299                        findInfo(path, new Future<FramsClass>() {
300                                @Override
301                                public void result(FramsClass result, Exception e) {
302                                        if (e != null) {
303                                                future.result(null, e);
304                                                return;
305                                        }
306                                        future.result(path, null);
307                                }
308                        });
309                        return;
310                }
311                findInfo(path, new Future<FramsClass>() {
312                        @Override
313                        public void result(FramsClass result, Exception e) {
314                                assert isActive();
315                                if (e != null) {
316                                        future.result(null, e);
317                                        return;
318                                }
319                                assert path.getTop().getParam().isMatchingContainedName(result.getId());
320                                Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : path.tryFindResolution());
321                                future.result(createIfNeeded(p), null);
322                        }
323                });
324        }
[77]325
[84]326        @Override
327        protected void tryRegisterOnChangeEvents(final Path path) {
328                assert isActive();
329                AccessInterface access = bindAccess(path);
330                if (!(access instanceof ListAccess)) {
331                        return;
332                }
[77]333
334
[84]335                assert path.size() >= 2;
336                FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName());
[77]337
[84]338                EventParam changedEvent = underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed", EventParam.class);
339                if (changedEvent == null) {
340                        return;
341                }
[77]342
[84]343                if (getSubscription(path) != null) {
344                        return;
345                }
[77]346
[85]347                final Pair<Path, Subscription<?>> temporary = new Pair<>(path, null);
[84]348                subscriptions.add(temporary);
[77]349
[85]350                connection.subscribe(path.getTextual() + "_changed", this, new SubscriptionCallback<Instance>() {
[84]351                        @Override
[85]352                        public EventCallback subscribed(final Subscription<? super Instance> subscription) {
[84]353                                if (subscription == null) {
354                                        log.error("failed to subscribe for change event for " + path);
355                                        return null;
356                                }
357                                log.debug("subscribed for change event for " + path);
[85]358                                // subscription.setDispatcher(RemoteInstance.this);
359                                RemoteInstance.this.invokeLater(new RunAt<Instance>() {
[84]360                                        @Override
361                                        public void run() {
362                                                subscriptions.remove(temporary);
[85]363                                                subscriptions.add(new Pair<Path, Subscription<?>>(path, subscription));
[84]364                                        }
365                                });
366                                return new EventCallback() {
367                                        @Override
368                                        public void call(List<File> files) {
369                                                assert isActive();
370                                                assert files.size() == 1;
371                                                MultiParamLoader loader = new MultiParamLoader();
372                                                loader.setNewSource(files.get(0).getContent());
373                                                loader.addBreakCondition(MultiParamLoader.Status.AfterObject);
374                                                ReflectionAccess access = new ReflectionAccess(ListChange.class, ListChange.getFramsClass());
375                                                loader.addAccessInterface(access);
376                                                MultiParamLoader.Status status;
377                                                try {
378                                                        while ((status = loader.go()) != MultiParamLoader.Status.Finished) {
379                                                                if (status == MultiParamLoader.Status.AfterObject) {
380                                                                        AccessInterface accessInterface = loader.getLastAccessInterface();
381                                                                        reactToChange(path, (ListChange) accessInterface.getSelected());
382                                                                }
383                                                        }
384                                                } catch (Exception e) {
385                                                        e.printStackTrace();
386                                                }
387                                        }
388                                };
389                        }
390                });
391        }
[77]392
393
[84]394        protected void reactToChange(final Path path, final ListChange listChange) {
395                assert isActive();
396                log.debug("reacting to change " + listChange + " in " + path);
[77]397                AccessInterface access = bindAccess(path);
398                assert access != null;
399
400                if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) {
401                        final String p = path.getTextual();
402                        resolveAndFetch(p, new Future<Path>() {
403                                @Override
404                                public void result(Path result, Exception e) {
405                                        if (e != null) {
[84]406                                                log.error("failed to modify " + p + ": " + e);
[77]407                                                return;
408                                        }
409                                        fireListChange(path, listChange);
410                                }
411                        });
412                        return;
413                }
414
415
[84]416                CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier()));
417                assert childParam != null;
418                switch (listChange.getAction()) {
419                        case Add: {
420                                final String p = path.getTextual() + "/" + childParam.getId();
421                                resolveAndFetch(p, new Future<Path>() {
422                                        @Override
423                                        public void result(Path result, Exception e) {
424                                                if (e != null) {
425                                                        log.error("failed to add " + p + ": " + e);
426                                                        return;
427                                                }
428                                                log.debug("added: " + result);
429                                                fireListChange(path, listChange);
430                                        }
431                                });
432                                break;
433                        }
434                        case Remove: {
435                                access.set(childParam, null);
436                                fireListChange(path, listChange);
437                                break;
438                        }
439                        case Modify: {
440                                final String p = path.getTextual() + "/" + childParam.getId();
441                                resolveAndFetch(p, new Future<Path>() {
442                                        @Override
443                                        public void result(Path result, Exception e) {
444                                                if (e != null) {
445                                                        log.error("failed to modify " + p + ": " + e);
446                                                        return;
447                                                }
448                                                fireListChange(path, listChange);
449                                        }
450                                });
451                                break;
452                        }
453                }
454        }
[77]455
[84]456        //TODO ValueParam
[77]457        @Override
458        public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) {
459                assert isActive();
460
[84]461                log.trace("storing value " + param + " for " + path);
[85]462                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>() {
[77]463                        @Override
464                        public void call(Exception e) {
465                                if (e == null) {
[84]466                                        bindAccess(path).set((ValueParam) param, value);
[77]467                                }
468                                stateFunctor.call(e);
469                        }
470                });
471        }
472}
Note: See TracBrowser for help on using the repository browser.