package com.framsticks.communication; import com.framsticks.communication.queries.ApplicationRequest; import com.framsticks.communication.queries.RegistrationRequest; import com.framsticks.communication.queries.UseRequest; import com.framsticks.communication.queries.VersionRequest; import com.framsticks.communication.util.LoggingStateCallback; import com.framsticks.params.ListSource; import com.framsticks.util.*; import com.framsticks.util.dispatching.AtOnceDispatcher; import com.framsticks.util.dispatching.Dispatcher; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.lang.Pair; import com.framsticks.util.lang.Strings; import org.apache.log4j.Logger; import; import; import; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * @author Piotr Sniegowski */ public class ClientConnection extends Connection { private final static Logger log = Logger.getLogger(ClientConnection.class); protected final Map subscriptions = new HashMap(); public String getAddress() { return address; } public void connect(StateFunctor connectedFunctor) { try {"connecting to " + address); socket = new Socket(hostName, port); socket.setSoTimeout(500);"connected to " + hostName + ":" + port); connected = true; runThreads();; } catch (SocketException e) { log.error("failed to connect: " + e);; } catch (IOException e) { log.error("buffer creation failure");; close(); } } private static abstract class InboundMessage { protected String currentFilePath; protected List currentFileContent; protected final List files = new ArrayList(); public abstract void eof(); protected void initCurrentFile(String path) { currentFileContent = new LinkedList(); currentFilePath = path; } protected void finishCurrentFile() { if (currentFileContent == null) { return; } files.add(new File(currentFilePath, new ListSource(currentFileContent))); currentFilePath = null; currentFileContent= null; } public abstract void startFile(String path); public void addLine(String line) { assert line != null; assert currentFileContent != null; currentFileContent.add(line.substring(0, line.length() - 1)); } public List getFiles() { return files; } } private static class EventFire extends InboundMessage { public final Subscription subscription; private EventFire(Subscription subscription) { this.subscription = subscription; } public void startFile(String path) { assert path == null; initCurrentFile(null); } @Override public void eof() { finishCurrentFile(); Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() { @Override public void run() { subscription.getEventCallback().call(getFiles()); } }); } } private static class SentQuery extends InboundMessage { Request request; ResponseCallback callback; Dispatcher dispatcher; public void startFile(String path) { finishCurrentFile(); if (path == null) { assert request instanceof ApplicationRequest; path = ((ApplicationRequest)request).getPath(); } initCurrentFile(path); } public void eof() { finishCurrentFile(); //no-operation } @Override public String toString() { return request.toString(); } } private Map queryMap = new HashMap(); protected final String address; protected final String hostName; protected final int port; private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$"); public ClientConnection(String address) { assert address != null; this.address = address; Matcher matcher = addressPattern.matcher(address); if (!matcher.matches()) { log.fatal("invalid address: " + address); hostName = null; port = 0; return; } hostName =; port = != null ? Integer.parseInt( : 9009; } private SentQuery currentlySentQuery; public void send(Request request, ResponseCallback callback) { send(request, AtOnceDispatcher.instance, callback); } public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) { if (!isConnected()) { log.fatal("not connected"); return; } final SentQuery sentQuery = new SentQuery(); sentQuery.request = request; sentQuery.callback = callback; sentQuery.dispatcher = dispatcher; senderThread.invokeLater(new Runnable(){ @Override public void run() { synchronized (ClientConnection.this) { while (!(requestIdEnabled || currentlySentQuery == null)) { try { ClientConnection.this.wait(); } catch (InterruptedException ignored) { break; } } } Integer id = stashQuery(sentQuery); String command = sentQuery.request.getCommand(); StringBuilder message = new StringBuilder(); message.append(command); if (id != null) { message.append(" ").append(id); } sentQuery.request.construct(message); String out = message.toString(); output.println(out); log.debug("sending query: " + out); } }); /* synchronized (this) { log.debug("queueing query: " + query); queryQueue.offer(sentQuery); notifyAll(); } */ } @Override public String toString() { return address; } public void subscribe(final String path, final SubscriptionCallback callback) { send(new RegistrationRequest().setPath(path), new ResponseCallback() { @Override public void process(Response response) { if (!response.getOk()) { log.error("failed to register on event: " + path); callback.subscribed(null); return; } assert response.getFiles().isEmpty(); Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment()); log.debug("registered on event: " + subscription); synchronized (subscriptions) { subscriptions.put(subscription.getRegisteredPath(), subscription); } subscription.setEventCallback(callback.subscribed(subscription)); if (subscription.getEventCallback() == null) {"subscription for " + path + " aborted"); subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription")); } } }); } public void negotiateProtocolVersion(StateFunctor stateFunctor) { protocolVersion = -1; sendQueryVersion(1, stateFunctor); } public void sendQueryVersion(final int version, final StateFunctor stateFunctor) { send(new VersionRequest().version(version), new StateCallback() { @Override public void call(Exception e) { if (e != null) { log.fatal("failed to upgrade protocol to version: " + version); return; } protocolVersion = version; if (version < 4) { /** it is an implicit loop here*/ sendQueryVersion(version + 1, stateFunctor); return; } send(new UseRequest().feature("request_id"), new StateCallback() { @Override public void call(Exception e) { requestIdEnabled = e == null; /* synchronized (ClientConnection.this) { ClientConnection.this.notifyAll(); } */ if (!requestIdEnabled) { log.fatal("protocol negotiation failed"); Exception("protocol negotiation failed", e)); return; }; } }); } }); } private synchronized SentQuery fetchQuery(Integer id, boolean remove) { if (id == null) { if (requestIdEnabled) { return null; } SentQuery result = currentlySentQuery; if (remove) { currentlySentQuery = null; notifyAll(); } return result; } if (queryMap.containsKey(id)) { SentQuery result = queryMap.get(id); if (remove) { queryMap.remove(id); } return result; } return null; } private int nextQueryId = 0; private Integer stashQuery(SentQuery sentQuery) { if (!requestIdEnabled) { currentlySentQuery = sentQuery; return null; } queryMap.put(nextQueryId, sentQuery); return nextQueryId++; } protected void processMessage(InboundMessage inboundMessage) throws Exception { if (inboundMessage == null) { log.error("failed to use any inbound message"); return; } String line; while (!(line = getLine()).startsWith("eof")) { // log.debug("line: " + line); inboundMessage.addLine(line); } inboundMessage.eof(); } protected void processEvent(String rest) throws Exception { Matcher matcher = eventPattern.matcher(rest); if (!matcher.matches()) { log.error("invalid event line: " + rest); return; } Subscription subscription = subscriptions.get(; if (subscription == null) { log.error("non subscribed event: " +; return; } EventFire event = new EventFire(subscription); event.startFile(null); processMessage(event); } protected void processMessageStartingWith(String line) throws Exception { Pair command = Strings.splitIntoPair(line, ' ', "\n"); if (command.first.equals("event")) { processEvent(command.second); return; } Pair rest = parseRest(command.second); if (command.first.equals("file")) { SentQuery sentQuery = fetchQuery(rest.first, false); sentQuery.startFile(rest.second); processMessage(sentQuery); return; } SentQuery sentQuery = fetchQuery(rest.first, true); if (sentQuery == null) { return; } log.debug("parsing response for request " + sentQuery); final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()); final ResponseCallback callback = sentQuery.callback; Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() { @Override public void run() { callback.process(response); } }); } @Override protected void receiverThreadRoutine() throws Exception { while (connected) { processMessageStartingWith(getLine()); } } }