source: java/main/src/main/java/com/framsticks/communication/ClientConnection.java @ 85

Last change on this file since 85 was 85, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • upgrade to Java 7
    • use try-multi-catch clauses
    • use try-with-resources were appropriate
  • configure FindBugs? (use mvn site and then navigate in browser to the report)
    • remove most bugs found
  • parametrize Dispatching environment (Dispatcher, RunAt?) to enforce more control on the place of closures actual call

CHANGELOG:
Rework FavouritesXMLFactory.

FindBugs?. Thread start.

FindBugs?. Minor change.

FindBugs?. Iterate over entrySet.

FindBugs?. Various.

FindBug?.

FindBug?. Encoding.

FindBug?. Final fields.

FindBug?.

Remove synchronization bug in ClientConnection?.

Experiments with findbugs.

Finish parametrization.

Make RunAt? an abstract class.

More changes in parametrization.

More changes in parametrizing dispatching.

Several changes to parametrize tasks.

Rename Runnable to RunAt?.

Add specific framsticks Runnable.

Add JSR305 (annotations).

Add findbugs reporting.

More improvements to ParamBuilder? wording.

Make FramsClass? accept also ParamBuilder?.

Change wording of ParamBuilder?.

Change wording of Request creation.

Use Java 7 exception catch syntax.

Add ScopeEnd? class.

Upgrade to Java 7.

