Changeset 99 for java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
- Timestamp:
- 07/10/13 22:41:02 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
r98 r99 2 2 3 3 import com.framsticks.communication.queries.ApplicationRequest; 4 import com.framsticks.communication.queries.CallRequest; 4 5 import com.framsticks.communication.queries.ProtocolRequest; 5 import com.framsticks.communication.queries.Regist rationRequest;6 import com.framsticks.communication.queries.RegisterRequest; 6 7 import com.framsticks.communication.queries.UseRequest; 7 8 import com.framsticks.communication.queries.VersionRequest; 8 import com.framsticks.co mmunication.util.LoggingStateCallback;9 import com.framsticks.core.Path; 9 10 import com.framsticks.params.ListSource; 10 11 import com.framsticks.util.*; … … 18 19 import com.framsticks.util.lang.Pair; 19 20 import com.framsticks.util.lang.Strings; 21 import com.framsticks.params.EventListener; 20 22 21 23 import org.apache.log4j.Logger; … … 32 34 private final static Logger log = Logger.getLogger(ClientSideManagedConnection.class); 33 35 34 protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();35 36 36 private final List<Runnable> applicationRequestsBuffer = new LinkedList<>(); 37 37 private boolean isHandshakeDone = false; 38 38 39 39 40 /** … … 93 94 } 94 95 95 private static class EventFire extends InboundMessage { 96 public final Subscription<?> subscription; 97 98 private EventFire(Subscription<?> subscription) { 99 this.subscription = subscription; 100 } 101 102 public void startFile(String path) { 103 assert path == null; 104 initCurrentFile(null); 105 } 106 107 @Override 108 public void eof() { 109 finishCurrentFile(); 110 111 subscription.dispatchCall(getFiles()); 112 } 96 protected List<String> readFileContent() { 97 List<String> content = new LinkedList<String>(); 98 String line; 99 while (!(line = getLine()).startsWith("eof")) { 100 content.add(line); 101 } 102 return content; 113 103 } 114 104 … … 239 229 } 240 230 241 public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {242 send(new RegistrationRequest().path(path), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(callback) {243 @Override244 protected void processOk(Response response) {245 assert response.getFiles().isEmpty();246 Subscription<C> subscription = new Subscription<C>(ClientSideManagedConnection.this, path, response.getComment(), dispatcher);247 log.debug("registered on event: " + subscription);248 synchronized (subscriptions) {249 subscriptions.put(subscription.getRegisteredPath(), subscription);250 }251 subscription.setEventCallback(callback.subscribed(subscription));252 if (subscription.getEventCallback() == null) {253 log.info("subscription for " + path + " aborted");254 subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription"));255 }256 }257 });258 }259 231 260 232 private void sendQueryVersion(final int version, final Future<Void> future) { … … 322 294 Matcher matcher = Request.EVENT_PATTERN.matcher(rest); 323 295 if (!matcher.matches()) { 324 log.error("invalid event line: " + rest); 325 return; 326 } 327 Subscription<?> subscription = subscriptions.get(matcher.group(1)); 328 if (subscription == null) { 329 log.error("non subscribed event: " + matcher.group(1)); 330 return; 331 } 332 EventFire event = new EventFire(subscription); 333 event.startFile(null); 334 processMessage(event); 296 throw new FramsticksException().msg("invalid event line").arg("rest", rest); 297 } 298 String fileLine = getLine(); 299 if (!fileLine.equals("file")) { 300 throw new FramsticksException().msg("expected file line").arg("got", fileLine); 301 } 302 String eventObjectPath = Request.takeGroup(rest, matcher, 1).toString(); 303 String eventCalleePath = Request.takeGroup(rest, matcher, 2).toString(); 304 final File file = new File("", new ListSource(readFileContent())); 305 log.debug("firing event " + eventObjectPath); 306 EventListener<File> listener; 307 synchronized (registeredListeners) { 308 listener = registeredListeners.get(eventObjectPath); 309 } 310 if (listener == null) { 311 throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath); 312 } 313 listener.action(file); 335 314 } 336 315 … … 397 376 } 398 377 378 protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>(); 379 380 public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) { 381 send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) { 382 @Override 383 protected void processOk(Response response) { 384 synchronized (registeredListeners) { 385 registeredListeners.put(Path.validateString(response.getComment()), listener); 386 } 387 future.pass(null); 388 } 389 }); 390 } 391 392 public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) { 393 String eventPath = null; 394 synchronized (registeredListeners) { 395 for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) { 396 if (e.getValue() == listener) { 397 eventPath = e.getKey(); 398 break; 399 } 400 } 401 } 402 if (eventPath == null) { 403 future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener)); 404 return; 405 } 406 407 final String finalEventPath = eventPath; 408 //TODO add arguments to the exception 409 send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) { 410 411 @Override 412 protected void processOk(Response response) { 413 synchronized (registeredListeners) { 414 registeredListeners.remove(finalEventPath); 415 } 416 future.pass(null); 417 } 418 }); 419 } 399 420 }
Note: See TracChangeset
for help on using the changeset viewer.