- Timestamp:
- 06/30/13 12:48:20 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/Connection.java
r85 r88 1 1 package com.framsticks.communication; 2 2 3 import com.framsticks.util.FramsticksException; 3 4 import com.framsticks.util.io.Encoding; 4 5 import com.framsticks.util.lang.Pair; … … 14 15 import java.util.regex.Pattern; 15 16 17 import com.framsticks.util.dispatching.AbstractJoinable; 18 import com.framsticks.util.dispatching.Dispatching; 19 import com.framsticks.util.dispatching.Joinable; 20 import com.framsticks.util.dispatching.JoinableCollection; 21 import com.framsticks.util.dispatching.JoinableParent; 22 import com.framsticks.util.dispatching.JoinableState; 16 23 import com.framsticks.util.dispatching.RunAt; 17 24 import com.framsticks.util.dispatching.Thread; 18 25 19 public abstract class Connection {26 public abstract class Connection extends AbstractJoinable implements JoinableParent { 20 27 21 28 protected final static Logger log = Logger.getLogger(Connection.class); … … 32 39 protected int protocolVersion = -1; 33 40 41 public String getAddress() { 42 return address; 43 } 44 protected final String address; 45 34 46 protected final Thread<Connection> senderThread = new Thread<>(); 35 47 protected final Thread<Connection> receiverThread = new Thread<>(); 36 48 protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true); 49 50 /** 51 * 52 */ 53 public Connection(String address) { 54 this.address = address; 55 threads.add(senderThread); 56 threads.add(receiverThread); 57 } 37 58 public boolean isConnected() { 38 59 return connected; 39 60 } 40 61 41 public void close() {42 protocolVersion = -1;43 try {44 connected = false;45 46 senderThread.interrupt();47 senderThread.join();48 49 receiverThread.interrupt();50 receiverThread.join();51 52 if (output != null) {53 output.close();54 output = null;55 }56 57 if (input != null) {58 input.close();59 input = null;60 }61 62 63 if (socket != null) {64 socket.close();65 socket = null;66 }67 68 log.info("connection closed");69 } catch (Exception e) {70 log.error(e);71 }72 73 }74 62 75 63 protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))"; … … 95 83 char[] readBuffer = new char[BUFFER_LENGTH]; 96 84 97 protected String getLine() throws Exception{85 protected String getLine() { 98 86 StringBuilder lineBuffer = new StringBuilder(); 99 while (!Thread.interrupted()) { 100 while (iterator < readChars) { 101 if (readBuffer[iterator] != '\n') { 87 try { 88 while (!Thread.interrupted()) { 89 while (iterator < readChars) { 90 if (readBuffer[iterator] != '\n') { 91 ++iterator; 92 continue; 93 } 94 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1); 102 95 ++iterator; 103 continue; 96 bufferStart = iterator; 97 return lineBuffer.toString(); 104 98 } 105 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1); 106 ++iterator; 107 bufferStart = iterator; 108 return lineBuffer.toString(); 109 } 110 lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart); 111 112 readChars = 0; 113 while (readChars == 0) { 114 try { 115 readChars = input.read(readBuffer); 116 } catch (SocketTimeoutException ignored) { 117 //timeout - continue 99 lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart); 100 101 readChars = 0; 102 while (readChars == 0) { 103 try { 104 readChars = input.read(readBuffer); 105 } catch (SocketTimeoutException ignored) { 106 //timeout - continue 107 } 118 108 } 119 } 120 iterator = 0; 121 bufferStart = 0; 122 } 123 throw new InterruptedException(); 124 } 125 126 protected abstract void receiverThreadRoutine() throws Exception; 109 iterator = 0; 110 bufferStart = 0; 111 } 112 throw new InterruptedException(); 113 } catch (Exception e) { 114 throw new FramsticksException().msg("failed to read line").cause(e); 115 } 116 } 117 118 protected abstract void receiverThreadRoutine(); 119 120 protected void setUpThread(Thread<Connection> thread, String name) { 121 thread.setName("connection thread " + address + " " + name); 122 } 127 123 128 124 protected void runThreads() { … … 132 128 } catch (IOException e) { 133 129 log.error("buffer creation failure"); 134 close();135 130 return; 136 131 } 137 132 138 senderThread.setName(this + "-sender"); 139 receiverThread.setName(this + "-receiver"); 140 141 senderThread.start(); 142 receiverThread.start(); 133 setUpThread(senderThread, "sender"); 134 setUpThread(receiverThread, "receiver"); 135 Dispatching.use(threads, this); 143 136 144 137 receiverThread.invokeLater(new RunAt<Connection>() { 145 138 @Override 146 139 public void run() { 147 try { 148 receiverThreadRoutine(); 149 } catch (InterruptedException ignored) { 150 log.debug("receiver thread interrupted"); 151 } catch (Exception e) { 152 log.error("error: " + e); 153 close(); 154 } 140 receiverThreadRoutine(); 155 141 } 156 142 }); 157 143 158 144 } 159 160 145 161 146 /** … … 164 149 * @return Query associated with query getId. 165 150 */ 166 167 151 public int getProtocolVersion() { 168 152 return protocolVersion; 169 153 } 170 154 171 155 @Override 156 protected void joinableInterrupt() { 157 protocolVersion = -1; 158 159 connected = false; 160 Dispatching.drop(threads, this); 161 162 // finish(); 163 } 164 165 @Override 166 protected void joinableFinish() { 167 try { 168 if (output != null) { 169 output.close(); 170 output = null; 171 } 172 173 if (input != null) { 174 input.close(); 175 input = null; 176 } 177 178 179 if (socket != null) { 180 socket.close(); 181 socket = null; 182 } 183 } catch (Exception e) { 184 log.error("failed to stop connection: ", e); 185 } 186 log.debug("connection closed"); 187 } 188 189 @Override 190 public void childChangedState(Joinable joinable, JoinableState state) { 191 proceedToState(state); 192 } 193 194 @Override 195 protected void joinableJoin() throws InterruptedException { 196 Dispatching.join(threads); 197 198 } 172 199 173 200 }
Note: See TracChangeset
for help on using the changeset viewer.