- Timestamp:
- 07/04/13 20:29:50 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/Connection.java
r90 r96 12 12 import java.net.Socket; 13 13 import java.net.SocketTimeoutException; 14 import java.util.regex.Matcher;15 import java.util.regex.Pattern;16 14 17 15 import com.framsticks.util.dispatching.AbstractJoinable; … … 33 31 protected Socket socket = null; 34 32 35 protected volatile boolean connected = false;36 37 33 public boolean requestIdEnabled = false; 38 34 … … 43 39 } 44 40 protected final String address; 41 protected final String description; 45 42 46 43 protected final Thread<Connection> senderThread = new Thread<>(); 47 44 protected final Thread<Connection> receiverThread = new Thread<>(); 48 protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true); 45 protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(); 46 47 protected void setUpThreadNames(String name) { 48 } 49 49 50 50 /** 51 51 * 52 52 */ 53 public Connection(String address ) {53 public Connection(String address, String description) { 54 54 this.address = address; 55 this.description = description; 56 threads.setObservableName(address + " connection threads"); 55 57 threads.add(senderThread); 56 58 threads.add(receiverThread); 57 } 58 public boolean isConnected() { 59 return connected; 60 } 61 62 63 protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))"; 64 protected static final Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$"); 65 protected static final Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$"); 66 protected static final Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n"); 67 68 69 protected final Pair<Integer, String> parseRest(String rest) { 70 Matcher matcher = (requestIdEnabled ? requestIdEnabledPattern : requestIDisabledPattern).matcher(rest); 71 if (!matcher.matches()) { 72 log.fatal("unmatched first line of input: " + rest); 73 return null; 74 } 75 return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1)); 59 60 senderThread.setName(description + " thread " + address + " sender"); 61 receiverThread.setName(description + " thread " + address + " receiver"); 62 } 63 64 public synchronized boolean isConnected() { 65 return socket != null && socket.isConnected(); 66 } 67 68 69 // protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\\\"[^\"]*\\\"))"; 70 // protected static final Pattern REQUEST_ID_ENABLED_PATTERN = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?$"); 71 // protected static final Pattern REQUEST_ID_DISABLED_PATTERN = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?$"); 72 73 // // protected final Pair<String, String> breakLine(String line) 74 // protected final Pair<Integer, String> parseRest(String rest) { 75 // Matcher matcher = (requestIdEnabled ? REQUEST_ID_ENABLED_PATTERN : REQUEST_ID_DISABLED_PATTERN).matcher(rest); 76 // if (!matcher.matches()) { 77 // log.fatal("unmatched first line of input: '" + rest + "'"); 78 // return null; 79 // } 80 // return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1)); 81 // } 82 83 protected final Pair<Integer, CharSequence> takeRequestId(CharSequence line) { 84 return Request.takeRequestId(requestIdEnabled, line); 76 85 } 77 86 … … 81 90 int iterator = 0; 82 91 int bufferStart = 0; 83 char[] readBuffer = new char[BUFFER_LENGTH];92 final char[] readBuffer = new char[BUFFER_LENGTH]; 84 93 85 94 protected String getLine() { 86 StringBuilder lineBuffer = new StringBuilder();95 final StringBuilder lineBuffer = new StringBuilder(); 87 96 try { 88 97 while (!Thread.interrupted()) { … … 92 101 continue; 93 102 } 94 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1); 103 /** Do not append new line. */ 104 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart); 95 105 ++iterator; 96 106 bufferStart = iterator; 97 107 return lineBuffer.toString(); 98 108 } 99 lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart); 109 final int length = readChars - bufferStart; 110 if (length > 0) { 111 assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH; 112 assert bufferStart + length <= BUFFER_LENGTH; 113 lineBuffer.append(readBuffer, bufferStart, length); 114 } 100 115 101 116 readChars = 0; … … 118 133 protected abstract void receiverThreadRoutine(); 119 134 120 protected void setUpThread(Thread<Connection> thread, String name) { 121 thread.setName("connection thread " + address + " " + name); 122 } 123 124 protected void runThreads() { 135 136 protected void setupStreams() { 125 137 try { 126 138 output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true); 127 139 input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset())); 140 synchronized (this) { 141 this.notifyAll(); 142 } 128 143 } catch (IOException e) { 129 log.error("buffer creation failure"); 130 return; 144 throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this); 131 145 } 132 133 setUpThread(senderThread, "sender");134 setUpThread(receiverThread, "receiver");135 Dispatching.use(threads, this);136 137 receiverThread.dispatch(new RunAt<Connection>() {138 @Override139 public void run() {140 receiverThreadRoutine();141 }142 });143 144 146 } 145 147 … … 153 155 } 154 156 155 @Override156 protected void joinableInterrupt() {157 protocolVersion = -1;158 159 connected = false;160 Dispatching.drop(threads, this);161 162 // finish();163 }164 157 165 158 @Override … … 193 186 194 187 @Override 188 public String getName() { 189 return description + " " + address; 190 } 191 192 @Override 193 protected void joinableStart() { 194 Dispatching.use(threads, this); 195 196 senderThread.dispatch(new RunAt<Connection>() { 197 @Override 198 public void run() { 199 synchronized (Connection.this) { 200 while (state.equals(JoinableState.RUNNING) && output == null) { 201 Dispatching.wait(Connection.this, 500); 202 } 203 } 204 } 205 }); 206 207 receiverThread.dispatch(new RunAt<Connection>() { 208 @Override 209 public void run() { 210 receiverThreadRoutine(); 211 } 212 }); 213 } 214 215 @Override 216 protected void joinableInterrupt() { 217 protocolVersion = -1; 218 Dispatching.drop(threads, this); 219 finish(); 220 } 221 222 223 @Override 195 224 protected void joinableJoin() throws InterruptedException { 196 225 Dispatching.join(threads); 197 198 226 } 199 227
Note: See TracChangeset
for help on using the changeset viewer.