Changeset 102 for java/main/src/main/java/com/framsticks/communication
- Timestamp:
- 07/16/13 23:31:35 (11 years ago)
- Location:
- java/main/src/main/java/com/framsticks/communication
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
r101 r102 132 132 133 133 public void send(ProtocolRequest request, ClientSideResponseFuture callback) { 134 //TODO RunAt135 134 sendImplementation(request, AtOnceDispatcher.getInstance(), callback); 136 135 } … … 466 465 467 466 final String finalEventPath = eventPath; 468 467 //TODO add arguments to the exception 469 468 send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) { 470 469 -
java/main/src/main/java/com/framsticks/communication/Connection.java
r101 r102 18 18 import java.io.PrintWriter; 19 19 import java.net.Socket; 20 import java.net.SocketTimeoutException;21 20 import java.util.Collection; 22 21 import java.util.HashSet; … … 87 86 public Connection setAddress(String address) { 88 87 return setAddress(new Address(address)); 89 }90 91 public synchronized boolean isConnected() {92 return socket != null && socket.isConnected();93 88 } 94 89 … … 131 126 132 127 readChars = 0; 133 while (readChars == 0) { 134 try { 135 readChars = input.read(readBuffer); 136 } catch (SocketTimeoutException ignored) { 137 //timeout - continue 138 } 128 readChars = input.read(readBuffer); 129 if (readChars < 0) { 130 throw new SocketClosedException().msg("socket is closed"); 139 131 } 140 132 iterator = 0; … … 143 135 throw new InterruptedException(); 144 136 } catch (Exception e) { 145 throw new FramsticksException().msg("failed to read line").cause(e); 137 log.debug("failed to read line (closing): {}", e.getMessage()); 138 throw new SocketClosedException().msg("failed to read line").cause(e); 146 139 } 147 140 } … … 162 155 protected abstract void processNextInputBatch(); 163 156 157 164 158 protected final void processInputBatchesUntilClosed() { 165 while (isRunning() && isConnected()) {159 while (isRunning() && !socket.isClosed()) { 166 160 try { 167 161 processNextInputBatch(); 162 } catch (SocketClosedException e) { 163 log.log(isRunning() ? Level.ERROR : Level.DEBUG, "socket is closing: {}", e.getShortMessage(new StringBuilder())); 164 // log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e); 165 break; 168 166 } catch (FramsticksException e) { 167 log.debug("{} caught exception in receiver thread {}", this, e.getMessage()); 169 168 handle(e); 170 169 } catch (Exception e) { … … 173 172 } 174 173 } 174 log.debug("{} finished processing input", this); 175 175 } 176 176 … … 227 227 Dispatching.use(threads, this); 228 228 229 senderThread.dispatch(new RunAt<Connection>( ThrowExceptionHandler.getInstance()) {229 senderThread.dispatch(new RunAt<Connection>(this) { 230 230 @Override 231 231 protected void runAt() { … … 238 238 }); 239 239 240 receiverThread.dispatch(new RunAt<Connection>( ThrowExceptionHandler.getInstance()) {240 receiverThread.dispatch(new RunAt<Connection>(this) { 241 241 @Override 242 242 protected void runAt() { 243 243 receiverThreadRoutine(); 244 244 interruptJoinable(); 245 finishJoinable();245 // finishJoinable(); 246 246 } 247 247 }); … … 260 260 261 261 protected static void startClientConnection(Connection connection) { 262 while (connection.isRunning() && !connection.isConnected()) {262 while (connection.isRunning() && connection.socket == null) { 263 263 log.debug("connecting to {}", connection.address); 264 264 try { 265 265 connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort()); 266 266 } catch (IOException e) { 267 log. info("{} failed to connect (retrying): ", connection, e);267 log.warn("{} failed to connect (retrying): {}", connection, e.getMessage()); 268 268 Dispatching.sleep(0.5); 269 269 } … … 272 272 log.debug("{} connected", connection); 273 273 try { 274 connection.socket.setSoTimeout(500);274 // connection.socket.setSoTimeout(500); 275 275 connection.setupStreams(); 276 276 } catch (Exception e) { … … 319 319 @Override 320 320 public void handle(FramsticksException exception) { 321 log.debug("{} handling {}", this, exception.getMessage()); 321 322 exceptionHandler.handle(exception); 322 323 } … … 358 359 } 359 360 361 public synchronized boolean isConnected() { 362 return socket != null && socket.isConnected(); 363 } 364 360 365 } -
java/main/src/main/java/com/framsticks/communication/ServerSideManagedConnection.java
r101 r102 32 32 33 33 34 protected void processNextInputBatch() {35 processNextRequest();36 }37 34 38 35 @Override … … 92 89 93 90 94 protected void processNext Request() {91 protected void processNextInputBatch() { 95 92 final Holder<Integer> id = new Holder<>(); 96 93 final String line = getLine(); … … 106 103 } 107 104 108 //TODO what to do here?109 105 handleRequest(request, new ServerSideResponseFuture() { 110 106 @Override
Note: See TracChangeset
for help on using the changeset viewer.