File size: 10.1 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.RegistrationRequest;
5import com.framsticks.communication.queries.UseRequest;
6import com.framsticks.communication.queries.VersionRequest;
7import com.framsticks.communication.util.LoggingStateCallback;
8import com.framsticks.params.ListSource;
9import com.framsticks.util.*;
10import com.framsticks.util.dispatching.AtOnceDispatcher;
11import com.framsticks.util.dispatching.Dispatcher;
12import com.framsticks.util.dispatching.Dispatching;
13import com.framsticks.util.lang.Pair;
14import com.framsticks.util.lang.Strings;
15import org.apache.log4j.Logger;
16
17import java.io.IOException;
18import java.net.Socket;
19import java.net.SocketException;
20import java.util.*;
21import java.util.regex.Matcher;
22import java.util.regex.Pattern;
23import com.framsticks.util.dispatching.RunAt;
24
25/**
26 * @author Piotr Sniegowski
27 */
28public class ClientConnection extends Connection {
29
30        private final static Logger log = Logger.getLogger(ClientConnection.class);
31
32        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
33
34        public String getAddress() {
35                return address;
36        }
37
38        public void connect(StateFunctor connectedFunctor) {
39                try {
40                        log.info("connecting to " + address);
41
42                        socket = new Socket(hostName, port);
43
44                        socket.setSoTimeout(500);
45
46                        log.info("connected to " + hostName + ":" + port);
47                        connected = true;
48
49                        runThreads();
50
51                        connectedFunctor.call(null);
52                } catch (SocketException e) {
53                        log.error("failed to connect: " + e);
54                        connectedFunctor.call(e);
55                } catch (IOException e) {
56                        log.error("buffer creation failure");
57                        connectedFunctor.call(e);
58                        close();
59                }
60        }
61
62        private static abstract class InboundMessage {
63                protected String currentFilePath;
64                protected List<String> currentFileContent;
65                protected final List<File> files = new ArrayList<File>();
66
67                public abstract void eof();
68
69                protected void initCurrentFile(String path) {
70                        currentFileContent = new LinkedList<String>();
71                        currentFilePath = path;
72                }
73                protected void finishCurrentFile() {
74                        if (currentFileContent == null) {
75                                return;
76                        }
77                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
78                        currentFilePath = null;
79                        currentFileContent= null;
80                }
81
82                public abstract void startFile(String path);
83
84                public void addLine(String line) {
85                        assert line != null;
86                        assert currentFileContent != null;
87                        currentFileContent.add(line.substring(0, line.length() - 1));
88                }
89
90                public List<File> getFiles() {
91                        return files;
92                }
93        }
94
95        private static class EventFire extends InboundMessage {
96                public final Subscription<?> subscription;
97
98                private EventFire(Subscription<?> subscription) {
99                        this.subscription = subscription;
100                }
101
102                public void startFile(String path) {
103                        assert path == null;
104                        initCurrentFile(null);
105                }
106
107                @Override
108                public void eof() {
109                        finishCurrentFile();
110
111                        subscription.dispatchCall(getFiles());
112                }
113        }
114
115        private static class SentQuery<C> extends InboundMessage {
116                Request request;
117                ResponseCallback<? extends C> callback;
118                Dispatcher<C> dispatcher;
119
120                public void startFile(String path) {
121                        finishCurrentFile();
122                        if (path == null) {
123                                assert request instanceof ApplicationRequest;
124                                path = ((ApplicationRequest)request).getPath();
125                        }
126                        initCurrentFile(path);
127                }
128
129                public void eof() {
130                        finishCurrentFile();
131                        //no-operation
132                }
133
134                @Override
135                public String toString() {
136                        return request.toString();
137                }
138
139                public void dispatchResponseProcess(final Response response) {
140                        Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() {
141                                @Override
142                                public void run() {
143                                        callback.process(response);
144                                }
145                        });
146                }
147        }
148        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
149
150
151        protected final String address;
152        protected final String hostName;
153        protected final int port;
154
155        private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$");
156
157        public ClientConnection(String address) {
158                assert address != null;
159                this.address = address;
160                Matcher matcher = addressPattern.matcher(address);
161                if (!matcher.matches()) {
162                        log.fatal("invalid address: " + address);
163                        hostName = null;
164                        port = 0;
165                        return;
166                }
167                hostName = matcher.group(1);
168                port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
169        }
170
171        private SentQuery<?> currentlySentQuery;
172
173
174        public <C extends Connection> void send(Request request, ResponseCallback<C> callback) {
175                //TODO RunAt
176                send(request, AtOnceDispatcher.getInstance(), callback);
177        }
178
179        public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) {
180
181                if (!isConnected()) {
182                        log.fatal("not connected");
183                        return;
184                }
185                final SentQuery<C> sentQuery = new SentQuery<C>();
186                sentQuery.request = request;
187                sentQuery.callback = callback;
188                sentQuery.dispatcher = dispatcher;
189
190                senderThread.invokeLater(new RunAt<Connection>(){
191                        @Override
192                        public void run() {
193                                Integer id;
194                                synchronized (ClientConnection.this) {
195
196                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
197                                                try {
198                                                        ClientConnection.this.wait();
199                                                } catch (InterruptedException ignored) {
200                                                        break;
201                                                }
202                                        }
203                                        if (requestIdEnabled) {
204                                                queryMap.put(nextQueryId, sentQuery);
205                                                id = nextQueryId++;
206                                        } else {
207                                                currentlySentQuery = sentQuery;
208                                                id = null;
209                                        }
210                                }
211                                String command = sentQuery.request.getCommand();
212                                StringBuilder message = new StringBuilder();
213                                message.append(command);
214                                if (id != null) {
215                                        message.append(" ").append(id);
216                                }
217                                sentQuery.request.construct(message);
218                                String out = message.toString();
219
220                                output.println(out);
221                                log.debug("sending query: " + out);
222
223                        }
224                });
225                /*
226                synchronized (this) {
227                        log.debug("queueing query: " + query);
228                        queryQueue.offer(sentQuery);
229                        notifyAll();
230                }
231                */
232        }
233
234
235        @Override
236        public String toString() {
237                return address;
238        }
239
240        public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
241                send(new RegistrationRequest().path(path), new ResponseCallback<Connection>() {
242                        @Override
243                        public void process(Response response) {
244                                if (!response.getOk()) {
245                                        log.error("failed to register on event: " + path);
246                                        callback.subscribed(null);
247                                        return;
248                                }
249                                assert response.getFiles().isEmpty();
250                                Subscription<C> subscription = new Subscription<C>(ClientConnection.this, path, response.getComment(), dispatcher);
251                                log.debug("registered on event: " + subscription);
252                                synchronized (subscriptions) {
253                                        subscriptions.put(subscription.getRegisteredPath(), subscription);
254                                }
255                                subscription.setEventCallback(callback.subscribed(subscription));
256                                if (subscription.getEventCallback() == null) {
257                                        log.info("subscription for " + path + " aborted");
258                                        subscription.unsubscribe(new LoggingStateCallback<C>(log, "abort subscription"));
259                                }
260                        }
261                });
262        }
263
264        public void negotiateProtocolVersion(StateFunctor stateFunctor) {
265                protocolVersion = -1;
266                sendQueryVersion(1, stateFunctor);
267        }
268
269        public void sendQueryVersion(final int version, final StateFunctor stateFunctor) {
270                send(new VersionRequest().version(version), new StateCallback<Connection>() {
271                        @Override
272                        public void call(Exception e) {
273                                if (e != null) {
274                                        log.fatal("failed to upgrade protocol to version: " + version);
275                                        return;
276                                }
277                                protocolVersion = version;
278                                if (version < 4) {
279                                        /** it is an implicit loop here*/
280                                        sendQueryVersion(version + 1, stateFunctor);
281                                        return;
282                                }
283                                send(new UseRequest().feature("request_id"), new StateCallback<Connection>() {
284                                        @Override
285                                        public void call(Exception e) {
286                                                requestIdEnabled = e == null;
287                                                /*
288                                                synchronized (ClientConnection.this) {
289                                                        ClientConnection.this.notifyAll();
290                                                }
291                                                */
292                                                if (!requestIdEnabled) {
293                                                        log.fatal("protocol negotiation failed");
294                                                        stateFunctor.call(new Exception("protocol negotiation failed", e));
295                                                        return;
296                                                }
297                                                stateFunctor.call(null);
298                                        }
299                                });
300
301                        }
302                });
303        }
304
305
306        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
307                if (id == null) {
308                        if (requestIdEnabled) {
309                                return null;
310                        }
311                        SentQuery<?> result = currentlySentQuery;
312                        if (remove) {
313                                currentlySentQuery = null;
314                                notifyAll();
315                        }
316                        return result;
317                }
318                if (queryMap.containsKey(id)) {
319                        SentQuery<?> result = queryMap.get(id);
320                        if (remove) {
321                                queryMap.remove(id);
322                        }
323                        return result;
324                }
325                return null;
326        }
327
328        private int nextQueryId = 0;
329
330        protected void processMessage(InboundMessage inboundMessage) throws Exception {
331                if (inboundMessage == null) {
332                        log.error("failed to use any inbound message");
333                        return;
334                }
335
336                String line;
337                while (!(line = getLine()).startsWith("eof")) {
338                        // log.debug("line: " + line);
339                        inboundMessage.addLine(line);
340                }
341                inboundMessage.eof();
342        }
343
344        protected void processEvent(String rest) throws Exception {
345                Matcher matcher = eventPattern.matcher(rest);
346                if (!matcher.matches()) {
347                        log.error("invalid event line: " + rest);
348                        return;
349                }
350                Subscription<?> subscription = subscriptions.get(matcher.group(1));
351                if (subscription == null) {
352                        log.error("non subscribed event: " + matcher.group(1));
353                        return;
354                }
355                EventFire event = new EventFire(subscription);
356                event.startFile(null);
357                processMessage(event);
358        }
359
360
361        protected void processMessageStartingWith(String line) throws Exception {
362                Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
363                if (command.first.equals("event")) {
364                        processEvent(command.second);
365                        return;
366                }
367                Pair<Integer, String> rest = parseRest(command.second);
368
369                if (command.first.equals("file")) {
370                        SentQuery<?> sentQuery = fetchQuery(rest.first, false);
371                        sentQuery.startFile(rest.second);
372                        processMessage(sentQuery);
373                        return;
374                }
375
376                SentQuery<?> sentQuery = fetchQuery(rest.first, true);
377                if (sentQuery == null) {
378                        return;
379                }
380                log.debug("parsing response for request " + sentQuery);
381
382                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()));
383        }
384
385        @Override
386        protected void receiverThreadRoutine() throws Exception {
387                while (connected) {
388                        processMessageStartingWith(getLine());
389                }
390        }
391
392}
Note: See TracBrowser for help on using the repository browser.