package com.framsticks.remote; import com.framsticks.communication.*; import com.framsticks.communication.queries.GetRequest; import com.framsticks.communication.queries.InfoRequest; import com.framsticks.communication.queries.SetRequest; import com.framsticks.communication.util.LoggingSubscriptionCallback; import com.framsticks.core.ListChange; import com.framsticks.core.Path; import com.framsticks.params.*; import com.framsticks.params.types.EventParam; import com.framsticks.parsers.MultiParamLoader; import com.framsticks.core.Instance; import com.framsticks.util.*; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.dispatching.Future; import com.framsticks.util.lang.Casting; import com.framsticks.util.lang.Pair; import com.framsticks.util.dispatching.RunAt; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import java.util.*; /** * @author Piotr Sniegowski */ public class RemoteInstance extends Instance { private final static Logger log = Logger.getLogger(RemoteInstance.class.getName()); protected Path simulator; protected ClientConnection connection; protected final Set>> subscriptions = new HashSet<>(); public Pair> getSubscription(Path path) { for (Pair> s : subscriptions) { if (s.first.matches(path)) { return s; } } return null; } @Override public void run() { assert isActive(); super.run(); connection.connect(new StateFunctor() { @Override public void call(Exception e) { if (e != null) { fireRun(e); return; } connection.negotiateProtocolVersion(new StateFunctor() { @Override public void call(Exception e) { if (e != null) { log.fatal("unsupported protocol version!\n minimal version is: " + "nmanager protocol is: " + connection.getProtocolVersion()); connection.close(); fireRun(e); return; } invokeLater(new RunAt() { @Override public void run() { resolveAndFetch("/simulator", new Future() { @Override public void result(Path path, Exception e) { if (e != null) { log.fatal("failed to resolve simulator node"); fireRun(e); return; } assert isActive(); simulator = path; fireRun(null); log.info("resolved simulator node"); EventParam param = getParam(simulator, "running_changed", EventParam.class); assert param != null; connection.subscribe(simulator.getTextual() + "/" + param.getId(), RemoteInstance.this, new LoggingSubscriptionCallback(log, "server running state change", new EventCallback() { @Override public void call(List files) { invokeLater(new RunAt() { @Override public void run() { updateSimulationRunning(); } }); } })); new PeriodicTask(RemoteInstance.this, 1000) { @Override public void run() { updateSimulationRunning(); again(); } }; } }); } }); } }); } }); } public RemoteInstance() { } @Override public void configure(Configuration config) { super.configure(config); connection = new ClientConnection(config.getString("address")); } public void setConnection(ClientConnection connection) { this.connection = connection; } @Override public String toString() { assert Dispatching.isThreadSafe(); return getConnection().getAddress(); } public void setRunning(final boolean running) { assert isActive(); //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(log, (running ? "starting" : "stopping") + " server")); } protected final UnaryListenersSet simulationRunningListeners = new UnaryListenersSet(); protected void updateSimulationRunning() { assert isActive(); /* fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() { @Override public void call(Exception e) { if (e != null) { log.fatal("failed to query simulator running status: " + e); return; } invokeLater(new Runnable() { @Override public void run() { boolean value = bindAccess(simulator).get("running", Boolean.class); log.trace("server running: " + value); simulationRunningListeners.call(value); } }); } }); */ } public void addRunningStateListener(UnaryFunctor listener) { assert isActive(); simulationRunningListeners.add(listener); } public void disconnect() { assert isActive(); if (connection.isConnected()) { connection.close(); } } public final ClientConnection getConnection() { return connection; } @Override public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) { assert isActive(); assert param != null; assert path.isResolved(); connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ResponseCallback() { @Override public void process(Response response) { assert isActive(); if (!response.getOk()) { stateFunctor.call(new Exception(response.getComment())); return; } try { processFetchedValues(path, response.getFiles()); stateFunctor.call(null); } catch (Exception ex) { stateFunctor.call(ex); } } }); } protected final Map>> infoRequests = new HashMap>>(); protected void finishInfoRequest(String id, FramsClass result, Exception e) { assert isActive(); Set> futures = infoRequests.get(id); infoRequests.remove(id); for (Future f : futures) { f.result(result, e); } } @Override protected void fetchInfo(final Path path, final Future future) { final String name = path.getTop().getParam().getContainedTypeName(); if (infoRequests.containsKey(name)) { infoRequests.get(name).add(future); return; } log.debug("issuing info request for " + name); Set> futures = new HashSet>(); futures.add(future); infoRequests.put(name, futures); //TODO: if the info is in the cache, then don't communicate connection.send(new InfoRequest().path(path.getTextual()), this, new ResponseCallback() { @Override public void process(Response response) { assert isActive(); if (!response.getOk()) { finishInfoRequest(name, null, new Exception(response.getComment())); return; } assert response.getFiles().size() == 1; assert path.isTheSame(response.getFiles().get(0).getPath()); FramsClass framsClass = processFetchedInfo(response.getFiles().get(0)); if (framsClass == null) { log.fatal("could not read class info"); finishInfoRequest(name, null, new Exception("could not read class info")); return; } CompositeParam thisParam = path.getTop().getParam(); if (!thisParam.isMatchingContainedName(framsClass.getId())) { String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId(); log.error(mismatch); finishInfoRequest(name, null, new Exception(mismatch)); return; } finishInfoRequest(name, framsClass, null); } }); } @Override public void fetchValues(final Path path, final StateFunctor stateFunctor) { assert isActive(); assert path.getTop().getObject() != null; log.trace("fetching values for " + path); connection.send(new GetRequest().path(path.getTextual()), this, new ResponseCallback() { @Override public void process(Response response) { assert isActive(); if (!response.getOk()) { stateFunctor.call(new Exception(response.getComment())); return; } try { processFetchedValues(path, response.getFiles()); stateFunctor.call(null); } catch (Exception ex) { log.error("an exception occurred while loading: " + ex); ex.printStackTrace(); stateFunctor.call(ex); } } }); } @Override public void resolve(final Path path, final Future future) { assert isActive(); if (path.getTop().getObject() != null) { if (getInfoFromCache(path) != null) { future.result(path, null); return; } findInfo(path, new Future() { @Override public void result(FramsClass result, Exception e) { if (e != null) { future.result(null, e); return; } future.result(path, null); } }); return; } findInfo(path, new Future() { @Override public void result(FramsClass result, Exception e) { assert isActive(); if (e != null) { future.result(null, e); return; } assert path.getTop().getParam().isMatchingContainedName(result.getId()); Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : path.tryFindResolution()); future.result(createIfNeeded(p), null); } }); } @Override protected void tryRegisterOnChangeEvents(final Path path) { assert isActive(); AccessInterface access = bindAccess(path); if (!(access instanceof ListAccess)) { return; } assert path.size() >= 2; FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName()); EventParam changedEvent = underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed", EventParam.class); if (changedEvent == null) { return; } if (getSubscription(path) != null) { return; } final Pair> temporary = new Pair<>(path, null); subscriptions.add(temporary); connection.subscribe(path.getTextual() + "_changed", this, new SubscriptionCallback() { @Override public EventCallback subscribed(final Subscription subscription) { if (subscription == null) { log.error("failed to subscribe for change event for " + path); return null; } log.debug("subscribed for change event for " + path); // subscription.setDispatcher(RemoteInstance.this); RemoteInstance.this.invokeLater(new RunAt() { @Override public void run() { subscriptions.remove(temporary); subscriptions.add(new Pair>(path, subscription)); } }); return new EventCallback() { @Override public void call(List files) { assert isActive(); assert files.size() == 1; MultiParamLoader loader = new MultiParamLoader(); loader.setNewSource(files.get(0).getContent()); loader.addBreakCondition(MultiParamLoader.Status.AfterObject); ReflectionAccess access = new ReflectionAccess(ListChange.class, ListChange.getFramsClass()); loader.addAccessInterface(access); MultiParamLoader.Status status; try { while ((status = loader.go()) != MultiParamLoader.Status.Finished) { if (status == MultiParamLoader.Status.AfterObject) { AccessInterface accessInterface = loader.getLastAccessInterface(); reactToChange(path, (ListChange) accessInterface.getSelected()); } } } catch (Exception e) { e.printStackTrace(); } } }; } }); } protected void reactToChange(final Path path, final ListChange listChange) { assert isActive(); log.debug("reacting to change " + listChange + " in " + path); AccessInterface access = bindAccess(path); assert access != null; if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) { final String p = path.getTextual(); resolveAndFetch(p, new Future() { @Override public void result(Path result, Exception e) { if (e != null) { log.error("failed to modify " + p + ": " + e); return; } fireListChange(path, listChange); } }); return; } CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier())); assert childParam != null; switch (listChange.getAction()) { case Add: { final String p = path.getTextual() + "/" + childParam.getId(); resolveAndFetch(p, new Future() { @Override public void result(Path result, Exception e) { if (e != null) { log.error("failed to add " + p + ": " + e); return; } log.debug("added: " + result); fireListChange(path, listChange); } }); break; } case Remove: { access.set(childParam, null); fireListChange(path, listChange); break; } case Modify: { final String p = path.getTextual() + "/" + childParam.getId(); resolveAndFetch(p, new Future() { @Override public void result(Path result, Exception e) { if (e != null) { log.error("failed to modify " + p + ": " + e); return; } fireListChange(path, listChange); } }); break; } } } //TODO ValueParam @Override public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) { assert isActive(); log.trace("storing value " + param + " for " + path); connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback() { @Override public void call(Exception e) { if (e == null) { bindAccess(path).set((ValueParam) param, value); } stateFunctor.call(e); } }); } }