- Timestamp:
- 06/22/13 21:51:33 (11 years ago)
- Location:
- java/main
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main
-
Property
svn:ignore
set to
target
-
Property
svn:ignore
set to
-
java/main/src/main/java/com/framsticks/remote/RemoteInstance.java
r77 r84 7 7 import com.framsticks.communication.util.LoggingSubscriptionCallback; 8 8 import com.framsticks.core.ListChange; 9 import com.framsticks.core.Parameters;10 9 import com.framsticks.core.Path; 11 10 import com.framsticks.params.*; 12 import com.framsticks.params.types.CompositeParam;13 11 import com.framsticks.params.types.EventParam; 14 12 import com.framsticks.parsers.MultiParamLoader; 15 13 import com.framsticks.core.Instance; 16 14 import com.framsticks.util.*; 15 import com.framsticks.util.dispatching.Dispatching; 16 import com.framsticks.util.dispatching.Future; 17 import com.framsticks.util.lang.Casting; 18 import com.framsticks.util.lang.Pair; 19 20 import org.apache.commons.configuration.Configuration; 17 21 import org.apache.log4j.Logger; 18 22 … … 24 28 public class RemoteInstance extends Instance { 25 29 26 private final static Logger LOGGER= Logger.getLogger(RemoteInstance.class.getName());27 28 29 protected finalClientConnection connection;30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 30 private final static Logger log = Logger.getLogger(RemoteInstance.class.getName()); 31 32 protected Path simulator; 33 protected ClientConnection connection; 34 35 protected final Set<Pair<Path, Subscription>> subscriptions = new HashSet<Pair<Path, Subscription>>(); 36 37 public Pair<Path, Subscription> getSubscription(Path path) { 38 for (Pair<Path, Subscription> s : subscriptions) { 39 if (s.first.matches(path)) { 40 return s; 41 } 42 } 43 return null; 44 } 45 46 @Override 47 public void run() { 48 assert isActive(); 49 super.run(); 50 connection.connect(new StateFunctor() { 47 51 @Override 48 52 public void call(Exception e) { 49 50 51 52 53 if (e != null) { 54 fireRun(e); 55 return; 56 } 53 57 connection.negotiateProtocolVersion(new StateFunctor() { 54 58 @Override 55 59 public void call(Exception e) { 56 60 if (e != null) { 57 LOGGER.fatal("unsupported protocol version!\n minimal version is: "61 log.fatal("unsupported protocol version!\n minimal version is: " 58 62 + "nmanager protocol is: " 59 63 + connection.getProtocolVersion()); 60 64 connection.close(); 61 65 fireRun(e); 62 66 return; 63 67 } 64 68 65 66 67 68 69 70 71 72 LOGGER.fatal("failed to resolve simulator node");73 74 75 76 77 78 79 LOGGER.info("resolved simulator node");80 81 82 83 connection.subscribe(simulator.getTextual() + "/" + param.getId(), new LoggingSubscriptionCallback(LOGGER, "server running state change", new EventCallback() {84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 69 invokeLater(new Runnable() { 70 @Override 71 public void run() { 72 resolveAndFetch("/simulator", new Future<Path>() { 73 @Override 74 public void result(Path path, Exception e) { 75 if (e != null) { 76 log.fatal("failed to resolve simulator node"); 77 fireRun(e); 78 return; 79 } 80 assert isActive(); 81 simulator = path; 82 fireRun(null); 83 log.info("resolved simulator node"); 84 85 EventParam param = getParam(simulator, "running_changed", EventParam.class); 86 assert param != null; 87 connection.subscribe(simulator.getTextual() + "/" + param.getId(), new LoggingSubscriptionCallback(log, "server running state change", new EventCallback() { 88 @Override 89 public void call(List<File> files) { 90 invokeLater(new Runnable() { 91 @Override 92 public void run() { 93 updateSimulationRunning(); 94 } 95 }); 96 } 97 })); 98 new PeriodicTask(RemoteInstance.this, 1000) { 99 @Override 100 public void run() { 101 updateSimulationRunning(); 102 again(); 103 } 104 }; 105 } 106 }); 107 } 108 }); 105 109 } 106 110 }); … … 109 113 } 110 114 111 public RemoteInstance(Parameters parameters) { 112 super(parameters); 113 connection = new ClientConnection(config.getString("address")); 114 } 115 public RemoteInstance() { 116 } 117 118 119 @Override 120 public void configure(Configuration config) { 121 super.configure(config); 122 connection = new ClientConnection(config.getString("address")); 123 } 124 125 126 public void setConnection(ClientConnection connection) { 127 this.connection = connection; 128 } 115 129 116 130 @Override 117 131 public String toString() { 118 132 assert Dispatching.isThreadSafe(); 119 133 return getConnection().getAddress(); 120 134 } 121 135 122 136 public void setRunning(final boolean running) { 123 124 //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(LOGGER, (running ? "starting" : "stopping") + " server"));137 assert isActive(); 138 //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(log, (running ? "starting" : "stopping") + " server")); 125 139 } 126 140 … … 128 142 129 143 protected void updateSimulationRunning() { 130 131 132 133 134 135 136 LOGGER.fatal("failed to query simulator running status: " + e);137 138 139 140 141 142 143 144 LOGGER.trace("server running: " + value);145 146 147 148 149 150 151 144 assert isActive(); 145 /* 146 fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() { 147 @Override 148 public void call(Exception e) { 149 if (e != null) { 150 log.fatal("failed to query simulator running status: " + e); 151 return; 152 } 153 154 invokeLater(new Runnable() { 155 @Override 156 public void run() { 157 boolean value = bindAccess(simulator).get("running", Boolean.class); 158 log.trace("server running: " + value); 159 simulationRunningListeners.call(value); 160 } 161 }); 162 163 } 164 }); 165 */ 152 166 } 153 167 154 168 public void addRunningStateListener(UnaryFunctor<Boolean, Boolean> listener) { 155 156 169 assert isActive(); 170 simulationRunningListeners.add(listener); 157 171 } 158 172 159 173 public void disconnect() { 160 161 174 assert isActive(); 175 if (connection.isConnected()) { 162 176 connection.close(); 163 177 } 164 178 } 165 179 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 LOGGER.debug("issuing info request for " + name);215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 LOGGER.fatal("could not read class info");235 236 237 238 239 240 241 LOGGER.error(mismatch);242 243 244 245 246 247 248 249 250 251 252 253 254 255 LOGGER.trace("fetching values for " + path);256 257 258 259 260 261 262 263 264 265 266 267 268 LOGGER.error("an exception occurred while loading: " + ex);269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : new Path(path.getInstance(), path.getTextual()));306 future.result(create(p), null);307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 EventParam changedEvent = Casting.tryCast(EventParam.class, underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed"));324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 LOGGER.error("failed to subscribe for change event for " + path);340 341 342 LOGGER.debug("subscribed for change event for " + path);343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 LOGGER.debug("reacting to change " + listChange + " in " + path);180 public final ClientConnection getConnection() { 181 return connection; 182 } 183 184 @Override 185 public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) { 186 assert isActive(); 187 assert param != null; 188 assert path.isResolved(); 189 connection.send(new GetRequest().setField(param.getId()).setPath(path.getTextual()), this, new ResponseCallback() { 190 @Override 191 public void process(Response response) { 192 assert isActive(); 193 if (!response.getOk()) { 194 stateFunctor.call(new Exception(response.getComment())); 195 return; 196 } 197 try { 198 processFetchedValues(path, response.getFiles()); 199 stateFunctor.call(null); 200 } catch (Exception ex) { 201 stateFunctor.call(ex); 202 } 203 } 204 }); 205 } 206 207 protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>(); 208 209 protected void finishInfoRequest(String id, FramsClass result, Exception e) { 210 assert isActive(); 211 Set<Future<FramsClass>> futures = infoRequests.get(id); 212 infoRequests.remove(id); 213 for (Future<FramsClass> f : futures) { 214 f.result(result, e); 215 } 216 } 217 218 @Override 219 protected void fetchInfo(final Path path, final Future<FramsClass> future) { 220 221 final String name = path.getTop().getParam().getContainedTypeName(); 222 223 if (infoRequests.containsKey(name)) { 224 infoRequests.get(name).add(future); 225 return; 226 } 227 228 log.debug("issuing info request for " + name); 229 Set<Future<FramsClass>> futures = new HashSet<Future<FramsClass>>(); 230 futures.add(future); 231 infoRequests.put(name, futures); 232 233 //TODO: if the info is in the cache, then don't communicate 234 connection.send(new InfoRequest().setPath(path.getTextual()), this, new ResponseCallback() { 235 @Override 236 public void process(Response response) { 237 assert isActive(); 238 if (!response.getOk()) { 239 finishInfoRequest(name, null, new Exception(response.getComment())); 240 return; 241 } 242 243 assert response.getFiles().size() == 1; 244 assert path.isTheSame(response.getFiles().get(0).getPath()); 245 FramsClass framsClass = processFetchedInfo(response.getFiles().get(0)); 246 247 if (framsClass == null) { 248 log.fatal("could not read class info"); 249 finishInfoRequest(name, null, new Exception("could not read class info")); 250 return; 251 } 252 CompositeParam thisParam = path.getTop().getParam(); 253 if (!thisParam.isMatchingContainedName(framsClass.getId())) { 254 String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId(); 255 log.error(mismatch); 256 finishInfoRequest(name, null, new Exception(mismatch)); 257 return; 258 } 259 finishInfoRequest(name, framsClass, null); 260 } 261 }); 262 } 263 264 @Override 265 public void fetchValues(final Path path, final StateFunctor stateFunctor) { 266 assert isActive(); 267 assert path.getTop().getObject() != null; 268 269 log.trace("fetching values for " + path); 270 connection.send(new GetRequest().setPath(path.getTextual()), this, new ResponseCallback() { 271 @Override 272 public void process(Response response) { 273 assert isActive(); 274 if (!response.getOk()) { 275 stateFunctor.call(new Exception(response.getComment())); 276 return; 277 } 278 try { 279 processFetchedValues(path, response.getFiles()); 280 stateFunctor.call(null); 281 } catch (Exception ex) { 282 log.error("an exception occurred while loading: " + ex); 283 ex.printStackTrace(); 284 stateFunctor.call(ex); 285 } 286 } 287 }); 288 } 289 290 @Override 291 public void resolve(final Path path, final Future<Path> future) { 292 assert isActive(); 293 if (path.getTop().getObject() != null) { 294 if (getInfoFromCache(path) != null) { 295 future.result(path, null); 296 return; 297 } 298 findInfo(path, new Future<FramsClass>() { 299 @Override 300 public void result(FramsClass result, Exception e) { 301 if (e != null) { 302 future.result(null, e); 303 return; 304 } 305 future.result(path, null); 306 } 307 }); 308 return; 309 } 310 findInfo(path, new Future<FramsClass>() { 311 @Override 312 public void result(FramsClass result, Exception e) { 313 assert isActive(); 314 if (e != null) { 315 future.result(null, e); 316 return; 317 } 318 assert path.getTop().getParam().isMatchingContainedName(result.getId()); 319 Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : path.tryFindResolution()); 320 future.result(createIfNeeded(p), null); 321 } 322 }); 323 } 324 325 @Override 326 protected void tryRegisterOnChangeEvents(final Path path) { 327 assert isActive(); 328 AccessInterface access = bindAccess(path); 329 if (!(access instanceof ListAccess)) { 330 return; 331 } 332 333 334 assert path.size() >= 2; 335 FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName()); 336 337 EventParam changedEvent = underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed", EventParam.class); 338 if (changedEvent == null) { 339 return; 340 } 341 342 if (getSubscription(path) != null) { 343 return; 344 } 345 346 final Pair<Path, Subscription> temporary = new Pair<Path, Subscription>(path, null); 347 subscriptions.add(temporary); 348 349 connection.subscribe(path.getTextual() + "_changed", new SubscriptionCallback() { 350 @Override 351 public EventCallback subscribed(final Subscription subscription) { 352 if (subscription == null) { 353 log.error("failed to subscribe for change event for " + path); 354 return null; 355 } 356 log.debug("subscribed for change event for " + path); 357 subscription.setDispatcher(RemoteInstance.this); 358 RemoteInstance.this.invokeLater(new Runnable() { 359 @Override 360 public void run() { 361 subscriptions.remove(temporary); 362 subscriptions.add(new Pair<Path, Subscription>(path, subscription)); 363 } 364 }); 365 return new EventCallback() { 366 @Override 367 public void call(List<File> files) { 368 assert isActive(); 369 assert files.size() == 1; 370 MultiParamLoader loader = new MultiParamLoader(); 371 loader.setNewSource(files.get(0).getContent()); 372 loader.addBreakCondition(MultiParamLoader.Status.AfterObject); 373 ReflectionAccess access = new ReflectionAccess(ListChange.class, ListChange.getFramsClass()); 374 loader.addAccessInterface(access); 375 MultiParamLoader.Status status; 376 try { 377 while ((status = loader.go()) != MultiParamLoader.Status.Finished) { 378 if (status == MultiParamLoader.Status.AfterObject) { 379 AccessInterface accessInterface = loader.getLastAccessInterface(); 380 reactToChange(path, (ListChange) accessInterface.getSelected()); 381 } 382 } 383 } catch (Exception e) { 384 e.printStackTrace(); 385 } 386 } 387 }; 388 } 389 }); 390 } 391 392 393 protected void reactToChange(final Path path, final ListChange listChange) { 394 assert isActive(); 395 log.debug("reacting to change " + listChange + " in " + path); 382 396 AccessInterface access = bindAccess(path); 383 397 assert access != null; … … 389 403 public void result(Path result, Exception e) { 390 404 if (e != null) { 391 LOGGER.error("failed to modify " + p + ": " + e);405 log.error("failed to modify " + p + ": " + e); 392 406 return; 393 407 } … … 399 413 400 414 401 CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier())); 402 assert childParam != null; 403 switch (listChange.getAction()) { 404 case Add: { 405 final String p = path.getTextual() + "/" + childParam.getId(); 406 resolveAndFetch(p, new Future<Path>() { 407 @Override 408 public void result(Path result, Exception e) { 409 if (e != null) { 410 LOGGER.error("failed to add " + p + ": " + e); 411 return; 412 } 413 LOGGER.debug("added: " + result); 414 fireListChange(path, listChange); 415 } 416 }); 417 break; 418 } 419 case Remove: { 420 access.set(childParam, null); 421 fireListChange(path, listChange); 422 break; 423 } 424 case Modify: { 425 final String p = path.getTextual() + "/" + childParam.getId(); 426 resolveAndFetch(p, new Future<Path>() { 427 @Override 428 public void result(Path result, Exception e) { 429 if (e != null) { 430 LOGGER.error("failed to modify " + p + ": " + e); 431 return; 432 } 433 fireListChange(path, listChange); 434 } 435 }); 436 break; 437 } 438 } 439 } 440 415 CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier())); 416 assert childParam != null; 417 switch (listChange.getAction()) { 418 case Add: { 419 final String p = path.getTextual() + "/" + childParam.getId(); 420 resolveAndFetch(p, new Future<Path>() { 421 @Override 422 public void result(Path result, Exception e) { 423 if (e != null) { 424 log.error("failed to add " + p + ": " + e); 425 return; 426 } 427 log.debug("added: " + result); 428 fireListChange(path, listChange); 429 } 430 }); 431 break; 432 } 433 case Remove: { 434 access.set(childParam, null); 435 fireListChange(path, listChange); 436 break; 437 } 438 case Modify: { 439 final String p = path.getTextual() + "/" + childParam.getId(); 440 resolveAndFetch(p, new Future<Path>() { 441 @Override 442 public void result(Path result, Exception e) { 443 if (e != null) { 444 log.error("failed to modify " + p + ": " + e); 445 return; 446 } 447 fireListChange(path, listChange); 448 } 449 }); 450 break; 451 } 452 } 453 } 454 455 //TODO ValueParam 441 456 @Override 442 457 public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) { 443 458 assert isActive(); 444 459 445 LOGGER.trace("storing value " + param + " for " + path);460 log.trace("storing value " + param + " for " + path); 446 461 connection.send(new SetRequest().value(value.toString()).setField(param.getId()).setPath(path.getTextual()), this, new StateCallback() { 447 462 @Override 448 463 public void call(Exception e) { 449 464 if (e == null) { 450 bindAccess(path).set( param, value);465 bindAccess(path).set((ValueParam) param, value); 451 466 } 452 467 stateFunctor.call(e);
Note: See TracChangeset
for help on using the changeset viewer.