- Timestamp:
- 06/24/13 13:38:40 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientConnection.java
r84 r85 21 21 import java.util.regex.Matcher; 22 22 import java.util.regex.Pattern; 23 import com.framsticks.util.dispatching.RunAt; 23 24 24 25 /** … … 29 30 private final static Logger log = Logger.getLogger(ClientConnection.class); 30 31 31 protected final Map<String, Subscription > subscriptions = new HashMap<String, Subscription>();32 protected final Map<String, Subscription<?>> subscriptions = new HashMap<>(); 32 33 33 34 public String getAddress() { … … 93 94 94 95 private static class EventFire extends InboundMessage { 95 public final Subscription subscription;96 97 private EventFire(Subscription subscription) {96 public final Subscription<?> subscription; 97 98 private EventFire(Subscription<?> subscription) { 98 99 this.subscription = subscription; 99 100 } … … 107 108 public void eof() { 108 109 finishCurrentFile(); 109 Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() { 110 @Override 111 public void run() { 112 subscription.getEventCallback().call(getFiles()); 113 } 114 }); 115 } 116 } 117 118 private static class SentQuery extends InboundMessage { 110 111 subscription.dispatchCall(getFiles()); 112 } 113 } 114 115 private static class SentQuery<C> extends InboundMessage { 119 116 Request request; 120 ResponseCallback callback;121 Dispatcher dispatcher;117 ResponseCallback<? extends C> callback; 118 Dispatcher<C> dispatcher; 122 119 123 120 public void startFile(String path) { … … 139 136 return request.toString(); 140 137 } 141 } 142 private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>(); 138 139 public void dispatchResponseProcess(final Response response) { 140 Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() { 141 @Override 142 public void run() { 143 callback.process(response); 144 } 145 }); 146 } 147 } 148 private Map<Integer, SentQuery<?>> queryMap = new HashMap<>(); 143 149 144 150 … … 163 169 } 164 170 165 private SentQuery currentlySentQuery; 166 167 public void send(Request request, ResponseCallback callback) { 168 send(request, AtOnceDispatcher.instance, callback); 169 } 170 171 public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) { 171 private SentQuery<?> currentlySentQuery; 172 173 174 public <C extends Connection> void send(Request request, ResponseCallback<C> callback) { 175 //TODO RunAt 176 send(request, AtOnceDispatcher.getInstance(), callback); 177 } 178 179 public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) { 172 180 173 181 if (!isConnected()) { … … 175 183 return; 176 184 } 177 final SentQuery sentQuery = new SentQuery();185 final SentQuery<C> sentQuery = new SentQuery<C>(); 178 186 sentQuery.request = request; 179 187 sentQuery.callback = callback; 180 188 sentQuery.dispatcher = dispatcher; 181 189 182 senderThread.invokeLater(new Run nable(){190 senderThread.invokeLater(new RunAt<Connection>(){ 183 191 @Override 184 192 public void run() { 193 Integer id; 185 194 synchronized (ClientConnection.this) { 186 195 … … 192 201 } 193 202 } 194 } 195 Integer id = stashQuery(sentQuery); 203 if (requestIdEnabled) { 204 queryMap.put(nextQueryId, sentQuery); 205 id = nextQueryId++; 206 } else { 207 currentlySentQuery = sentQuery; 208 id = null; 209 } 210 } 196 211 String command = sentQuery.request.getCommand(); 197 212 StringBuilder message = new StringBuilder(); … … 223 238 } 224 239 225 public void subscribe(final String path, final SubscriptionCallbackcallback) {226 send(new RegistrationRequest(). setPath(path), new ResponseCallback() {240 public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) { 241 send(new RegistrationRequest().path(path), new ResponseCallback<Connection>() { 227 242 @Override 228 243 public void process(Response response) { … … 233 248 } 234 249 assert response.getFiles().isEmpty(); 235 Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment());250 Subscription<C> subscription = new Subscription<C>(ClientConnection.this, path, response.getComment(), dispatcher); 236 251 log.debug("registered on event: " + subscription); 237 252 synchronized (subscriptions) { … … 241 256 if (subscription.getEventCallback() == null) { 242 257 log.info("subscription for " + path + " aborted"); 243 subscription.unsubscribe(new LoggingStateCallback (log, "abort subscription"));258 subscription.unsubscribe(new LoggingStateCallback<C>(log, "abort subscription")); 244 259 } 245 260 } … … 253 268 254 269 public void sendQueryVersion(final int version, final StateFunctor stateFunctor) { 255 send(new VersionRequest().version(version), new StateCallback () {270 send(new VersionRequest().version(version), new StateCallback<Connection>() { 256 271 @Override 257 272 public void call(Exception e) { … … 266 281 return; 267 282 } 268 send(new UseRequest().feature("request_id"), new StateCallback () {283 send(new UseRequest().feature("request_id"), new StateCallback<Connection>() { 269 284 @Override 270 285 public void call(Exception e) { … … 289 304 290 305 291 private synchronized SentQuery fetchQuery(Integer id, boolean remove) {306 private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) { 292 307 if (id == null) { 293 308 if (requestIdEnabled) { 294 309 return null; 295 310 } 296 SentQuery result = currentlySentQuery;311 SentQuery<?> result = currentlySentQuery; 297 312 if (remove) { 298 313 currentlySentQuery = null; … … 302 317 } 303 318 if (queryMap.containsKey(id)) { 304 SentQuery result = queryMap.get(id);319 SentQuery<?> result = queryMap.get(id); 305 320 if (remove) { 306 321 queryMap.remove(id); … … 312 327 313 328 private int nextQueryId = 0; 314 315 private Integer stashQuery(SentQuery sentQuery) {316 if (!requestIdEnabled) {317 currentlySentQuery = sentQuery;318 return null;319 }320 queryMap.put(nextQueryId, sentQuery);321 return nextQueryId++;322 }323 329 324 330 protected void processMessage(InboundMessage inboundMessage) throws Exception { … … 342 348 return; 343 349 } 344 Subscription subscription = subscriptions.get(matcher.group(1));350 Subscription<?> subscription = subscriptions.get(matcher.group(1)); 345 351 if (subscription == null) { 346 352 log.error("non subscribed event: " + matcher.group(1)); … … 362 368 363 369 if (command.first.equals("file")) { 364 SentQuery sentQuery = fetchQuery(rest.first, false);370 SentQuery<?> sentQuery = fetchQuery(rest.first, false); 365 371 sentQuery.startFile(rest.second); 366 372 processMessage(sentQuery); … … 368 374 } 369 375 370 SentQuery sentQuery = fetchQuery(rest.first, true);376 SentQuery<?> sentQuery = fetchQuery(rest.first, true); 371 377 if (sentQuery == null) { 372 378 return; … … 374 380 log.debug("parsing response for request " + sentQuery); 375 381 376 final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()); 377 final ResponseCallback callback = sentQuery.callback; 378 379 Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() { 380 @Override 381 public void run() { 382 callback.process(response); 383 } 384 }); 382 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles())); 385 383 } 386 384
Note: See TracChangeset
for help on using the changeset viewer.