package com.framsticks.communication; import com.framsticks.communication.queries.ApplicationRequest; import com.framsticks.communication.queries.CallRequest; import com.framsticks.communication.queries.ProtocolRequest; import com.framsticks.communication.queries.RegisterRequest; import com.framsticks.communication.queries.UseRequest; import com.framsticks.communication.queries.VersionRequest; import com.framsticks.core.Path; import com.framsticks.params.ListSource; import com.framsticks.util.*; import com.framsticks.util.dispatching.AtOnceDispatcher; import com.framsticks.util.dispatching.Dispatcher; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.dispatching.ExceptionResultHandler; import com.framsticks.util.dispatching.Future; import com.framsticks.util.dispatching.FutureHandler; import com.framsticks.util.dispatching.JoinableState; import com.framsticks.util.lang.Casting; import com.framsticks.util.lang.Pair; import com.framsticks.util.lang.Strings; import com.framsticks.params.EventListener; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import java.util.*; import java.util.regex.Matcher; import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.framsticks.util.dispatching.RunAt; /** * @author Piotr Sniegowski */ public class ClientSideManagedConnection extends ManagedConnection { private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class); private final List applicationRequestsBuffer = new LinkedList<>(); private boolean isHandshakeDone = false; /** * @return the requestedVersion */ public int getRequestedVersion() { return requestedVersion; } /** * @param requestedVersion the requestedVersion to set */ public void setRequestedVersion(int requestedVersion) { this.requestedVersion = requestedVersion; } protected int requestedVersion = 4; public ClientSideManagedConnection() { setDescription("client connection"); protocolVersion = -1; } protected List readFileContent() { List content = new LinkedList(); String line; boolean longValue = false; while (true) { line = getLine(); if (longValue) { if (line.endsWith("~") && !line.endsWith("\\~")) { longValue = false; } } else { if (line.equals("eof")) { break; } if (line.endsWith(":~")) { longValue = true; } } content.add(line); } return content; } private static class SentQuery { Request request; ClientSideResponseFuture callback; Dispatcher dispatcher; protected final List files = new ArrayList(); public List getFiles() { return files; } @Override public String toString() { return request.toString(); } public void dispatchResponseProcess(final Response response) { Dispatching.dispatchIfNotActive(dispatcher, new RunAt(callback) { @Override protected void runAt() { callback.pass(response); } }); } } private Map> queryMap = new HashMap<>(); private SentQuery currentlySentQuery; public void send(ProtocolRequest request, ClientSideResponseFuture callback) { //TODO RunAt sendImplementation(request, AtOnceDispatcher.getInstance(), callback); } public void send(final ApplicationRequest request, final Dispatcher dispatcher, final ClientSideResponseFuture callback) { synchronized (applicationRequestsBuffer) { if (!isHandshakeDone) { applicationRequestsBuffer.add(new Runnable() { @Override public void run() { sendImplementation(request, dispatcher, callback); } }); return; } } sendImplementation(request, dispatcher, callback); } private void sendImplementation(Request request, Dispatcher dispatcher, ClientSideResponseFuture callback) { callback.setRequest(request); if (getState().ordinal() > JoinableState.RUNNING.ordinal()) { throw new FramsticksException().msg("connection is not connected").arg("connection", this); } final SentQuery sentQuery = new SentQuery(); sentQuery.request = request; sentQuery.callback = callback; sentQuery.dispatcher = dispatcher; senderThread.dispatch(new RunAt(callback) { @Override protected void runAt() { Integer id; synchronized (ClientSideManagedConnection.this) { while (!(requestIdEnabled || currentlySentQuery == null)) { try { ClientSideManagedConnection.this.wait(); } catch (InterruptedException ignored) { break; } } if (requestIdEnabled) { queryMap.put(nextQueryId, sentQuery); id = nextQueryId++; } else { currentlySentQuery = sentQuery; id = null; } } String command = sentQuery.request.getCommand(); StringBuilder message = new StringBuilder(); message.append(command); if (id != null) { message.append(" ").append(id); } message.append(" "); sentQuery.request.construct(message); String out = message.toString(); putLine(out); flushOut(); log.debug("sending query: {}", out); } }); /* synchronized (this) { log.debug("queueing query: {}", query); queryQueue.offer(sentQuery); notifyAll(); } */ } @Override public String toString() { return "client connection " + address; } private void sendQueryVersion(final int version, final Future future) { send(new VersionRequest().version(version), new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { protocolVersion = version; if (version < requestedVersion) { /** it is an implicit loop here*/ sendQueryVersion(version + 1, future); return; } send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { requestIdEnabled = true; future.pass(null); } }); } }); } private synchronized @Nonnull SentQuery fetchQuery(@Nullable Integer id, boolean remove) { try { if (id == null) { if (requestIdEnabled) { throw new FramsticksException().msg("request_id is enabled and id is missing"); } SentQuery result = currentlySentQuery; if (remove) { currentlySentQuery = null; notifyAll(); } return result; } if (!queryMap.containsKey(id)) { throw new FramsticksException().msg("id is unknown").arg("id", id); } SentQuery result = queryMap.get(id); if (remove) { queryMap.remove(id); } return result; } catch (FramsticksException e) { throw new FramsticksException().msg("failed to match response to sent query").cause(e); } } private int nextQueryId = 0; protected void processEvent(String rest) { Matcher matcher = Request.EVENT_PATTERN.matcher(rest); if (!matcher.matches()) { throw new FramsticksException().msg("invalid event line").arg("rest", rest); } String fileLine = getLine(); if (!fileLine.equals("file")) { throw new FramsticksException().msg("expected file line").arg("got", fileLine); } String eventObjectPath = Request.takeGroup(rest, matcher, 1).toString(); String eventCalleePath = Request.takeGroup(rest, matcher, 2).toString(); final File file = new File("", new ListSource(readFileContent())); log.debug("firing event {}", eventObjectPath); EventListener listener; synchronized (registeredListeners) { listener = registeredListeners.get(eventObjectPath); } if (listener == null) { throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath); } listener.action(file); } protected void processFile(Pair rest) { final SentQuery sentQuery = fetchQuery(rest.first, false); String currentFilePath = rest.second.toString(); if (!Strings.notEmpty(currentFilePath)) { currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath(); } sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent()))); } protected void processMessageStartingWith(final String header) { try { final Pair command = Request.takeIdentifier(header); if (command == null) { throw new FramsticksException().msg("failed to parse command"); } final CharSequence keyword = command.first; if (keyword.equals("event")) { processEvent(command.second.toString()); return; } final Pair rest = takeRequestId(command.second); if (rest == null) { throw new FramsticksException().msg("failed to parse optional id and remainder"); } if (keyword.equals("file")) { processFile(rest); return; } if (keyword.equals("ok") || keyword.equals("error")) { final SentQuery sentQuery = fetchQuery(rest.first, true); log.debug("parsing response for request {}", sentQuery); sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles())); return; } throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword); } catch (FramsticksException e) { throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e); } } protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() { @Override public void handle(FramsticksException exception) { interrupt(); // finish(); } }; @Override protected void receiverThreadRoutine() { startClientConnection(this); sendQueryVersion(1, new FutureHandler(closeOnFailure) { @Override protected void result(Void result) { synchronized (applicationRequestsBuffer) { isHandshakeDone = true; for (Runnable r : applicationRequestsBuffer) { r.run(); } applicationRequestsBuffer.clear(); } } }); processInputBatchesUntilClosed(); } protected void processNextInputBatch() { processMessageStartingWith(getLine()); } protected final Map> registeredListeners = new HashMap<>(); public void addListener(String path, final EventListener listener, final Dispatcher dispatcher, final Future future) { send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { synchronized (registeredListeners) { registeredListeners.put(Path.validateString(response.getComment()), listener); } future.pass(null); } }); } public void removeListener(EventListener listener, final Dispatcher dispatcher, final Future future) { String eventPath = null; synchronized (registeredListeners) { for (Map.Entry> e : registeredListeners.entrySet()) { if (e.getValue() == listener) { eventPath = e.getKey(); break; } } } if (eventPath == null) { future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener)); return; } final String finalEventPath = eventPath; //TODO add arguments to the exception send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { synchronized (registeredListeners) { registeredListeners.remove(finalEventPath); } future.pass(null); } }); } }