package com.framsticks.communication;

import com.framsticks.communication.queries.ApplicationRequest;
import com.framsticks.communication.queries.ProtocolRequest;
import com.framsticks.communication.queries.RegistrationRequest;
import com.framsticks.communication.queries.UseRequest;
import com.framsticks.communication.queries.VersionRequest;
import com.framsticks.communication.util.LoggingStateCallback;
import com.framsticks.params.ListSource;
import com.framsticks.util.*;
import com.framsticks.util.dispatching.AtOnceDispatcher;
import com.framsticks.util.dispatching.Dispatcher;
import com.framsticks.util.dispatching.Dispatching;
import com.framsticks.util.dispatching.ExceptionResultHandler;
import com.framsticks.util.dispatching.Future;
import com.framsticks.util.dispatching.FutureHandler;
import com.framsticks.util.dispatching.JoinableState;
import com.framsticks.util.lang.Pair;
import com.framsticks.util.lang.Strings;
import org.apache.log4j.Logger;

import java.util.*;
import java.util.regex.Matcher;
import com.framsticks.util.dispatching.RunAt;

/**
 * Client end of a managed connection.
 *
 * <p>On start the receiver thread negotiates the protocol version and asks for the
 * {@code request_id} feature (see {@link #receiverThreadRoutine()}). Application requests
 * submitted before that handshake has completed are buffered and sent afterwards, in
 * submission order. Incoming lines are matched either to a previously sent query (by request
 * id when {@code requestIdEnabled} is set, otherwise to the single query in flight) or to a
 * registered event subscription.
 *
 * @author Piotr Sniegowski
 */
public class ClientSideManagedConnection extends ManagedConnection {

    private final static Logger log = Logger.getLogger(ClientSideManagedConnection.class);

    /** Active subscriptions keyed by their registered path; guarded by its own monitor. */
    protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();

    /**
     * Application requests submitted before the handshake finished. The list's monitor also
     * guards {@link #isHandshakeDone}.
     */
    private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
    private boolean isHandshakeDone = false;

    /**
     * @return the requestedVersion
     */
    public int getRequestedVersion() {
        return requestedVersion;
    }

    /**
     * @param requestedVersion the requestedVersion to set
     */
    public void setRequestedVersion(int requestedVersion) {
        this.requestedVersion = requestedVersion;
    }

    /** Protocol version the handshake climbs to before enabling request ids. */
    protected int requestedVersion = 4;

    /** Creates a connection described as "client connection" with no negotiated version yet. */
    public ClientSideManagedConnection() {
        setDescription("client connection");
        protocolVersion = -1;
    }

    /**
     * Accumulator for a multi-line inbound message: collects lines into the current file and
     * gathers finished files until {@link #eof()} is reported.
     */
    private static abstract class InboundMessage {
        protected String currentFilePath;
        protected List<String> currentFileContent;
        protected final List<File> files = new ArrayList<>();

        /** Called when the terminating {@code eof} line of the current file has been read. */
        public abstract void eof();

        /** Starts collecting lines for a new file stored under {@code path}. */
        protected void initCurrentFile(String path) {
            currentFileContent = new LinkedList<>();
            currentFilePath = path;
        }

        /** Moves the file being collected (if any) to {@link #files} and resets the state. */
        protected void finishCurrentFile() {
            if (currentFileContent == null) {
                return;
            }
            files.add(new File(currentFilePath, new ListSource(currentFileContent)));
            currentFilePath = null;
            currentFileContent = null;
        }

        /** Begins a new file; {@code path} may be absent depending on the message kind. */
        public abstract void startFile(String path);

        /** Appends one content line to the file currently being collected. */
        public final void addLine(String line) {
            assert line != null;
            assert currentFileContent != null;
            currentFileContent.add(line);
        }

        /** @return the files finished so far (the live list, not a copy) */
        public List<File> getFiles() {
            return files;
        }
    }

    /** Inbound event: its single, path-less file is handed to the subscription on eof. */
    private static class EventFire extends InboundMessage {
        public final Subscription<?> subscription;

        private EventFire(Subscription<?> subscription) {
            this.subscription = subscription;
        }

        @Override
        public void startFile(String path) {
            assert path == null;
            initCurrentFile(null);
        }

        @Override
        public void eof() {
            finishCurrentFile();
            subscription.dispatchCall(getFiles());
        }
    }

