- Timestamp:
- 07/06/13 03:51:11 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/Connection.java
r96 r97 1 1 package com.framsticks.communication; 2 2 3 import com.framsticks.params.annotations.AutoAppendAnnotation; 4 import com.framsticks.params.annotations.FramsClassAnnotation; 5 import com.framsticks.params.annotations.ParamAnnotation; 3 6 import com.framsticks.util.FramsticksException; 4 7 import com.framsticks.util.io.Encoding; 5 import com.framsticks.util.lang.Pair; 8 import com.framsticks.util.lang.Strings; 9 10 import org.apache.log4j.Level; 6 11 import org.apache.log4j.Logger; 7 12 import java.io.BufferedReader; … … 12 17 import java.net.Socket; 13 18 import java.net.SocketTimeoutException; 19 import java.util.Collection; 20 import java.util.HashSet; 21 import java.util.Set; 14 22 15 23 import com.framsticks.util.dispatching.AbstractJoinable; … … 21 29 import com.framsticks.util.dispatching.RunAt; 22 30 import com.framsticks.util.dispatching.Thread; 23 31 import com.framsticks.util.dispatching.ThrowExceptionHandler; 32 33 @FramsClassAnnotation 24 34 public abstract class Connection extends AbstractJoinable implements JoinableParent { 25 35 26 36 protected final static Logger log = Logger.getLogger(Connection.class); 27 37 28 pr otectedPrintWriter output = null;29 pr otectedBufferedReader input = null;38 private PrintWriter output = null; 39 private BufferedReader input = null; 30 40 31 41 protected Socket socket = null; 32 42 33 public boolean requestIdEnabled = false; 34 35 protected int protocolVersion = -1; 36 37 public String getAddress() { 38 return address; 39 } 40 protected final String address; 41 protected final String description; 43 protected Address address; 44 protected String description = "connection"; 42 45 43 46 protected final Thread<Connection> senderThread = new Thread<>(); 44 47 protected final Thread<Connection> receiverThread = new Thread<>(); 45 48 protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(); 46 47 protected void setUpThreadNames(String name) { 48 } 49 protected final Set<ConnectionListener> listeners = new HashSet<>(); 49 50 50 51 /** 51 52 * 52 53 */ 53 public Connection(String address, String description) { 54 this.address = address; 55 this.description = description; 56 threads.setObservableName(address + " connection threads"); 54 public Connection() { 57 55 threads.add(senderThread); 58 56 threads.add(receiverThread); 59 57 58 } 59 60 protected void updateNames() { 61 if (address == null) { 62 return; 63 } 60 64 senderThread.setName(description + " thread " + address + " sender"); 61 65 receiverThread.setName(description + " thread " + address + " receiver"); 66 threads.setObservableName(address + " connection threads"); 67 } 68 69 public void setDescription(String description) { 70 this.description = description; 71 updateNames(); 72 } 73 74 @AutoAppendAnnotation 75 public Connection setAddress(Address address) { 76 this.address = address; 77 updateNames(); 78 return this; 79 } 80 81 @ParamAnnotation 82 public Connection setAddress(String address) { 83 return setAddress(new Address(address)); 62 84 } 63 85 64 86 public synchronized boolean isConnected() { 65 87 return socket != null && socket.isConnected(); 66 }67 68 69 // protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\\\"[^\"]*\\\"))";70 // protected static final Pattern REQUEST_ID_ENABLED_PATTERN = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?$");71 // protected static final Pattern REQUEST_ID_DISABLED_PATTERN = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?$");72 73 // // protected final Pair<String, String> breakLine(String line)74 // protected final Pair<Integer, String> parseRest(String rest) {75 // Matcher matcher = (requestIdEnabled ? REQUEST_ID_ENABLED_PATTERN : REQUEST_ID_DISABLED_PATTERN).matcher(rest);76 // if (!matcher.matches()) {77 // log.fatal("unmatched first line of input: '" + rest + "'");78 // return null;79 // }80 // return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));81 // }82 83 protected final Pair<Integer, CharSequence> takeRequestId(CharSequence line) {84 return Request.takeRequestId(requestIdEnabled, line);85 88 } 86 89 … … 105 108 ++iterator; 106 109 bufferStart = iterator; 107 return lineBuffer.toString(); 110 String line = lineBuffer.toString(); 111 112 synchronized (listeners) { 113 for (ConnectionListener l : listeners) { 114 l.connectionIncomming(line); 115 } 116 } 117 118 return line; 108 119 } 109 120 final int length = readChars - bufferStart; … … 131 142 } 132 143 144 protected void putLine(String line) { 145 synchronized (listeners) { 146 for (ConnectionListener l : listeners) { 147 l.connectionOutgoing(line); 148 } 149 } 150 output.println(line); 151 } 152 153 protected void flushOut() { 154 output.flush(); 155 } 156 157 protected abstract void processNextInputBatch(); 158 159 protected final void processInputBatchesUntilClosed() { 160 while (isRunning() && isConnected()) { 161 try { 162 processNextInputBatch(); 163 } catch (Exception e) { 164 log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e); 165 break; 166 } 167 } 168 } 169 133 170 protected abstract void receiverThreadRoutine(); 134 171 135 172 // @SuppressWarnings("NN_NAKED_NOTIFY") 136 173 protected void setupStreams() { 137 174 try { … … 146 183 } 147 184 148 /**149 * Returns Query associated with query getId.150 *151 * @return Query associated with query getId.152 */153 public int getProtocolVersion() {154 return protocolVersion;155 }156 185 157 186 … … 187 216 @Override 188 217 public String getName() { 189 return description + " " + address;218 return address != null ? description + " " + address : description; 190 219 } 191 220 … … 194 223 Dispatching.use(threads, this); 195 224 196 senderThread.dispatch(new RunAt<Connection>( ) {225 senderThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) { 197 226 @Override 198 p ublic void run() {227 protected void runAt() { 199 228 synchronized (Connection.this) { 200 229 while (state.equals(JoinableState.RUNNING) && output == null) { … … 205 234 }); 206 235 207 receiverThread.dispatch(new RunAt<Connection>( ) {236 receiverThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) { 208 237 @Override 209 p ublic void run() {238 protected void runAt() { 210 239 receiverThreadRoutine(); 240 interrupt(); 241 finish(); 211 242 } 212 243 }); … … 215 246 @Override 216 247 protected void joinableInterrupt() { 217 protocolVersion = -1;218 248 Dispatching.drop(threads, this); 219 249 finish(); … … 226 256 } 227 257 258 protected static void startClientConnection(Connection connection) { 259 while (connection.isRunning() && !connection.isConnected()) { 260 log.debug("connecting to " + connection.address); 261 try { 262 connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort()); 263 } catch (IOException e) { 264 log.info(connection + " failed to connect (retrying): " + e); 265 Dispatching.sleep(0.5); 266 } 267 } 268 269 log.debug(connection + " connected"); 270 try { 271 connection.socket.setSoTimeout(500); 272 connection.setupStreams(); 273 } catch (Exception e) { 274 throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection); 275 } 276 } 277 278 /** 279 * @return the address 280 */ 281 @ParamAnnotation 282 public String getAddress() { 283 return Strings.toStringNullProof(address, "?"); 284 } 285 286 public Address getAddressObject() { 287 return address; 288 } 289 290 291 /** 292 * @return the listeners 293 */ 294 public Collection<ConnectionListener> getListeners() { 295 return listeners; 296 } 297 298 public static <T extends Connection> T to(T connection, Address address) { 299 connection.setAddress(address); 300 return connection; 301 } 302 228 303 }
Note: See TracChangeset
for help on using the changeset viewer.