Changeset 88 for java/main/src/main/java/com/framsticks/communication
- Timestamp:
- 06/30/13 12:48:20 (11 years ago)
- Location:
- java/main/src/main/java/com/framsticks/communication
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientConnection.java
r85 r88 32 32 protected final Map<String, Subscription<?>> subscriptions = new HashMap<>(); 33 33 34 public String getAddress() { 35 return address; 36 } 37 38 public void connect(StateFunctor connectedFunctor) { 34 /** 35 * @param connectedFunctor the connectedFunctor to set 36 */ 37 public void setConnectedFunctor(StateFunctor connectedFunctor) { 38 this.connectedFunctor = connectedFunctor; 39 } 40 41 42 protected StateFunctor connectedFunctor; 43 44 @Override 45 protected void joinableStart() { 39 46 try { 40 47 log.info("connecting to " + address); … … 46 53 log.info("connected to " + hostName + ":" + port); 47 54 connected = true; 48 49 55 runThreads(); 50 56 … … 56 62 log.error("buffer creation failure"); 57 63 connectedFunctor.call(e); 58 close(); 59 } 60 } 64 // close(); 65 } 66 } 67 61 68 62 69 private static abstract class InboundMessage { … … 149 156 150 157 151 protected final String address;152 158 protected final String hostName; 153 159 protected final int port; … … 156 162 157 163 public ClientConnection(String address) { 158 assert address != null; 159 this.address = address; 164 super(address); 160 165 Matcher matcher = addressPattern.matcher(address); 161 166 if (!matcher.matches()) { … … 235 240 @Override 236 241 public String toString() { 237 return address;242 return "client connection " + address; 238 243 } 239 244 … … 328 333 private int nextQueryId = 0; 329 334 330 protected void processMessage(InboundMessage inboundMessage) throws Exception{335 protected void processMessage(InboundMessage inboundMessage) { 331 336 if (inboundMessage == null) { 332 337 log.error("failed to use any inbound message"); … … 342 347 } 343 348 344 protected void processEvent(String rest) throws Exception{349 protected void processEvent(String rest) { 345 350 Matcher matcher = eventPattern.matcher(rest); 346 351 if (!matcher.matches()) { … … 359 364 360 365 361 protected void processMessageStartingWith(String line) throws Exception{366 protected void processMessageStartingWith(String line) { 362 367 Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n"); 363 368 if (command.first.equals("event")) { … … 384 389 385 390 @Override 386 protected void receiverThreadRoutine() throws Exception{391 protected void receiverThreadRoutine() { 387 392 while (connected) { 388 processMessageStartingWith(getLine()); 389 } 390 } 393 try { 394 processMessageStartingWith(getLine()); 395 } catch (Exception e) { 396 break; 397 } 398 } 399 } 400 391 401 392 402 } -
java/main/src/main/java/com/framsticks/communication/Connection.java
r85 r88 1 1 package com.framsticks.communication; 2 2 3 import com.framsticks.util.FramsticksException; 3 4 import com.framsticks.util.io.Encoding; 4 5 import com.framsticks.util.lang.Pair; … … 14 15 import java.util.regex.Pattern; 15 16 17 import com.framsticks.util.dispatching.AbstractJoinable; 18 import com.framsticks.util.dispatching.Dispatching; 19 import com.framsticks.util.dispatching.Joinable; 20 import com.framsticks.util.dispatching.JoinableCollection; 21 import com.framsticks.util.dispatching.JoinableParent; 22 import com.framsticks.util.dispatching.JoinableState; 16 23 import com.framsticks.util.dispatching.RunAt; 17 24 import com.framsticks.util.dispatching.Thread; 18 25 19 public abstract class Connection {26 public abstract class Connection extends AbstractJoinable implements JoinableParent { 20 27 21 28 protected final static Logger log = Logger.getLogger(Connection.class); … … 32 39 protected int protocolVersion = -1; 33 40 41 public String getAddress() { 42 return address; 43 } 44 protected final String address; 45 34 46 protected final Thread<Connection> senderThread = new Thread<>(); 35 47 protected final Thread<Connection> receiverThread = new Thread<>(); 36 48 protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true); 49 50 /** 51 * 52 */ 53 public Connection(String address) { 54 this.address = address; 55 threads.add(senderThread); 56 threads.add(receiverThread); 57 } 37 58 public boolean isConnected() { 38 59 return connected; 39 60 } 40 61 41 public void close() {42 protocolVersion = -1;43 try {44 connected = false;45 46 senderThread.interrupt();47 senderThread.join();48 49 receiverThread.interrupt();50 receiverThread.join();51 52 if (output != null) {53 output.close();54 output = null;55 }56 57 if (input != null) {58 input.close();59 input = null;60 }61 62 63 if (socket != null) {64 socket.close();65 socket = null;66 }67 68 log.info("connection closed");69 } catch (Exception e) {70 log.error(e);71 }72 73 }74 62 75 63 protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))"; … … 95 83 char[] readBuffer = new char[BUFFER_LENGTH]; 96 84 97 protected String getLine() throws Exception{85 protected String getLine() { 98 86 StringBuilder lineBuffer = new StringBuilder(); 99 while (!Thread.interrupted()) { 100 while (iterator < readChars) { 101 if (readBuffer[iterator] != '\n') { 87 try { 88 while (!Thread.interrupted()) { 89 while (iterator < readChars) { 90 if (readBuffer[iterator] != '\n') { 91 ++iterator; 92 continue; 93 } 94 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1); 102 95 ++iterator; 103 continue; 96 bufferStart = iterator; 97 return lineBuffer.toString(); 104 98 } 105 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1); 106 ++iterator; 107 bufferStart = iterator; 108 return lineBuffer.toString(); 109 } 110 lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart); 111 112 readChars = 0; 113 while (readChars == 0) { 114 try { 115 readChars = input.read(readBuffer); 116 } catch (SocketTimeoutException ignored) { 117 //timeout - continue 99 lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart); 100 101 readChars = 0; 102 while (readChars == 0) { 103 try { 104 readChars = input.read(readBuffer); 105 } catch (SocketTimeoutException ignored) { 106 //timeout - continue 107 } 118 108 } 119 } 120 iterator = 0; 121 bufferStart = 0; 122 } 123 throw new InterruptedException(); 124 } 125 126 protected abstract void receiverThreadRoutine() throws Exception; 109 iterator = 0; 110 bufferStart = 0; 111 } 112 throw new InterruptedException(); 113 } catch (Exception e) { 114 throw new FramsticksException().msg("failed to read line").cause(e); 115 } 116 } 117 118 protected abstract void receiverThreadRoutine(); 119 120 protected void setUpThread(Thread<Connection> thread, String name) { 121 thread.setName("connection thread " + address + " " + name); 122 } 127 123 128 124 protected void runThreads() { … … 132 128 } catch (IOException e) { 133 129 log.error("buffer creation failure"); 134 close();135 130 return; 136 131 } 137 132 138 senderThread.setName(this + "-sender"); 139 receiverThread.setName(this + "-receiver"); 140 141 senderThread.start(); 142 receiverThread.start(); 133 setUpThread(senderThread, "sender"); 134 setUpThread(receiverThread, "receiver"); 135 Dispatching.use(threads, this); 143 136 144 137 receiverThread.invokeLater(new RunAt<Connection>() { 145 138 @Override 146 139 public void run() { 147 try { 148 receiverThreadRoutine(); 149 } catch (InterruptedException ignored) { 150 log.debug("receiver thread interrupted"); 151 } catch (Exception e) { 152 log.error("error: " + e); 153 close(); 154 } 140 receiverThreadRoutine(); 155 141 } 156 142 }); 157 143 158 144 } 159 160 145 161 146 /** … … 164 149 * @return Query associated with query getId. 165 150 */ 166 167 151 public int getProtocolVersion() { 168 152 return protocolVersion; 169 153 } 170 154 171 155 @Override 156 protected void joinableInterrupt() { 157 protocolVersion = -1; 158 159 connected = false; 160 Dispatching.drop(threads, this); 161 162 // finish(); 163 } 164 165 @Override 166 protected void joinableFinish() { 167 try { 168 if (output != null) { 169 output.close(); 170 output = null; 171 } 172 173 if (input != null) { 174 input.close(); 175 input = null; 176 } 177 178 179 if (socket != null) { 180 socket.close(); 181 socket = null; 182 } 183 } catch (Exception e) { 184 log.error("failed to stop connection: ", e); 185 } 186 log.debug("connection closed"); 187 } 188 189 @Override 190 public void childChangedState(Joinable joinable, JoinableState state) { 191 proceedToState(state); 192 } 193 194 @Override 195 protected void joinableJoin() throws InterruptedException { 196 Dispatching.join(threads); 197 198 } 172 199 173 200 } -
java/main/src/main/java/com/framsticks/communication/ServerConnection.java
r85 r88 20 20 21 21 public ServerConnection(Socket socket, RequestHandler requestHandler) { 22 super("todo"); 22 23 this.socket = socket; 23 24 this.requestHandler = requestHandler; 24 25 connected = true; 25 26 26 }27 28 public void start() {29 runThreads();30 27 } 31 28 … … 36 33 37 34 @Override 38 protected void receiverThreadRoutine() throws Exception{35 protected void receiverThreadRoutine() { 39 36 while (connected) { 40 37 processNextRequest(); … … 98 95 } 99 96 100 protected void processNextRequest() throws Exception{97 protected void processNextRequest() { 101 98 String line = getLine(); 102 99 Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n"); … … 124 121 }); 125 122 } 123 124 @Override 125 protected void joinableStart() { 126 // TODO Auto-generated method stub 127 128 } 126 129 }
Note: See TracChangeset
for help on using the changeset viewer.