- Timestamp:
- 07/04/13 20:29:50 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientConnection.java
r90 r96 11 11 import com.framsticks.util.dispatching.Dispatcher; 12 12 import com.framsticks.util.dispatching.Dispatching; 13 import com.framsticks.util.dispatching.JoinableState; 13 14 import com.framsticks.util.lang.Pair; 14 15 import com.framsticks.util.lang.Strings; 16 17 import org.apache.log4j.Level; 15 18 import org.apache.log4j.Logger; 16 19 17 20 import java.io.IOException; 18 21 import java.net.Socket; 19 import java.net.SocketException;20 22 import java.util.*; 21 23 import java.util.regex.Matcher; … … 39 41 } 40 42 43 public ClientConnection(String address) { 44 super(address, "client connection"); 45 Matcher matcher = addressPattern.matcher(address); 46 if (!matcher.matches()) { 47 log.fatal("invalid address: " + address); 48 hostName = null; 49 port = 0; 50 return; 51 } 52 hostName = matcher.group(1); 53 port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009; 54 } 41 55 42 56 protected StateFunctor connectedFunctor; 43 44 @Override45 protected void joinableStart() {46 try {47 log.debug("connecting to " + address);48 49 socket = new Socket(hostName, port);50 51 socket.setSoTimeout(500);52 53 log.debug("connected to " + hostName + ":" + port);54 connected = true;55 runThreads();56 57 connectedFunctor.call(null);58 } catch (SocketException e) {59 log.error("failed to connect: " + e);60 connectedFunctor.call(e);61 } catch (IOException e) {62 log.error("buffer creation failure");63 connectedFunctor.call(e);64 // close();65 }66 }67 68 57 69 58 private static abstract class InboundMessage { … … 78 67 currentFilePath = path; 79 68 } 69 80 70 protected void finishCurrentFile() { 81 71 if (currentFileContent == null) { … … 84 74 files.add(new File(currentFilePath, new ListSource(currentFileContent))); 85 75 currentFilePath = null; 86 currentFileContent = null;76 currentFileContent = null; 87 77 } 88 78 89 79 public abstract void startFile(String path); 90 80 91 public void addLine(String line) {81 public final void addLine(String line) { 92 82 assert line != null; 93 83 assert currentFileContent != null; 94 currentFileContent.add(line .substring(0, line.length() - 1));84 currentFileContent.add(line); 95 85 } 96 86 … … 127 117 public void startFile(String path) { 128 118 finishCurrentFile(); 129 if ( path == null) {119 if (!Strings.notEmpty(path)) { 130 120 assert request instanceof ApplicationRequest; 131 path = ((ApplicationRequest)request).getPath(); 132 } 121 path = ((ApplicationRequest) request).getPath(); 122 } 123 Strings.assureNotEmpty(path); 133 124 initCurrentFile(path); 134 125 } 135 126 136 127 public void eof() { 128 assert Strings.notEmpty(currentFilePath); 137 129 finishCurrentFile(); 138 130 //no-operation … … 153 145 } 154 146 } 147 155 148 private Map<Integer, SentQuery<?>> queryMap = new HashMap<>(); 156 157 149 158 150 protected final String hostName; … … 161 153 private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$"); 162 154 163 public ClientConnection(String address) {164 super(address);165 Matcher matcher = addressPattern.matcher(address);166 if (!matcher.matches()) {167 log.fatal("invalid address: " + address);168 hostName = null;169 port = 0;170 return;171 }172 hostName = matcher.group(1);173 port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;174 }175 155 176 156 private SentQuery<?> currentlySentQuery; 177 178 157 179 158 public <C extends Connection> void send(Request request, ResponseCallback<C> callback) { … … 184 163 public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) { 185 164 186 if ( !isConnected()) {165 if (getState().ordinal() > JoinableState.RUNNING.ordinal()) { 187 166 log.fatal("not connected"); 188 167 return; 189 168 } 169 190 170 final SentQuery<C> sentQuery = new SentQuery<C>(); 191 171 sentQuery.request = request; … … 193 173 sentQuery.dispatcher = dispatcher; 194 174 195 senderThread.dispatch(new RunAt<Connection>() {175 senderThread.dispatch(new RunAt<Connection>() { 196 176 @Override 197 177 public void run() { … … 220 200 message.append(" ").append(id); 221 201 } 202 message.append(" "); 222 203 sentQuery.request.construct(message); 223 204 String out = message.toString(); 224 205 225 206 output.println(out); 207 output.flush(); 226 208 log.debug("sending query: " + out); 227 209 … … 234 216 notifyAll(); 235 217 } 236 */ 237 } 238 218 */ 219 } 239 220 240 221 @Override … … 272 253 } 273 254 274 public void sendQueryVersion(final int version, finalStateFunctor stateFunctor) {275 send(new VersionRequest().version(version), new StateCallback<Connection>( ) {255 public void sendQueryVersion(final int version, StateFunctor stateFunctor) { 256 send(new VersionRequest().version(version), new StateCallback<Connection>(stateFunctor) { 276 257 @Override 277 public void call(Exception e) { 278 if (e != null) { 279 log.fatal("failed to upgrade protocol to version: " + version); 280 return; 281 } 258 public void callImpl() { 282 259 protocolVersion = version; 283 260 if (version < 4) { 284 261 /** it is an implicit loop here*/ 285 sendQueryVersion(version + 1, stateFunctor);262 sendQueryVersion(version + 1, move()); 286 263 return; 287 264 } 288 send(new UseRequest().feature("request_id"), new StateCallback<Connection>( ) {265 send(new UseRequest().feature("request_id"), new StateCallback<Connection>(move()) { 289 266 @Override 290 public void call(Exception e) { 291 requestIdEnabled = e == null; 292 /* 293 synchronized (ClientConnection.this) { 294 ClientConnection.this.notifyAll(); 295 } 296 */ 297 if (!requestIdEnabled) { 298 log.fatal("protocol negotiation failed"); 299 stateFunctor.call(new Exception("protocol negotiation failed", e)); 300 return; 301 } 302 stateFunctor.call(null); 267 public void handle(FramsticksException exception) { 268 requestIdEnabled = false; 269 log.fatal("protocol negotiation failed"); 270 super.handle(new FramsticksException().msg("protocol negotiation failed").cause(exception)); 271 } 272 273 @Override 274 public void callImpl() { 275 requestIdEnabled = true; 303 276 } 304 277 }); … … 307 280 }); 308 281 } 309 310 282 311 283 private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) { … … 348 320 349 321 protected void processEvent(String rest) { 350 Matcher matcher = eventPattern.matcher(rest);322 Matcher matcher = Request.EVENT_PATTERN.matcher(rest); 351 323 if (!matcher.matches()) { 352 324 log.error("invalid event line: " + rest); … … 363 335 } 364 336 365 366 337 protected void processMessageStartingWith(String line) { 367 Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n"); 368 if (command.first.equals("event")) { 369 processEvent(command.second); 370 return; 371 } 372 Pair<Integer, String> rest = parseRest(command.second); 373 374 if (command.first.equals("file")) { 375 SentQuery<?> sentQuery = fetchQuery(rest.first, false); 376 sentQuery.startFile(rest.second); 377 processMessage(sentQuery); 378 return; 379 } 380 381 SentQuery<?> sentQuery = fetchQuery(rest.first, true); 382 if (sentQuery == null) { 383 return; 384 } 385 log.debug("parsing response for request " + sentQuery); 386 387 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles())); 338 try { 339 Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line); 340 if (command.first.equals("event")) { 341 processEvent(command.second.toString()); 342 return; 343 } 344 Pair<Integer, CharSequence> rest = takeRequestId(command.second); 345 346 if (command.first.equals("file")) { 347 SentQuery<?> sentQuery = fetchQuery(rest.first, false); 348 sentQuery.startFile(rest.second.toString()); 349 processMessage(sentQuery); 350 return; 351 } 352 353 SentQuery<?> sentQuery = fetchQuery(rest.first, true); 354 if (sentQuery == null) { 355 return; 356 } 357 log.debug("parsing response for request " + sentQuery); 358 359 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles())); 360 } catch (FramsticksException e) { 361 throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e); 362 } 388 363 } 389 364 390 365 @Override 391 366 protected void receiverThreadRoutine() { 392 while (connected) { 367 while (isRunning() && !isConnected()) { 368 log.debug("connecting to " + address); 369 try { 370 socket = new Socket(hostName, port); 371 } catch (IOException e) { 372 log.info(this + " failed to connect (retrying): " + e); 373 Dispatching.sleep(0.5); 374 } 375 } 376 377 log.debug(this + " connected"); 378 try { 379 socket.setSoTimeout(500); 380 setupStreams(); 381 } catch (Exception e) { 382 throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", this); 383 } 384 385 connectedFunctor.call(); 386 387 while (isRunning() && isConnected()) { 393 388 try { 394 389 processMessageStartingWith(getLine()); 395 390 } catch (Exception e) { 391 log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e); 396 392 break; 397 393 } 398 394 } 399 }400 401 395 interrupt(); 396 finish(); 397 } 402 398 }
Note: See TracChangeset
for help on using the changeset viewer.