- Timestamp:
- 06/22/13 21:51:33 (11 years ago)
- Location:
- java/main
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main
-
Property
svn:ignore
set to
target
-
Property
svn:ignore
set to
-
java/main/src/main/java/com/framsticks/communication/ClientConnection.java
r77 r84 8 8 import com.framsticks.params.ListSource; 9 9 import com.framsticks.util.*; 10 import com.framsticks.util.dispatching.AtOnceDispatcher; 11 import com.framsticks.util.dispatching.Dispatcher; 12 import com.framsticks.util.dispatching.Dispatching; 13 import com.framsticks.util.lang.Pair; 14 import com.framsticks.util.lang.Strings; 10 15 import org.apache.log4j.Logger; 11 16 12 import java.io.BufferedReader;13 17 import java.io.IOException; 14 import java.io.InputStreamReader;15 import java.io.PrintWriter;16 18 import java.net.Socket; 17 19 import java.net.SocketException; … … 25 27 public class ClientConnection extends Connection { 26 28 27 private final static Logger LOGGER = Logger.getLogger(ClientConnection.class); 28 29 protected final Map<String, Subscription> subscriptions = new HashMap<String, Subscription>(); 30 31 public String getAddress() { 32 return address; 33 } 34 35 public void connect(StateFunctor connectedFunctor) { 36 try { 37 LOGGER.info("connecting to " + address); 38 39 socket = new Socket(hostName, port); 40 41 socket.setSoTimeout(500); 42 43 LOGGER.info("connected to " + hostName + ":" + port); 44 connected = true; 45 46 runThreads(); 47 48 connectedFunctor.call(null); 49 } catch (SocketException e) { 50 LOGGER.error("failed to connect: " + e); 51 connectedFunctor.call(e); 52 } catch (IOException e) { 53 LOGGER.error("buffer creation failure"); 54 connectedFunctor.call(e); 55 close(); 56 } 57 } 58 59 private static abstract class InboundMessage { 60 protected String currentFilePath; 61 protected List<String> currentFileContent; 62 protected final List<File> files = new ArrayList<File>(); 63 64 public abstract void eof(); 65 66 protected void initCurrentFile(String path) { 67 currentFileContent = new LinkedList<String>(); 68 currentFilePath = path; 69 } 70 protected void finishCurrentFile() { 71 if (currentFileContent == null) { 72 return; 73 } 74 files.add(new File(currentFilePath, new ListSource(currentFileContent))); 75 currentFilePath = null; 76 currentFileContent= null; 77 } 78 79 public abstract void startFile(String path); 80 81 public void addLine(String line) { 82 assert line != null; 83 assert currentFileContent != null; 84 currentFileContent.add(line.substring(0, line.length() - 1)); 85 } 86 87 public List<File> getFiles() { 88 return files; 89 } 90 } 91 92 private static class EventFire extends InboundMessage { 93 public final Subscription subscription; 94 95 private EventFire(Subscription subscription) { 96 this.subscription = subscription; 97 } 98 99 public void startFile(String path) { 100 assert path == null; 101 initCurrentFile(null); 102 } 103 104 @Override 105 public void eof() { 106 finishCurrentFile(); 107 Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() { 108 @Override 109 public void run() { 110 subscription.getEventCallback().call(getFiles()); 111 } 112 }); 113 } 114 } 115 116 private static class SentQuery extends InboundMessage { 117 Request request; 118 ResponseCallback callback; 119 Dispatcher dispatcher; 120 121 public void startFile(String path) { 122 finishCurrentFile(); 123 if (path == null) { 124 assert request instanceof ApplicationRequest; 125 path = ((ApplicationRequest)request).getPath(); 126 } 127 initCurrentFile(path); 128 } 129 130 public void eof() { 131 finishCurrentFile(); 132 //no-operation 133 } 134 135 @Override 136 public String toString() { 137 return request.toString(); 138 } 139 } 140 private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>(); 141 142 143 protected final String address; 144 protected final String hostName; 145 protected final int port; 146 147 private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$"); 148 149 public ClientConnection(String address) { 150 assert address != null; 151 this.address = address; 152 Matcher matcher = addressPattern.matcher(address); 153 if (!matcher.matches()) { 154 LOGGER.fatal("invalid address: " + address); 155 hostName = null; 156 port = 0; 157 return; 158 } 159 hostName = matcher.group(1); 160 port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009; 161 } 162 163 private SentQuery currentlySentQuery; 164 165 public void send(Request request, ResponseCallback callback) { 166 send(request, AtOnceDispatcher.instance, callback); 167 } 168 169 public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) { 170 171 if (!isConnected()) { 172 LOGGER.fatal("not connected"); 173 return; 174 } 175 final SentQuery sentQuery = new SentQuery(); 176 sentQuery.request = request; 177 sentQuery.callback = callback; 178 sentQuery.dispatcher = dispatcher; 179 180 senderThread.invokeLater(new Runnable(){ 181 @Override 182 public void run() { 183 synchronized (ClientConnection.this) { 184 185 while (!(requestIdEnabled || currentlySentQuery == null)) { 186 try { 187 ClientConnection.this.wait(); 188 } catch (InterruptedException ignored) { 189 break; 190 } 191 } 192 } 193 Integer id = stashQuery(sentQuery); 194 String command = sentQuery.request.getCommand(); 195 StringBuilder message = new StringBuilder(); 196 message.append(command); 197 if (id != null) { 198 message.append(" ").append(id); 199 } 200 sentQuery.request.construct(message); 201 String out = message.toString(); 202 203 output.println(out); 204 LOGGER.debug("sending query: " + out); 205 206 } 207 }); 208 /* 209 synchronized (this) { 210 LOGGER.debug("queueing query: " + query); 211 queryQueue.offer(sentQuery); 212 notifyAll(); 213 } 214 */ 215 } 216 217 218 @Override 219 public String toString() { 220 return address; 221 } 222 223 public void subscribe(final String path, final SubscriptionCallback callback) { 224 send(new RegistrationRequest().setPath(path), new ResponseCallback() { 225 @Override 226 public void process(Response response) { 227 if (!response.getOk()) { 228 LOGGER.error("failed to register on event: " + path); 229 callback.subscribed(null); 230 return; 231 } 232 assert response.getFiles().isEmpty(); 233 Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment()); 234 LOGGER.debug("registered on event: " + subscription); 235 synchronized (subscriptions) { 236 subscriptions.put(subscription.getRegisteredPath(), subscription); 237 } 238 subscription.setEventCallback(callback.subscribed(subscription)); 239 if (subscription.getEventCallback() == null) { 240 LOGGER.info("subscription for " + path + " aborted"); 241 subscription.unsubscribe(new LoggingStateCallback(LOGGER, "abort subscription")); 242 } 243 } 244 }); 245 } 246 247 public void negotiateProtocolVersion(StateFunctor stateFunctor) { 248 protocolVersion = -1; 249 sendQueryVersion(1, stateFunctor); 250 } 251 252 public void sendQueryVersion(final int version, final StateFunctor stateFunctor) { 253 send(new VersionRequest().version(version), new StateCallback() { 254 @Override 255 public void call(Exception e) { 256 if (e != null) { 257 LOGGER.fatal("failed to upgrade protocol to version: " + version); 258 return; 259 } 260 protocolVersion = version; 261 if (version < 4) { 262 /** it is an implicit loop here*/ 263 sendQueryVersion(version + 1, stateFunctor); 264 return; 265 } 266 send(new UseRequest().feature("request_id"), new StateCallback() { 267 @Override 268 public void call(Exception e) { 269 requestIdEnabled = e == null; 270 /* 271 synchronized (ClientConnection.this) { 272 ClientConnection.this.notifyAll(); 273 } 274 */ 275 if (!requestIdEnabled) { 276 LOGGER.fatal("protocol negotiation failed"); 277 stateFunctor.call(new Exception("protocol negotiation failed", e)); 278 return; 279 } 280 stateFunctor.call(null); 281 } 282 }); 283 284 } 285 }); 286 } 287 288 289 private synchronized SentQuery fetchQuery(Integer id, boolean remove) { 290 if (id == null) { 291 if (requestIdEnabled) { 292 return null; 293 } 294 SentQuery result = currentlySentQuery; 295 if (remove) { 296 currentlySentQuery = null; 297 notifyAll(); 298 } 299 return result; 300 } 301 if (queryMap.containsKey(id)) { 302 SentQuery result = queryMap.get(id); 303 if (remove) { 304 queryMap.remove(id); 305 } 306 return result; 307 } 308 return null; 309 } 310 311 private int nextQueryId = 0; 312 313 private Integer stashQuery(SentQuery sentQuery) { 314 if (!requestIdEnabled) { 315 currentlySentQuery = sentQuery; 316 return null; 317 } 318 queryMap.put(nextQueryId, sentQuery); 319 return nextQueryId++; 320 } 321 322 protected void processMessage(InboundMessage inboundMessage) throws Exception { 323 if (inboundMessage == null) { 324 LOGGER.error("failed to use any inbound message"); 325 return; 326 } 327 328 String line; 329 while (!(line = getLine()).startsWith("eof")) { 330 inboundMessage.addLine(line); 331 } 332 inboundMessage.eof(); 333 } 334 335 protected void processEvent(String rest) throws Exception { 336 Matcher matcher = eventPattern.matcher(rest); 337 if (!matcher.matches()) { 338 LOGGER.error("invalid event line: " + rest); 339 return; 340 } 341 Subscription subscription = subscriptions.get(matcher.group(1)); 342 if (subscription == null) { 343 LOGGER.error("non subscribed event: " + matcher.group(1)); 344 return; 345 } 346 EventFire event = new EventFire(subscription); 347 event.startFile(null); 348 processMessage(event); 349 } 350 351 352 protected void processMessageStartingWith(String line) throws Exception { 353 Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n"); 354 if (command.first.equals("event")) { 355 processEvent(command.second); 356 return; 357 } 358 Pair<Integer, String> rest = parseRest(command.second); 359 360 if (command.first.equals("file")) { 361 SentQuery sentQuery = fetchQuery(rest.first, false); 362 sentQuery.startFile(rest.second); 363 processMessage(sentQuery); 364 return; 365 } 366 367 SentQuery sentQuery = fetchQuery(rest.first, true); 368 if (sentQuery == null) { 369 return; 370 } 371 LOGGER.debug("parsing response for request " + sentQuery); 372 373 final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()); 374 final ResponseCallback callback = sentQuery.callback; 375 376 Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() { 377 @Override 378 public void run() { 379 callback.process(response); 380 } 381 }); 382 } 383 384 @Override 385 protected void receiverThreadRoutine() throws Exception { 386 while (connected) { 387 processMessageStartingWith(getLine()); 388 } 389 } 29 private final static Logger log = Logger.getLogger(ClientConnection.class); 30 31 protected final Map<String, Subscription> subscriptions = new HashMap<String, Subscription>(); 32 33 public String getAddress() { 34 return address; 35 } 36 37 public void connect(StateFunctor connectedFunctor) { 38 try { 39 log.info("connecting to " + address); 40 41 socket = new Socket(hostName, port); 42 43 socket.setSoTimeout(500); 44 45 log.info("connected to " + hostName + ":" + port); 46 connected = true; 47 48 runThreads(); 49 50 connectedFunctor.call(null); 51 } catch (SocketException e) { 52 log.error("failed to connect: " + e); 53 connectedFunctor.call(e); 54 } catch (IOException e) { 55 log.error("buffer creation failure"); 56 connectedFunctor.call(e); 57 close(); 58 } 59 } 60 61 private static abstract class InboundMessage { 62 protected String currentFilePath; 63 protected List<String> currentFileContent; 64 protected final List<File> files = new ArrayList<File>(); 65 66 public abstract void eof(); 67 68 protected void initCurrentFile(String path) { 69 currentFileContent = new LinkedList<String>(); 70 currentFilePath = path; 71 } 72 protected void finishCurrentFile() { 73 if (currentFileContent == null) { 74 return; 75 } 76 files.add(new File(currentFilePath, new ListSource(currentFileContent))); 77 currentFilePath = null; 78 currentFileContent= null; 79 } 80 81 public abstract void startFile(String path); 82 83 public void addLine(String line) { 84 assert line != null; 85 assert currentFileContent != null; 86 currentFileContent.add(line.substring(0, line.length() - 1)); 87 } 88 89 public List<File> getFiles() { 90 return files; 91 } 92 } 93 94 private static class EventFire extends InboundMessage { 95 public final Subscription subscription; 96 97 private EventFire(Subscription subscription) { 98 this.subscription = subscription; 99 } 100 101 public void startFile(String path) { 102 assert path == null; 103 initCurrentFile(null); 104 } 105 106 @Override 107 public void eof() { 108 finishCurrentFile(); 109 Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() { 110 @Override 111 public void run() { 112 subscription.getEventCallback().call(getFiles()); 113 } 114 }); 115 } 116 } 117 118 private static class SentQuery extends InboundMessage { 119 Request request; 120 ResponseCallback callback; 121 Dispatcher dispatcher; 122 123 public void startFile(String path) { 124 finishCurrentFile(); 125 if (path == null) { 126 assert request instanceof ApplicationRequest; 127 path = ((ApplicationRequest)request).getPath(); 128 } 129 initCurrentFile(path); 130 } 131 132 public void eof() { 133 finishCurrentFile(); 134 //no-operation 135 } 136 137 @Override 138 public String toString() { 139 return request.toString(); 140 } 141 } 142 private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>(); 143 144 145 protected final String address; 146 protected final String hostName; 147 protected final int port; 148 149 private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$"); 150 151 public ClientConnection(String address) { 152 assert address != null; 153 this.address = address; 154 Matcher matcher = addressPattern.matcher(address); 155 if (!matcher.matches()) { 156 log.fatal("invalid address: " + address); 157 hostName = null; 158 port = 0; 159 return; 160 } 161 hostName = matcher.group(1); 162 port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009; 163 } 164 165 private SentQuery currentlySentQuery; 166 167 public void send(Request request, ResponseCallback callback) { 168 send(request, AtOnceDispatcher.instance, callback); 169 } 170 171 public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) { 172 173 if (!isConnected()) { 174 log.fatal("not connected"); 175 return; 176 } 177 final SentQuery sentQuery = new SentQuery(); 178 sentQuery.request = request; 179 sentQuery.callback = callback; 180 sentQuery.dispatcher = dispatcher; 181 182 senderThread.invokeLater(new Runnable(){ 183 @Override 184 public void run() { 185 synchronized (ClientConnection.this) { 186 187 while (!(requestIdEnabled || currentlySentQuery == null)) { 188 try { 189 ClientConnection.this.wait(); 190 } catch (InterruptedException ignored) { 191 break; 192 } 193 } 194 } 195 Integer id = stashQuery(sentQuery); 196 String command = sentQuery.request.getCommand(); 197 StringBuilder message = new StringBuilder(); 198 message.append(command); 199 if (id != null) { 200 message.append(" ").append(id); 201 } 202 sentQuery.request.construct(message); 203 String out = message.toString(); 204 205 output.println(out); 206 log.debug("sending query: " + out); 207 208 } 209 }); 210 /* 211 synchronized (this) { 212 log.debug("queueing query: " + query); 213 queryQueue.offer(sentQuery); 214 notifyAll(); 215 } 216 */ 217 } 218 219 220 @Override 221 public String toString() { 222 return address; 223 } 224 225 public void subscribe(final String path, final SubscriptionCallback callback) { 226 send(new RegistrationRequest().setPath(path), new ResponseCallback() { 227 @Override 228 public void process(Response response) { 229 if (!response.getOk()) { 230 log.error("failed to register on event: " + path); 231 callback.subscribed(null); 232 return; 233 } 234 assert response.getFiles().isEmpty(); 235 Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment()); 236 log.debug("registered on event: " + subscription); 237 synchronized (subscriptions) { 238 subscriptions.put(subscription.getRegisteredPath(), subscription); 239 } 240 subscription.setEventCallback(callback.subscribed(subscription)); 241 if (subscription.getEventCallback() == null) { 242 log.info("subscription for " + path + " aborted"); 243 subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription")); 244 } 245 } 246 }); 247 } 248 249 public void negotiateProtocolVersion(StateFunctor stateFunctor) { 250 protocolVersion = -1; 251 sendQueryVersion(1, stateFunctor); 252 } 253 254 public void sendQueryVersion(final int version, final StateFunctor stateFunctor) { 255 send(new VersionRequest().version(version), new StateCallback() { 256 @Override 257 public void call(Exception e) { 258 if (e != null) { 259 log.fatal("failed to upgrade protocol to version: " + version); 260 return; 261 } 262 protocolVersion = version; 263 if (version < 4) { 264 /** it is an implicit loop here*/ 265 sendQueryVersion(version + 1, stateFunctor); 266 return; 267 } 268 send(new UseRequest().feature("request_id"), new StateCallback() { 269 @Override 270 public void call(Exception e) { 271 requestIdEnabled = e == null; 272 /* 273 synchronized (ClientConnection.this) { 274 ClientConnection.this.notifyAll(); 275 } 276 */ 277 if (!requestIdEnabled) { 278 log.fatal("protocol negotiation failed"); 279 stateFunctor.call(new Exception("protocol negotiation failed", e)); 280 return; 281 } 282 stateFunctor.call(null); 283 } 284 }); 285 286 } 287 }); 288 } 289 290 291 private synchronized SentQuery fetchQuery(Integer id, boolean remove) { 292 if (id == null) { 293 if (requestIdEnabled) { 294 return null; 295 } 296 SentQuery result = currentlySentQuery; 297 if (remove) { 298 currentlySentQuery = null; 299 notifyAll(); 300 } 301 return result; 302 } 303 if (queryMap.containsKey(id)) { 304 SentQuery result = queryMap.get(id); 305 if (remove) { 306 queryMap.remove(id); 307 } 308 return result; 309 } 310 return null; 311 } 312 313 private int nextQueryId = 0; 314 315 private Integer stashQuery(SentQuery sentQuery) { 316 if (!requestIdEnabled) { 317 currentlySentQuery = sentQuery; 318 return null; 319 } 320 queryMap.put(nextQueryId, sentQuery); 321 return nextQueryId++; 322 } 323 324 protected void processMessage(InboundMessage inboundMessage) throws Exception { 325 if (inboundMessage == null) { 326 log.error("failed to use any inbound message"); 327 return; 328 } 329 330 String line; 331 while (!(line = getLine()).startsWith("eof")) { 332 // log.debug("line: " + line); 333 inboundMessage.addLine(line); 334 } 335 inboundMessage.eof(); 336 } 337 338 protected void processEvent(String rest) throws Exception { 339 Matcher matcher = eventPattern.matcher(rest); 340 if (!matcher.matches()) { 341 log.error("invalid event line: " + rest); 342 return; 343 } 344 Subscription subscription = subscriptions.get(matcher.group(1)); 345 if (subscription == null) { 346 log.error("non subscribed event: " + matcher.group(1)); 347 return; 348 } 349 EventFire event = new EventFire(subscription); 350 event.startFile(null); 351 processMessage(event); 352 } 353 354 355 protected void processMessageStartingWith(String line) throws Exception { 356 Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n"); 357 if (command.first.equals("event")) { 358 processEvent(command.second); 359 return; 360 } 361 Pair<Integer, String> rest = parseRest(command.second); 362 363 if (command.first.equals("file")) { 364 SentQuery sentQuery = fetchQuery(rest.first, false); 365 sentQuery.startFile(rest.second); 366 processMessage(sentQuery); 367 return; 368 } 369 370 SentQuery sentQuery = fetchQuery(rest.first, true); 371 if (sentQuery == null) { 372 return; 373 } 374 log.debug("parsing response for request " + sentQuery); 375 376 final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()); 377 final ResponseCallback callback = sentQuery.callback; 378 379 Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() { 380 @Override 381 public void run() { 382 callback.process(response); 383 } 384 }); 385 } 386 387 @Override 388 protected void receiverThreadRoutine() throws Exception { 389 while (connected) { 390 processMessageStartingWith(getLine()); 391 } 392 } 390 393 391 394 }
Note: See TracChangeset
for help on using the changeset viewer.