package com.framsticks.communication; import com.framsticks.communication.queries.*; import com.framsticks.params.SourceInterface; import com.framsticks.util.FramsticksException; import com.framsticks.util.lang.Holder; import com.framsticks.util.lang.Pair; import com.framsticks.util.lang.Strings; import org.apache.log4j.Logger; import java.net.Socket; import com.framsticks.util.dispatching.RunAt; import com.framsticks.util.dispatching.ThrowExceptionHandler; /** * @author Piotr Sniegowski */ public class ServerSideManagedConnection extends ManagedConnection { private final static Logger log = Logger.getLogger(ServerSideManagedConnection.class); RequestHandler requestHandler; public ServerSideManagedConnection(Socket socket, RequestHandler requestHandler) { setAddress(new Address(socket.getInetAddress().getHostAddress(), socket.getPort())); setDescription("server connection"); this.socket = socket; this.requestHandler = requestHandler; // socket.setSoTimeout(500); setupStreams(); } protected void processNextInputBatch() { processNextRequest(); } @Override protected void receiverThreadRoutine() { processInputBatchesUntilClosed(); } protected void handleRequest(Request request, ServerSideResponseFuture responseCallback) { if (request instanceof ApplicationRequest) { requestHandler.handle((ApplicationRequest) request, responseCallback); return; } if (request instanceof ProtocolRequest) { if (request instanceof VersionRequest) { responseCallback.pass(new Response(true, null, null)); return; } if (request instanceof UseRequest) { String feature = ((UseRequest)request).getFeature(); if (feature.equals("request_id")) { requestIdEnabled = true; responseCallback.pass(new Response(true, null, null)); return; } responseCallback.pass(new Response(false, "unknown feature: " + feature, null)); return; } } log.error("unhandled request: " + request); responseCallback.pass(new Response(false, "unhandled", null)); } protected final void respond(final Response response, final Integer id) { //TODO TEH: pass it the hosted tree senderThread.dispatch(new RunAt(ThrowExceptionHandler.getInstance()) { @Override protected void runAt() { String outId = id != null ? " " + id : ""; if (response.getFiles() != null) { for (File f : response.getFiles()) { putLine("file" + outId/* + " " + f.getPath()*/); SourceInterface content = f.getContent(); String line; while ((line = content.readLine()) != null) { putLine(line); } putLine("eof"); } } StringBuilder statusLine = new StringBuilder(); statusLine.append(response.getOk() ? "ok" : "error").append(outId); if (Strings.notEmpty(response.getComment())) { statusLine.append(" \"").append(response.getComment()).append('"'); } putLine(statusLine.toString()); flushOut(); } }); } protected void processNextRequest() { final Holder id = new Holder<>(); final String line = getLine(); try { Pair command = Request.takeIdentifier(line); final Pair rest = takeRequestId(command.second); id.set(rest.first); final Request request = Request.parse(command.first, rest.second); if (log.isTraceEnabled()) { log.trace("read request: " + request); } //TODO what to do here? handleRequest(request, new ServerSideResponseFuture() { @Override protected void result(Response response) { respond(response, rest.first); } }); } catch (FramsticksException e) { e.arg("id", id.get()).arg("line", line); log.error("error: ", e); respond(new Response(false, "invalid input: " + e.getMsg(), null), id.get()); return; } } }