    /**
     * A query that has been sent and awaits its response; files arriving before the final
     * status line are collected here.
     *
     * @param <C> context type of the dispatcher on which the response callback is run
     */
    private static class SentQuery<C> extends InboundMessage {
        Request request;
        ClientSideResponseFuture callback;
        Dispatcher<C> dispatcher;

        @Override
        public void startFile(String path) {
            finishCurrentFile();
            if (!Strings.notEmpty(path)) {
                // A file without its own path is stored under the path of the request.
                assert request instanceof ApplicationRequest;
                path = ((ApplicationRequest) request).getPath();
            }
            Strings.assureNotEmpty(path);
            initCurrentFile(path);
        }

        @Override
        public void eof() {
            assert Strings.notEmpty(currentFilePath);
            finishCurrentFile();
            // Nothing to dispatch here: the response is delivered on the final status line.
        }

        @Override
        public String toString() {
            return request.toString();
        }

        /** Passes {@code response} to the callback through the query's dispatcher. */
        public void dispatchResponseProcess(final Response response) {
            Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
                @Override
                protected void runAt() {
                    callback.pass(response);
                }
            });
        }
    }

    /** Queries in flight keyed by request id; guarded by this connection's monitor. */
    private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();

    /** The single query in flight while request ids are off; guarded by this connection's monitor. */
    private SentQuery<?> currentlySentQuery;

    /**
     * Sends a protocol-level request immediately, bypassing the handshake buffer.
     *
     * @param request  the protocol request to send
     * @param callback receives the response
     */
    public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
        //TODO RunAt
        sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
    }

    /**
     * Sends an application request, or buffers it when the handshake has not finished yet.
     *
     * @param request    the request to send
     * @param dispatcher dispatcher through which the response is passed to {@code callback}
     * @param callback   receives the response
     */
    public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
        synchronized (applicationRequestsBuffer) {
            if (!isHandshakeDone) {
                applicationRequestsBuffer.add(new Runnable() {
                    @Override
                    public void run() {
                        sendImplementation(request, dispatcher, callback);
                    }
                });
                return;
            }
        }
        sendImplementation(request, dispatcher, callback);
    }

    /**
     * Registers the query and schedules writing it out on the sender thread. When request ids
     * are off, the sender task waits until the previous query has been answered.
     */
    private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
        callback.setRequest(request);

        if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
            log.fatal("not connected");
            return;
        }

        final SentQuery<C> sentQuery = new SentQuery<C>();
        sentQuery.request = request;
        sentQuery.callback = callback;
        sentQuery.dispatcher = dispatcher;

        senderThread.dispatch(new RunAt<ClientSideManagedConnection>(callback) {
            @Override
            protected void runAt() {
                Integer id;
                synchronized (ClientSideManagedConnection.this) {
                    // Without request ids only one query may be outstanding at a time.
                    while (!requestIdEnabled && currentlySentQuery != null) {
                        try {
                            ClientSideManagedConnection.this.wait();
                        } catch (InterruptedException e) {
                            // Keep the interruption visible to the code running this task.
                            // Fully qualified to stay unambiguous next to the wildcard imports.
                            java.lang.Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    if (requestIdEnabled) {
                        queryMap.put(nextQueryId, sentQuery);
                        id = nextQueryId++;
                    } else {
                        currentlySentQuery = sentQuery;
                        id = null;
                    }
                }

                // Wire format: "<command>[ <id>] <request-specific part>".
                StringBuilder message = new StringBuilder();
                message.append(sentQuery.request.getCommand());
                if (id != null) {
                    message.append(" ").append(id);
                }
                message.append(" ");
                sentQuery.request.construct(message);
                String out = message.toString();

                putLine(out);
                flushOut();
                log.debug("sending query: " + out);
            }
        });
    }

    @Override
    public String toString() {
        return "client connection " + address;
    }

    /**
     * Registers for events at {@code path}. On a successful response the subscription is
     * stored and offered to {@code callback}; when the callback supplies no event callback,
     * the subscription is cancelled again.
     *
     * @param path       path to register on
     * @param dispatcher dispatcher handed to the created subscription
     * @param callback   notified about the created subscription
     */
    public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
        send(new RegistrationRequest().path(path), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(callback) {
            @Override
            protected void processOk(Response response) {
                assert response.getFiles().isEmpty();
                Subscription<C> subscription = new Subscription<C>(ClientSideManagedConnection.this, path, response.getComment(), dispatcher);
                log.debug("registered on event: " + subscription);
                synchronized (subscriptions) {
                    subscriptions.put(subscription.getRegisteredPath(), subscription);
                }
                subscription.setEventCallback(callback.subscribed(subscription));
                if (subscription.getEventCallback() == null) {
                    log.info("subscription for " + path + " aborted");
                    subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription"));
                }
            }
        });
    }

    /**
     * Handshake step: requests {@code version} and, once accepted, either requests the next
     * version (until {@link #requestedVersion} is reached) or asks for the {@code request_id}
     * feature and completes {@code future}.
     */
    private void sendQueryVersion(final int version, final Future<Void> future) {
        send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
            @Override
            protected void processOk(Response response) {
                protocolVersion = version;
                if (version < requestedVersion) {
                    // Implicit loop: climb one version per round trip.
                    sendQueryVersion(version + 1, future);
                    return;
                }
                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
                    @Override
                    protected void processOk(Response response) {
                        requestIdEnabled = true;
                        future.pass(null);
                    }
                });
            }
        });
    }

    /**
     * Looks up the pending query for a response line.
     *
     * @param id     request id from the line, or {@code null} when the line carried none
     * @param remove whether the query should stop being pending (final status line)
     * @return the matching query, or {@code null} when none is pending for {@code id}
     */
    private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
        if (id == null) {
            if (requestIdEnabled) {
                return null;
            }
            SentQuery<?> result = currentlySentQuery;
            if (remove) {
                currentlySentQuery = null;
                // Wake a sender task waiting for the single in-flight slot.
                notifyAll();
            }
            return result;
        }
        return remove ? queryMap.remove(id) : queryMap.get(id);
    }

    private int nextQueryId = 0;

    /** Feeds lines into {@code inboundMessage} until a line starting with "eof" is read. */
    protected void processMessage(InboundMessage inboundMessage) {
        if (inboundMessage == null) {
            log.error("failed to use any inbound message");
            return;
        }

        String line;
        while (!(line = getLine()).startsWith("eof")) {
            inboundMessage.addLine(line);
        }
        inboundMessage.eof();
    }

    /** Handles the remainder of an "event" line by reading its file for the subscription. */
    protected void processEvent(String rest) {
        Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
        if (!matcher.matches()) {
            log.error("invalid event line: " + rest);
            return;
        }
        String registeredPath = matcher.group(1);
        Subscription<?> subscription;
        // Same monitor as the writer in subscribe().
        synchronized (subscriptions) {
            subscription = subscriptions.get(registeredPath);
        }
        if (subscription == null) {
            log.error("non subscribed event: " + registeredPath);
            return;
        }
        EventFire event = new EventFire(subscription);
        event.startFile(null);
        processMessage(event);
    }

    /**
     * Dispatches one inbound message given its first line: an event, a file belonging to a
     * pending query, or the final status line of a query.
     *
     * @throws FramsticksException when processing fails; the offending line is attached
     */
    protected void processMessageStartingWith(String line) {
        try {
            Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line);
            if (command.first.equals("event")) {
                processEvent(command.second.toString());
                return;
            }
            Pair<Integer, CharSequence> rest = takeRequestId(command.second);

            if (command.first.equals("file")) {
                SentQuery<?> sentQuery = fetchQuery(rest.first, false);
                if (sentQuery == null) {
                    // Fail with a descriptive error instead of dereferencing null.
                    throw new FramsticksException().msg("received file for no pending query").arg("request id", String.valueOf(rest.first));
                }
                sentQuery.startFile(rest.second.toString());
                processMessage(sentQuery);
                return;
            }

            SentQuery<?> sentQuery = fetchQuery(rest.first, true);
            if (sentQuery == null) {
                return;
            }
            log.debug("parsing response for request " + sentQuery);

            sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
        } catch (FramsticksException e) {
            throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);
        }
    }

    /** Interrupts this connection when the handshake fails. */
    protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
        @Override
        public void handle(FramsticksException exception) {
            interrupt();
        }
    };

    /**
     * Starts the client connection, kicks off the version handshake and then processes input
     * until the connection is closed. Requests buffered during the handshake are sent, in
     * order, once it completes.
     */
    @Override
    protected void receiverThreadRoutine() {
        startClientConnection(this);

        sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
            @Override
            protected void result(Void result) {
                synchronized (applicationRequestsBuffer) {
                    isHandshakeDone = true;
                    for (Runnable r : applicationRequestsBuffer) {
                        r.run();
                    }
                    applicationRequestsBuffer.clear();
                }
            }
        });

        processInputBatchesUntilClosed();
    }

    /** Reads the next line and processes the message it starts. */
    protected void processNextInputBatch() {
        processMessageStartingWith(getLine());
    }
}