Changeset 101 for java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
- Timestamp:
- 07/14/13 23:20:04 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
r100 r101 3 3 import com.framsticks.communication.queries.ApplicationRequest; 4 4 import com.framsticks.communication.queries.CallRequest; 5 import com.framsticks.communication.queries.NeedFile; 6 import com.framsticks.communication.queries.NeedFileAcceptor; 5 7 import com.framsticks.communication.queries.ProtocolRequest; 6 8 import com.framsticks.communication.queries.RegisterRequest; … … 43 45 private boolean isHandshakeDone = false; 44 46 47 protected NeedFileAcceptor needFileAcceptor; 48 49 /** 50 * @return the needFileAcceptor 51 */ 52 public NeedFileAcceptor getNeedFileAcceptor() { 53 return needFileAcceptor; 54 } 55 56 /** 57 * @param needFileAcceptor the needFileAcceptor to set 58 */ 59 public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) { 60 this.needFileAcceptor = needFileAcceptor; 61 } 45 62 46 63 /** … … 64 81 protocolVersion = -1; 65 82 } 66 67 68 83 69 84 protected List<String> readFileContent() { … … 116 131 } 117 132 118 private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();119 120 private SentQuery<?> currentlySentQuery;121 122 133 public void send(ProtocolRequest request, ClientSideResponseFuture callback) { 123 134 //TODO RunAt … … 154 165 sentQuery.dispatcher = dispatcher; 155 166 167 156 168 senderThread.dispatch(new RunAt<Connection>(callback) { 157 169 @Override 158 170 protected void runAt() { 159 Integer id; 160 synchronized (ClientSideManagedConnection.this) { 161 162 while (!(requestIdEnabled || currentlySentQuery == null)) { 163 try { 164 ClientSideManagedConnection.this.wait(); 165 } catch (InterruptedException ignored) { 166 break; 167 } 168 } 169 if (requestIdEnabled) { 170 queryMap.put(nextQueryId, sentQuery); 171 id = nextQueryId++; 172 } else { 173 currentlySentQuery = sentQuery; 174 id = null; 175 } 176 } 171 Integer id = sentQueries.put(null, sentQuery); 172 177 173 String command = sentQuery.request.getCommand(); 178 174 StringBuilder message = new StringBuilder(); … … 188 184 flushOut(); 189 185 log.debug("sending query: {}", out); 190 191 186 } 192 187 }); 193 /*194 synchronized (this) {195 log.debug("queueing query: {}", query);196 queryQueue.offer(sentQuery);197 notifyAll();198 }199 */200 188 } 201 189 … … 204 192 return "client connection " + address; 205 193 } 206 207 194 208 195 private void sendQueryVersion(final int version, final Future<Void> future) { … … 229 216 } 230 217 231 private synchronized @Nonnull SentQuery<?> fetchQuery(@Nullable Integer id, boolean remove) { 232 try { 233 if (id == null) { 234 if (requestIdEnabled) { 235 throw new FramsticksException().msg("request_id is enabled and id is missing"); 236 } 237 SentQuery<?> result = currentlySentQuery; 238 if (remove) { 239 currentlySentQuery = null; 240 notifyAll(); 241 } 242 return result; 243 } 244 245 if (!queryMap.containsKey(id)) { 246 throw new FramsticksException().msg("id is unknown").arg("id", id); 247 } 248 249 SentQuery<?> result = queryMap.get(id); 250 if (remove) { 251 queryMap.remove(id); 252 } 253 return result; 254 255 } catch (FramsticksException e) { 256 throw new FramsticksException().msg("failed to match response to sent query").cause(e); 257 } 258 } 218 protected class IdCollection<T> { 219 220 221 protected final Map<Integer, T> map = new HashMap<>(); 222 protected T current; 223 224 public Integer put(Integer idProposition, T value) { 225 synchronized (ClientSideManagedConnection.this) { 226 while (!(requestIdEnabled || current == null)) { 227 try { 228 ClientSideManagedConnection.this.wait(); 229 } catch (InterruptedException ignored) { 230 break; 231 } 232 } 233 if (!requestIdEnabled) { 234 current = value; 235 return null; 236 } 237 if (idProposition == null) { 238 idProposition = nextQueryId++; 239 } 240 map.put(idProposition, value); 241 return idProposition; 242 } 243 } 244 245 public void clear(Integer id) { 246 if (requestIdEnabled) { 247 current = null; 248 } else { 249 map.remove(id); 250 } 251 } 252 253 public @Nonnull T fetch(@Nullable Integer id, boolean remove) { 254 synchronized (ClientSideManagedConnection.this) { 255 try { 256 if (id == null) { 257 if (requestIdEnabled) { 258 throw new FramsticksException().msg("request_id is enabled and id is missing"); 259 } 260 T result = current; 261 current = null; 262 ClientSideManagedConnection.this.notifyAll(); 263 return result; 264 } 265 if (!map.containsKey(id)) { 266 throw new FramsticksException().msg("id is unknown").arg("id", id); 267 } 268 269 T result = map.get(id); 270 if (remove) { 271 map.remove(id); 272 } 273 return result; 274 275 } catch (FramsticksException e) { 276 throw new FramsticksException().msg("failed to match response to sent query").cause(e); 277 } 278 } 279 } 280 } 281 282 protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>(); 283 protected IdCollection<NeedFile> needFiles = new IdCollection<>(); 259 284 260 285 private int nextQueryId = 0; … … 269 294 throw new FramsticksException().msg("expected file line").arg("got", fileLine); 270 295 } 271 String eventObjectPath = Request.takeGroup(rest, matcher, 1).toString();272 String eventCalleePath = Request.takeGroup(rest, matcher, 2).toString();296 String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString(); 297 String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString(); 273 298 final File file = new File("", new ListSource(readFileContent())); 274 299 log.debug("firing event {}", eventObjectPath); … … 277 302 listener = registeredListeners.get(eventObjectPath); 278 303 } 279 if (listener 304 if (listener == null) { 280 305 throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath); 281 306 } … … 283 308 } 284 309 310 protected void processNeedFile(Pair<Integer, CharSequence> rest) { 311 final Integer id = rest.first; 312 String suggestedName = null; 313 String description = null; 314 Pair<CharSequence, CharSequence> s = Request.takeString(rest.second); 315 if (s != null) { 316 suggestedName = s.first.toString(); 317 Pair<CharSequence, CharSequence> d = Request.takeString(s.second); 318 if (d != null) { 319 description = d.first.toString(); 320 } 321 } 322 323 final Future<File> future = new Future<File>() { 324 325 protected void send(final File result) { 326 log.info("sending file: " + result); 327 needFiles.clear(id); 328 sendFile(null, result, id, ClientSideManagedConnection.this); 329 330 } 331 332 @Override 333 protected void result(File result) { 334 send(result); 335 } 336 337 @Override 338 public void handle(FramsticksException exception) { 339 send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage()))); 340 } 341 }; 342 343 NeedFile needFile = new NeedFile(suggestedName, description, future); 344 345 if (needFileAcceptor.acceptNeed(needFile)) { 346 return; 347 } 348 349 future.handle(new FramsticksException().msg("acceptor did not accepted need")); 350 } 351 285 352 protected void processFile(Pair<Integer, CharSequence> rest) { 286 final SentQuery<?> sentQuery = fetchQuery(rest.first, false);353 final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false); 287 354 288 355 String currentFilePath = rest.second.toString(); … … 292 359 293 360 sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent()))); 294 295 361 } 296 362 … … 318 384 if (keyword.equals("ok") || keyword.equals("error")) { 319 385 320 final SentQuery<?> sentQuery = fetchQuery(rest.first, true);386 final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true); 321 387 322 388 log.debug("parsing response for request {}", sentQuery); 323 389 324 390 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles())); 391 return; 392 } 393 if (keyword.equals("needfile")) { 394 processNeedFile(rest); 325 395 return; 326 396 } … … 336 406 @Override 337 407 public void handle(FramsticksException exception) { 338 interrupt ();408 interruptJoinable(); 339 409 // finish(); 340 410 }
Note: See TracChangeset
for help on using the changeset viewer.