package com.framsticks.communication;

import com.framsticks.communication.queries.*;
import com.framsticks.util.FramsticksException;
import com.framsticks.util.lang.Holder;
import com.framsticks.util.lang.Pair;
import com.framsticks.util.lang.Strings;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.net.Socket;
import com.framsticks.util.dispatching.RunAt;

15 | /** |
---|
16 | * @author Piotr Sniegowski |
---|
17 | */ |
---|
18 | public class ServerSideManagedConnection extends ManagedConnection { |
---|
19 | |
---|
20 | private final static Logger log = LogManager.getLogger(ServerSideManagedConnection.class); |
---|
21 | |
---|
22 | protected final RequestHandler requestHandler; |
---|
23 | |
---|
24 | public ServerSideManagedConnection(Socket socket, RequestHandler requestHandler) { |
---|
25 | setAddress(new Address(socket.getInetAddress().getHostAddress(), socket.getPort())); |
---|
26 | setDescription("server connection"); |
---|
27 | this.socket = socket; |
---|
28 | this.requestHandler = requestHandler; |
---|
29 | // socket.setSoTimeout(500); |
---|
30 | setupStreams(); |
---|
31 | } |
---|
32 | |
---|
33 | |
---|
34 | |
---|
35 | @Override |
---|
36 | protected void receiverThreadRoutine() { |
---|
37 | |
---|
38 | processInputBatchesUntilClosed(); |
---|
39 | } |
---|
40 | |
---|
41 | protected void handleRequest(Request request, ServerSideResponseFuture responseCallback) { |
---|
42 | if (request instanceof ApplicationRequest) { |
---|
43 | requestHandler.handle((ApplicationRequest) request, responseCallback); |
---|
44 | return; |
---|
45 | } |
---|
46 | if (request instanceof ProtocolRequest) { |
---|
47 | if (request instanceof VersionRequest) { |
---|
48 | responseCallback.pass(new Response(true, null, null)); |
---|
49 | return; |
---|
50 | } |
---|
51 | if (request instanceof UseRequest) { |
---|
52 | String feature = ((UseRequest)request).getFeature(); |
---|
53 | if (feature.equals("request_id")) { |
---|
54 | requestIdEnabled = true; |
---|
55 | responseCallback.pass(new Response(true, null, null)); |
---|
56 | return; |
---|
57 | } |
---|
58 | responseCallback.pass(new Response(false, "unknown feature: " + feature, null)); |
---|
59 | return; |
---|
60 | } |
---|
61 | |
---|
62 | } |
---|
63 | log.error("unhandled request: {}", request); |
---|
64 | responseCallback.pass(new Response(false, "unhandled", null)); |
---|
65 | } |
---|
66 | |
---|
67 | |
---|
68 | |
---|
69 | protected final void respond(final Response response, final Integer id) { |
---|
70 | senderThread.dispatch(new RunAt<Connection>(requestHandler) { |
---|
71 | @Override |
---|
72 | protected void runAt() { |
---|
73 | if (response.getFiles() != null) { |
---|
74 | for (File f : response.getFiles()) { |
---|
75 | putFile(f, id); |
---|
76 | } |
---|
77 | } |
---|
78 | StringBuilder statusLine = new StringBuilder(); |
---|
79 | statusLine.append(response.getOk() ? "ok" : "error").append(idToString(id)); |
---|
80 | if (Strings.notEmpty(response.getComment())) { |
---|
81 | Request.quoteValue(statusLine.append(" "), response.getComment()); |
---|
82 | } |
---|
83 | putLine(statusLine.toString()); |
---|
84 | flushOut(); |
---|
85 | } |
---|
86 | }); |
---|
87 | |
---|
88 | } |
---|
89 | |
---|
90 | |
---|
91 | protected void processNextInputBatch() { |
---|
92 | final Holder<Integer> id = new Holder<>(); |
---|
93 | final String line = getLine(); |
---|
94 | try { |
---|
95 | Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line); |
---|
96 | final Pair<Integer, CharSequence> rest = takeRequestId(command.second); |
---|
97 | id.set(rest.first); |
---|
98 | |
---|
99 | final Request request = Request.parse(command.first, rest.second); |
---|
100 | |
---|
101 | if (log.isTraceEnabled()) { |
---|
102 | log.trace("read request: {}", request); |
---|
103 | } |
---|
104 | |
---|
105 | handleRequest(request, new ServerSideResponseFuture() { |
---|
106 | @Override |
---|
107 | protected void result(Response response) { |
---|
108 | respond(response, rest.first); |
---|
109 | } |
---|
110 | }); |
---|
111 | } catch (FramsticksException e) { |
---|
112 | e.arg("id", id.get()).arg("line", line); |
---|
113 | log.error("error: ", e); |
---|
114 | respond(new Response(false, "invalid input: " + e.getMsg(), null), id.get()); |
---|
115 | return; |
---|
116 | } |
---|
117 | |
---|
118 | } |
---|
119 | } |
---|