Changeset 100 for java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
- Timestamp:
- 07/12/13 23:41:06 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java
r99 r100 17 17 import com.framsticks.util.dispatching.FutureHandler; 18 18 import com.framsticks.util.dispatching.JoinableState; 19 import com.framsticks.util.lang.Casting; 19 20 import com.framsticks.util.lang.Pair; 20 21 import com.framsticks.util.lang.Strings; 21 22 import com.framsticks.params.EventListener; 22 23 23 import org.apache.log4j.Logger; 24 import org.apache.logging.log4j.Logger; 25 import org.apache.logging.log4j.LogManager; 24 26 25 27 import java.util.*; 26 28 import java.util.regex.Matcher; 29 30 import javax.annotation.Nonnull; 31 import javax.annotation.Nullable; 32 27 33 import com.framsticks.util.dispatching.RunAt; 28 34 … … 32 38 public class ClientSideManagedConnection extends ManagedConnection { 33 39 34 private final static Logger log = Log ger.getLogger(ClientSideManagedConnection.class);40 private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class); 35 41 36 42 private final List<Runnable> applicationRequestsBuffer = new LinkedList<>(); … … 60 66 61 67 62 private static abstract class InboundMessage {63 protected String currentFilePath;64 protected List<String> currentFileContent;65 protected final List<File> files = new ArrayList<File>();66 67 public abstract void eof();68 69 protected void initCurrentFile(String path) {70 currentFileContent = new LinkedList<String>();71 currentFilePath = path;72 }73 74 protected void finishCurrentFile() {75 if (currentFileContent == null) {76 return;77 }78 files.add(new File(currentFilePath, new ListSource(currentFileContent)));79 currentFilePath = null;80 currentFileContent = null;81 }82 83 public abstract void startFile(String path);84 85 public final void addLine(String line) {86 assert line != null;87 assert currentFileContent != null;88 currentFileContent.add(line);89 }90 91 public List<File> getFiles() {92 return files;93 }94 }95 68 96 69 protected List<String> readFileContent() { 97 70 List<String> content = new LinkedList<String>(); 98 71 String line; 99 while (!(line = getLine()).startsWith("eof")) { 72 boolean longValue = false; 73 while (true) { 74 line = getLine(); 75 if (longValue) { 76 if (line.endsWith("~") && !line.endsWith("\\~")) { 77 longValue = false; 78 } 79 } else { 80 if (line.equals("eof")) { 81 break; 82 } 83 if (line.endsWith(":~")) { 84 longValue = true; 85 } 86 } 100 87 content.add(line); 101 88 } … … 103 90 } 104 91 105 private static class SentQuery<C> extends InboundMessage { 92 private static class SentQuery<C> { 93 106 94 Request request; 107 95 ClientSideResponseFuture callback; 108 96 Dispatcher<C> dispatcher; 109 110 public void startFile(String path) { 111 finishCurrentFile(); 112 if (!Strings.notEmpty(path)) { 113 assert request instanceof ApplicationRequest; 114 path = ((ApplicationRequest) request).getPath(); 115 } 116 Strings.assureNotEmpty(path); 117 initCurrentFile(path); 118 } 119 120 public void eof() { 121 assert Strings.notEmpty(currentFilePath); 122 finishCurrentFile(); 123 //no-operation 97 protected final List<File> files = new ArrayList<File>(); 98 99 public List<File> getFiles() { 100 return files; 124 101 } 125 102 … … 169 146 170 147 if (getState().ordinal() > JoinableState.RUNNING.ordinal()) { 171 log.fatal("not connected"); 172 return; 148 throw new FramsticksException().msg("connection is not connected").arg("connection", this); 173 149 } 174 150 … … 211 187 putLine(out); 212 188 flushOut(); 213 log.debug("sending query: " +out);189 log.debug("sending query: {}", out); 214 190 215 191 } … … 217 193 /* 218 194 synchronized (this) { 219 log.debug("queueing query: " +query);195 log.debug("queueing query: {}", query); 220 196 queryQueue.offer(sentQuery); 221 197 notifyAll(); … … 253 229 } 254 230 255 private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) { 256 if (id == null) { 257 if (requestIdEnabled) { 258 return null; 259 } 260 SentQuery<?> result = currentlySentQuery; 261 if (remove) { 262 currentlySentQuery = null; 263 notifyAll(); 264 } 265 return result; 266 } 267 if (queryMap.containsKey(id)) { 231 private synchronized @Nonnull SentQuery<?> fetchQuery(@Nullable Integer id, boolean remove) { 232 try { 233 if (id == null) { 234 if (requestIdEnabled) { 235 throw new FramsticksException().msg("request_id is enabled and id is missing"); 236 } 237 SentQuery<?> result = currentlySentQuery; 238 if (remove) { 239 currentlySentQuery = null; 240 notifyAll(); 241 } 242 return result; 243 } 244 245 if (!queryMap.containsKey(id)) { 246 throw new FramsticksException().msg("id is unknown").arg("id", id); 247 } 248 268 249 SentQuery<?> result = queryMap.get(id); 269 250 if (remove) { … … 271 252 } 272 253 return result; 273 } 274 return null; 254 255 } catch (FramsticksException e) { 256 throw new FramsticksException().msg("failed to match response to sent query").cause(e); 257 } 275 258 } 276 259 277 260 private int nextQueryId = 0; 278 279 protected void processMessage(InboundMessage inboundMessage) {280 if (inboundMessage == null) {281 log.error("failed to use any inbound message");282 return;283 }284 285 String line;286 while (!(line = getLine()).startsWith("eof")) {287 // log.debug("line: " + line);288 inboundMessage.addLine(line);289 }290 inboundMessage.eof();291 }292 261 293 262 protected void processEvent(String rest) { … … 303 272 String eventCalleePath = Request.takeGroup(rest, matcher, 2).toString(); 304 273 final File file = new File("", new ListSource(readFileContent())); 305 log.debug("firing event " +eventObjectPath);274 log.debug("firing event {}", eventObjectPath); 306 275 EventListener<File> listener; 307 276 synchronized (registeredListeners) { … … 314 283 } 315 284 316 protected void processMessageStartingWith(String line) { 285 protected void processFile(Pair<Integer, CharSequence> rest) { 286 final SentQuery<?> sentQuery = fetchQuery(rest.first, false); 287 288 String currentFilePath = rest.second.toString(); 289 if (!Strings.notEmpty(currentFilePath)) { 290 currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath(); 291 } 292 293 sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent()))); 294 295 } 296 297 protected void processMessageStartingWith(final String header) { 317 298 try { 318 Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line); 319 if (command.first.equals("event")) { 299 final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header); 300 if (command == null) { 301 throw new FramsticksException().msg("failed to parse command"); 302 } 303 final CharSequence keyword = command.first; 304 if (keyword.equals("event")) { 320 305 processEvent(command.second.toString()); 321 306 return; 322 307 } 323 Pair<Integer, CharSequence> rest = takeRequestId(command.second); 324 325 if (command.first.equals("file")) { 326 SentQuery<?> sentQuery = fetchQuery(rest.first, false); 327 sentQuery.startFile(rest.second.toString()); 328 processMessage(sentQuery); 308 309 final Pair<Integer, CharSequence> rest = takeRequestId(command.second); 310 if (rest == null) { 311 throw new FramsticksException().msg("failed to parse optional id and remainder"); 312 } 313 314 if (keyword.equals("file")) { 315 processFile(rest); 329 316 return; 330 317 } 331 332 SentQuery<?> sentQuery = fetchQuery(rest.first, true); 333 if (sentQuery == null) { 318 if (keyword.equals("ok") || keyword.equals("error")) { 319 320 final SentQuery<?> sentQuery = fetchQuery(rest.first, true); 321 322 log.debug("parsing response for request {}", sentQuery); 323 324 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles())); 334 325 return; 335 326 } 336 log.debug("parsing response for request " + sentQuery); 337 338 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles())); 327 328 throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword); 339 329 } catch (FramsticksException e) { 340 throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);330 throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e); 341 331 } 342 332 }
Note: See TracChangeset
for help on using the changeset viewer.