- Timestamp:
- 06/24/13 13:38:40 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/Subscription.java
r84 r85 1 1 package com.framsticks.communication; 2 2 3 import java.util.List; 4 3 5 import com.framsticks.communication.queries.RegistrationRequest; 4 import com.framsticks.util.dispatching.AtOnceDispatcher;5 6 import com.framsticks.util.dispatching.Dispatcher; 7 import com.framsticks.util.dispatching.Dispatching; 8 import com.framsticks.util.dispatching.RunAt; 6 9 import com.framsticks.util.StateFunctor; 7 10 import org.apache.log4j.Logger; … … 11 14 * @author Piotr Sniegowski 12 15 */ 13 public class Subscription {16 public class Subscription<C> { 14 17 15 18 private final static Logger log = Logger.getLogger(Subscription.class); 16 19 17 18 20 private final ClientConnection connection; 21 private final String path; 19 22 private final String registeredPath; 20 private Dispatcher dispatcher = AtOnceDispatcher.instance;23 private final Dispatcher<C> dispatcher; 21 24 22 25 private EventCallback eventCallback; 23 26 24 public Subscription(ClientConnection connection, String path, String registeredPath ) {27 public Subscription(ClientConnection connection, String path, String registeredPath, Dispatcher<C> dispatcher) { 25 28 this.connection = connection; 26 29 this.path = path; 27 30 this.registeredPath = registeredPath; 31 this.dispatcher = dispatcher; 28 32 } 29 33 … … 41 45 } 42 46 43 public void setDispatcher(Dispatcher dispatcher) { 44 this.dispatcher = dispatcher; 45 } 47 public void unsubscribe(final StateFunctor stateFunctor) { 48 //@todo remove that /cli/ prefix, when registeredPath will be a fully qualified path 49 connection.send(new RegistrationRequest().register(false).path(registeredPath), new ResponseCallback<Connection>() { 50 @Override 51 public void process(Response response) { 52 if (!response.getOk()) { 53 log.error("failed to unsunscribe " + this + ": " + response.getComment()); 54 stateFunctor.call(new Exception(response.getComment())); 55 return; 56 } 57 assert response.hasFiles(); 58 log.debug("unsunscribed " + this); 59 stateFunctor.call(null); 60 } 61 }); 62 } 46 63 47 public void unsubscribe(final StateFunctor stateFunctor) { 48 //@todo remove that /cli/ prefix, when registeredPath will be a fully qualified path 49 connection.send(new RegistrationRequest().register(false).setPath(registeredPath), new ResponseCallback() { 50 @Override 51 public void process(Response response) { 52 if (!response.getOk()) { 53 log.error("failed to unsunscribe " + this + ": " + response.getComment()); 54 stateFunctor.call(new Exception(response.getComment())); 55 return; 56 } 57 assert response.hasFiles(); 58 log.debug("unsunscribed " + this); 59 stateFunctor.call(null); 60 } 61 }); 62 } 64 public EventCallback getEventCallback() { 65 return eventCallback; 66 } 63 67 64 public EventCallback getEventCallback() {65 return eventCallback;66 68 public Dispatcher<C> getDispatcher() { 69 return dispatcher; 70 } 67 71 68 public Dispatcher getDispatcher() {69 return dispatcher;70 72 public void setEventCallback(EventCallback eventCallback) { 73 this.eventCallback = eventCallback; 74 } 71 75 72 public void setEventCallback(EventCallback eventCallback) { 73 this.eventCallback = eventCallback; 74 } 76 public void dispatchCall(final List<File> files) { 77 Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() { 78 @Override 79 public void run() { 80 eventCallback.call(files); 81 } 82 }); 83 } 75 84 }
Note: See TracChangeset
for help on using the changeset viewer.