- Timestamp:
- 07/04/13 20:29:50 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/remote/RemoteInstance.java
r90 r96 2 2 3 3 import com.framsticks.communication.*; 4 import com.framsticks.communication.queries.CallRequest; 4 5 import com.framsticks.communication.queries.GetRequest; 5 6 import com.framsticks.communication.queries.InfoRequest; 6 7 import com.framsticks.communication.queries.SetRequest; 7 import com.framsticks.communication.util.LoggingSubscriptionCallback; 8 import com.framsticks.core.AbstractInstance; 9 import com.framsticks.core.InstanceUtils; 8 10 import com.framsticks.core.ListChange; 11 import com.framsticks.core.Node; 9 12 import com.framsticks.core.Path; 10 13 import com.framsticks.params.*; … … 17 20 import com.framsticks.util.*; 18 21 import com.framsticks.util.dispatching.Dispatching; 22 import com.framsticks.util.dispatching.ExceptionResultHandler; 19 23 import com.framsticks.util.dispatching.Future; 20 24 import com.framsticks.util.dispatching.Joinable; … … 24 28 import com.framsticks.util.lang.Pair; 25 29 import com.framsticks.util.dispatching.RunAt; 30 import static com.framsticks.core.InstanceUtils.*; 26 31 27 32 import java.util.*; 33 34 import javax.annotation.Nonnull; 28 35 29 36 import org.apache.log4j.Logger; … … 33 40 */ 34 41 @FramsClassAnnotation 35 public class RemoteInstance extends Instance implements JoinableParent {42 public class RemoteInstance extends AbstractInstance implements JoinableParent { 36 43 37 44 private final static Logger log = Logger.getLogger(RemoteInstance.class.getName()); 38 45 39 protected Path simulator;40 46 protected ClientConnection connection; 41 47 … … 64 70 } 65 71 72 protected void onProtocolVersionNegotiated() { 73 } 74 75 66 76 public void setConnection(final ClientConnection connection) { 67 77 this.connection = connection; 68 this.connection.setConnectedFunctor(new StateFunctor() { 69 @Override 70 public void call(Exception e) { 71 if (e != null) { 72 fireRun(e); 73 return; 74 } 75 connection.negotiateProtocolVersion(new StateFunctor() { 78 final ExceptionResultHandler failure = new ExceptionResultHandler() { 79 @Override 80 public void handle(FramsticksException exception) { 81 log.fatal("failed to establish connection: ", exception); 82 // log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion()); 83 Dispatching.drop(connection, RemoteInstance.this); 84 fireRun(exception); 85 } 86 }; 87 88 this.connection.setConnectedFunctor(new AbstractStateFunctor(failure) { 89 @Override 90 public void call() { 91 connection.negotiateProtocolVersion(new AbstractStateFunctor(failure) { 76 92 @Override 77 public void call(Exception e) { 78 if (e != null) { 79 log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion()); 80 Dispatching.drop(connection, RemoteInstance.this); 81 fireRun(e); 82 return; 83 } 84 85 dispatch(new RunAt<Instance>() { 86 @Override 87 public void run() { 88 resolveAndFetch("/simulator", new Future<Path>() { 89 @Override 90 public void result(Path path, Exception e) { 91 if (e != null) { 92 log.fatal("failed to resolve simulator node"); 93 fireRun(e); 94 return; 95 } 96 assert isActive(); 97 simulator = path; 98 fireRun(null); 99 log.info("resolved simulator node"); 100 101 EventParam param = getParam(simulator, "running_changed", EventParam.class); 102 assert param != null; 103 connection.subscribe(simulator.getTextual() + "/" + param.getId(), RemoteInstance.this, new LoggingSubscriptionCallback<Instance>(log, "server running state change", new EventCallback() { 104 @Override 105 public void call(List<File> files) { 106 dispatch(new RunAt<Instance>() { 107 @Override 108 public void run() { 109 updateSimulationRunning(); 110 } 111 }); 112 } 113 })); 114 new PeriodicTask<Instance>(RemoteInstance.this, 1000) { 115 @Override 116 public void run() { 117 updateSimulationRunning(); 118 again(); 119 } 120 }; 121 } 122 }); 123 } 124 }); 93 public void call() { 94 onProtocolVersionNegotiated(); 125 95 } 126 96 }); 127 97 } 128 98 }); 129 130 99 } 131 100 … … 136 105 } 137 106 138 public void setRunning(final boolean running) {139 assert isActive();140 //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(log, (running ? "starting" : "stopping") + " server"));141 }142 143 protected final UnaryListenersSet<Boolean> simulationRunningListeners = new UnaryListenersSet<Boolean>();144 145 protected void updateSimulationRunning() {146 assert isActive();147 /*148 fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() {149 @Override150 public void call(Exception e) {151 if (e != null) {152 log.fatal("failed to query simulator running status: " + e);153 return;154 }155 156 invokeLater(new Runnable() {157 @Override158 public void run() {159 boolean value = bindAccess(simulator).get("running", Boolean.class);160 log.trace("server running: " + value);161 simulationRunningListeners.call(value);162 }163 });164 165 }166 });167 */168 }169 170 public void addRunningStateListener(UnaryFunctor<Boolean, Boolean> listener) {171 assert isActive();172 simulationRunningListeners.add(listener);173 }174 175 // public void disconnect() {176 // assert isActive();177 // if (connection.isConnected()) {178 // Dispatching.stop(connection, this);179 // }180 // }181 182 107 public final ClientConnection getConnection() { 183 108 return connection; … … 185 110 186 111 @Override 187 public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) {112 public void fetchValue(final Path path, final ValueParam param, final StateFunctor stateFunctor) { 188 113 assert isActive(); 189 114 assert param != null; … … 194 119 assert isActive(); 195 120 if (!response.getOk()) { 196 stateFunctor. call(new Exception(response.getComment()));121 stateFunctor.handle(new FramsticksException().msg("failed to fetch value").arg("comment", response.getComment())); 197 122 return; 198 123 } 199 124 try { 200 processFetchedValues(path, response.getFiles());201 stateFunctor.call( null);202 } catch ( Exception ex) {203 stateFunctor. call(ex);125 InstanceUtils.processFetchedValues(path, response.getFiles()); 126 stateFunctor.call(); 127 } catch (FramsticksException ex) { 128 stateFunctor.handle(ex); 204 129 } 205 130 } … … 209 134 protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>(); 210 135 211 protected void finishInfoRequest(String id, FramsClass result, Exception e) {136 protected void finishInfoRequest(String id, FramsClass result, FramsticksException e) { 212 137 assert isActive(); 213 138 Set<Future<FramsClass>> futures = infoRequests.get(id); 214 139 infoRequests.remove(id); 215 140 for (Future<FramsClass> f : futures) { 216 f.result(result, e);217 } 218 } 219 220 @Override 221 p rotectedvoid fetchInfo(final Path path, final Future<FramsClass> future) {141 Future.passOrHandle(f, result, e); 142 } 143 } 144 145 @Override 146 public void fetchInfo(final Path path, final Future<FramsClass> future) { 222 147 223 148 final String name = path.getTop().getParam().getContainedTypeName(); … … 238 163 public void process(Response response) { 239 164 assert isActive(); 240 if (!response.getOk()) {241 finishInfoRequest(name, null, new Exception(response.getComment()));242 return;243 }244 245 assert response.getFiles().size() == 1;246 assert path.isTheSame(response.getFiles().get(0).getPath());247 FramsClass framsClass;248 165 try { 249 framsClass = processFetchedInfo(response.getFiles().get(0)); 250 } catch (ConstructionException e) { 251 log.fatal("could not read class info"); 252 finishInfoRequest(name, null, new Exception("could not read class info")); 253 return; 254 } 255 256 CompositeParam thisParam = path.getTop().getParam(); 257 if (!thisParam.isMatchingContainedName(framsClass.getId())) { 258 String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId(); 259 log.error(mismatch); 260 finishInfoRequest(name, null, new Exception(mismatch)); 261 return; 262 } 263 finishInfoRequest(name, framsClass, null); 166 if (!response.getOk()) { 167 throw new FramsticksException().msg("invalid response").arg("comment", response.getComment()); 168 } 169 if (response.getFiles().size() != 1) { 170 throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size()); 171 } 172 if (!path.isTheSame(response.getFiles().get(0).getPath())) { 173 throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath()); 174 } 175 FramsClass framsClass = InstanceUtils.processFetchedInfo(RemoteInstance.this, response.getFiles().get(0)); 176 177 CompositeParam thisParam = path.getTop().getParam(); 178 if (!thisParam.isMatchingContainedName(framsClass.getId())) { 179 throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId()); 180 } 181 182 finishInfoRequest(name, framsClass, null); 183 } catch (FramsticksException e) { 184 finishInfoRequest(name, null, e.arg("path", path)); 185 } 264 186 } 265 187 }); … … 276 198 public void process(Response response) { 277 199 assert isActive(); 278 if (!response.getOk()) {279 stateFunctor.call(new Exception(response.getComment()));280 return;281 }282 200 try { 283 processFetchedValues(path, response.getFiles()); 284 stateFunctor.call(null); 285 } catch (Exception ex) { 286 log.error("an exception occurred while loading: " + ex); 287 ex.printStackTrace(); 288 stateFunctor.call(ex); 201 if (!response.getOk()) { 202 throw new FramsticksException().msg("failed to fetch values").arg("comment", response.getComment()); 203 } 204 InstanceUtils.processFetchedValues(path, response.getFiles()); 205 stateFunctor.call(); 206 } catch (FramsticksException e) { 207 stateFunctor.handle(e); 289 208 } 290 209 } … … 294 213 @Override 295 214 public void resolve(final Path path, final Future<Path> future) { 296 assert isActive(); 297 if (path.getTop().getObject() != null) { 298 if (getInfoFromCache(path) != null) { 299 future.result(path, null); 300 return; 301 } 302 findInfo(path, new Future<FramsClass>() { 303 @Override 304 public void result(FramsClass result, Exception e) { 305 if (e != null) { 306 future.result(null, e); 307 return; 308 } 309 future.result(path, null); 310 } 311 }); 312 return; 313 } 314 findInfo(path, new Future<FramsClass>() { 315 @Override 316 public void result(FramsClass result, Exception e) { 317 assert isActive(); 318 if (e != null) { 319 future.result(null, e); 320 return; 321 } 322 assert path.getTop().getParam().isMatchingContainedName(result.getId()); 323 Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : path.tryFindResolution()); 324 future.result(createIfNeeded(p), null); 325 } 326 }); 215 InstanceUtils.resolve(path, future); 327 216 } 328 217 … … 330 219 protected void tryRegisterOnChangeEvents(final Path path) { 331 220 assert isActive(); 332 AccessInterface access = bindAccess(path);221 AccessInterface access = InstanceUtils.bindAccess(path); 333 222 if (!(access instanceof ListAccess)) { 334 223 return; … … 396 285 } 397 286 287 protected Future<Path> futureListChanger(final ListChange listChange, final String path) { 288 return new Future<Path>(Logging.logger(log, "failed to " + listChange, path)) { 289 @Override 290 protected void result(Path result) { 291 log.debug(listChange + ": " + result); 292 fireListChange(result, listChange); 293 } 294 }; 295 } 296 398 297 protected void reactToChange(final Path path, final ListChange listChange) { 399 298 assert isActive(); 400 299 log.debug("reacting to change " + listChange + " in " + path); 401 AccessInterface access = bindAccess(path);300 AccessInterface access = InstanceUtils.bindAccess(path); 402 301 assert access != null; 403 302 404 303 if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) { 405 304 final String p = path.getTextual(); 406 resolveAndFetch(p, new Future<Path>() { 407 @Override 408 public void result(Path result, Exception e) { 409 if (e != null) { 410 log.error("failed to modify " + p + ": " + e); 411 return; 412 } 413 fireListChange(path, listChange); 414 } 415 }); 305 InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p)); 416 306 return; 417 307 } … … 420 310 assert childParam != null; 421 311 switch (listChange.getAction()) { 422 case Add: { 423 final String p = path.getTextual() + "/" + childParam.getId(); 424 resolveAndFetch(p, new Future<Path>() { 425 @Override 426 public void result(Path result, Exception e) { 427 if (e != null) { 428 log.error("failed to add " + p + ": " + e); 429 return; 430 } 431 log.debug("added: " + result); 432 fireListChange(path, listChange); 433 } 434 }); 435 break; 436 } 437 case Remove: { 438 access.set(childParam, null); 439 fireListChange(path, listChange); 440 break; 441 } 442 case Modify: { 443 final String p = path.getTextual() + "/" + childParam.getId(); 444 resolveAndFetch(p, new Future<Path>() { 445 @Override 446 public void result(Path result, Exception e) { 447 if (e != null) { 448 log.error("failed to modify " + p + ": " + e); 449 return; 450 } 451 fireListChange(path, listChange); 452 } 453 }); 454 break; 455 } 456 } 457 } 458 459 //TODO ValueParam 460 @Override 461 public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) { 312 case Add: { 313 final String p = path.getTextual() + "/" + childParam.getId(); 314 InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p)); 315 break; 316 } 317 case Remove: { 318 access.set(childParam, null); 319 fireListChange(path, listChange); 320 break; 321 } 322 case Modify: { 323 final String p = path.getTextual() + "/" + childParam.getId(); 324 InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p)); 325 break; 326 } 327 } 328 } 329 330 @Override 331 public void storeValue(final Path path, final ValueParam param, final Object value, StateFunctor stateFunctor) { 462 332 assert isActive(); 463 333 464 334 log.trace("storing value " + param + " for " + path); 465 connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>() { 466 @Override 467 public void call(Exception e) { 468 if (e == null) { 469 bindAccess(path).set((ValueParam) param, value); 470 } 471 stateFunctor.call(e); 335 connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>(stateFunctor) { 336 @Override 337 protected void callImpl() { 338 InstanceUtils.bindAccess(path).set((ValueParam) param, value); 472 339 } 473 340 }); … … 504 371 505 372 @Override 506 public void call(Path path, ProcedureParam param, Object[] arguments, StateFunctor stateFunctor) { 507 throw new UnimplementedException(); 373 public void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Future<Object> future) { 374 assert isActive(); 375 assert path.isResolved(); 376 377 //TODO validate arguments type using params 378 connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ResponseCallback<Instance>() { 379 @Override 380 public void process(Response response) { 381 assert isActive(); 382 try { 383 if (!response.getOk()) { 384 throw new FramsticksException().msg("failed to call procedure").arg("procedure", procedure).arg("comment", response.getComment()); 385 } 386 // InstanceUtils.processFetchedValues(path, response.getFiles()); 387 future.pass(null); 388 } catch (FramsticksException ex) { 389 future.handle(ex); 390 } 391 } 392 }); 393 394 } 395 396 @Override 397 public Path create(Path path) { 398 assert isActive(); 399 assert !path.isResolved(); 400 Path resolved = path.tryFindResolution(); 401 if (!resolved.isResolved()) { 402 log.debug("creating: " + path); 403 AccessInterface access = registry.prepareAccess(path.getTop().getParam()); 404 assert access != null; 405 Object child = access.createAccessee(); 406 assert child != null; 407 if (path.size() == 1) { 408 setRoot(new Node(getRoot().getParam(), child)); 409 } else { 410 bindAccess(this, path.getUnder()).set(path.getTop().getParam(), child); 411 } 412 resolved = path.appendResolution(child); 413 } 414 tryRegisterOnChangeEvents(resolved); 415 return resolved; 508 416 } 509 417
Note: See TracChangeset
for help on using the changeset viewer.