source: java/main/src/main/java/com/framsticks/communication/ClientConnection.java @ 96

Last change on this file since 96 was 96, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supercede other apllication exception being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronours tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithm to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File size: 10.6 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.RegistrationRequest;
5import com.framsticks.communication.queries.UseRequest;
6import com.framsticks.communication.queries.VersionRequest;
7import com.framsticks.communication.util.LoggingStateCallback;
8import com.framsticks.params.ListSource;
9import com.framsticks.util.*;
10import com.framsticks.util.dispatching.AtOnceDispatcher;
11import com.framsticks.util.dispatching.Dispatcher;
12import com.framsticks.util.dispatching.Dispatching;
13import com.framsticks.util.dispatching.JoinableState;
14import com.framsticks.util.lang.Pair;
15import com.framsticks.util.lang.Strings;
16
17import org.apache.log4j.Level;
18import org.apache.log4j.Logger;
19
20import java.io.IOException;
21import java.net.Socket;
22import java.util.*;
23import java.util.regex.Matcher;
24import java.util.regex.Pattern;
25import com.framsticks.util.dispatching.RunAt;
26
27/**
28 * @author Piotr Sniegowski
29 */
30public class ClientConnection extends Connection {
31
32        private final static Logger log = Logger.getLogger(ClientConnection.class);
33
34        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
35
36        /**
37         * @param connectedFunctor the connectedFunctor to set
38         */
39        public void setConnectedFunctor(StateFunctor connectedFunctor) {
40                this.connectedFunctor = connectedFunctor;
41        }
42
43        public ClientConnection(String address) {
44                super(address, "client connection");
45                Matcher matcher = addressPattern.matcher(address);
46                if (!matcher.matches()) {
47                        log.fatal("invalid address: " + address);
48                        hostName = null;
49                        port = 0;
50                        return;
51                }
52                hostName = matcher.group(1);
53                port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
54        }
55
56        protected StateFunctor connectedFunctor;
57
58        private static abstract class InboundMessage {
59                protected String currentFilePath;
60                protected List<String> currentFileContent;
61                protected final List<File> files = new ArrayList<File>();
62
63                public abstract void eof();
64
65                protected void initCurrentFile(String path) {
66                        currentFileContent = new LinkedList<String>();
67                        currentFilePath = path;
68                }
69
70                protected void finishCurrentFile() {
71                        if (currentFileContent == null) {
72                                return;
73                        }
74                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
75                        currentFilePath = null;
76                        currentFileContent = null;
77                }
78
79                public abstract void startFile(String path);
80
81                public final void addLine(String line) {
82                        assert line != null;
83                        assert currentFileContent != null;
84                        currentFileContent.add(line);
85                }
86
87                public List<File> getFiles() {
88                        return files;
89                }
90        }
91
92        private static class EventFire extends InboundMessage {
93                public final Subscription<?> subscription;
94
95                private EventFire(Subscription<?> subscription) {
96                        this.subscription = subscription;
97                }
98
99                public void startFile(String path) {
100                        assert path == null;
101                        initCurrentFile(null);
102                }
103
104                @Override
105                public void eof() {
106                        finishCurrentFile();
107
108                        subscription.dispatchCall(getFiles());
109                }
110        }
111
112        private static class SentQuery<C> extends InboundMessage {
113                Request request;
114                ResponseCallback<? extends C> callback;
115                Dispatcher<C> dispatcher;
116
117                public void startFile(String path) {
118                        finishCurrentFile();
119                        if (!Strings.notEmpty(path)) {
120                                assert request instanceof ApplicationRequest;
121                                path = ((ApplicationRequest) request).getPath();
122                        }
123                        Strings.assureNotEmpty(path);
124                        initCurrentFile(path);
125                }
126
127                public void eof() {
128                        assert Strings.notEmpty(currentFilePath);
129                        finishCurrentFile();
130                        //no-operation
131                }
132
133                @Override
134                public String toString() {
135                        return request.toString();
136                }
137
138                public void dispatchResponseProcess(final Response response) {
139                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>() {
140                                @Override
141                                public void run() {
142                                        callback.process(response);
143                                }
144                        });
145                }
146        }
147
148        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
149
150        protected final String hostName;
151        protected final int port;
152
153        private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$");
154
155
156        private SentQuery<?> currentlySentQuery;
157
158        public <C extends Connection> void send(Request request, ResponseCallback<C> callback) {
159                //TODO RunAt
160                send(request, AtOnceDispatcher.getInstance(), callback);
161        }
162
163        public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) {
164
165                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
166                        log.fatal("not connected");
167                        return;
168                }
169
170                final SentQuery<C> sentQuery = new SentQuery<C>();
171                sentQuery.request = request;
172                sentQuery.callback = callback;
173                sentQuery.dispatcher = dispatcher;
174
175                senderThread.dispatch(new RunAt<Connection>() {
176                        @Override
177                        public void run() {
178                                Integer id;
179                                synchronized (ClientConnection.this) {
180
181                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
182                                                try {
183                                                        ClientConnection.this.wait();
184                                                } catch (InterruptedException ignored) {
185                                                        break;
186                                                }
187                                        }
188                                        if (requestIdEnabled) {
189                                                queryMap.put(nextQueryId, sentQuery);
190                                                id = nextQueryId++;
191                                        } else {
192                                                currentlySentQuery = sentQuery;
193                                                id = null;
194                                        }
195                                }
196                                String command = sentQuery.request.getCommand();
197                                StringBuilder message = new StringBuilder();
198                                message.append(command);
199                                if (id != null) {
200                                        message.append(" ").append(id);
201                                }
202                                message.append(" ");
203                                sentQuery.request.construct(message);
204                                String out = message.toString();
205
206                                output.println(out);
207                                output.flush();
208                                log.debug("sending query: " + out);
209
210                        }
211                });
212                /*
213                synchronized (this) {
214                        log.debug("queueing query: " + query);
215                        queryQueue.offer(sentQuery);
216                        notifyAll();
217                }
218                 */
219        }
220
221        @Override
222        public String toString() {
223                return "client connection " + address;
224        }
225
226        public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
227                send(new RegistrationRequest().path(path), new ResponseCallback<Connection>() {
228                        @Override
229                        public void process(Response response) {
230                                if (!response.getOk()) {
231                                        log.error("failed to register on event: " + path);
232                                        callback.subscribed(null);
233                                        return;
234                                }
235                                assert response.getFiles().isEmpty();
236                                Subscription<C> subscription = new Subscription<C>(ClientConnection.this, path, response.getComment(), dispatcher);
237                                log.debug("registered on event: " + subscription);
238                                synchronized (subscriptions) {
239                                        subscriptions.put(subscription.getRegisteredPath(), subscription);
240                                }
241                                subscription.setEventCallback(callback.subscribed(subscription));
242                                if (subscription.getEventCallback() == null) {
243                                        log.info("subscription for " + path + " aborted");
244                                        subscription.unsubscribe(new LoggingStateCallback<C>(log, "abort subscription"));
245                                }
246                        }
247                });
248        }
249
250        public void negotiateProtocolVersion(StateFunctor stateFunctor) {
251                protocolVersion = -1;
252                sendQueryVersion(1, stateFunctor);
253        }
254
255        public void sendQueryVersion(final int version, StateFunctor stateFunctor) {
256                send(new VersionRequest().version(version), new StateCallback<Connection>(stateFunctor) {
257                        @Override
258                        public void callImpl() {
259                                protocolVersion = version;
260                                if (version < 4) {
261                                        /** it is an implicit loop here*/
262                                        sendQueryVersion(version + 1, move());
263                                        return;
264                                }
265                                send(new UseRequest().feature("request_id"), new StateCallback<Connection>(move()) {
266                                        @Override
267                                        public void handle(FramsticksException exception) {
268                                                requestIdEnabled = false;
269                                                log.fatal("protocol negotiation failed");
270                                                super.handle(new FramsticksException().msg("protocol negotiation failed").cause(exception));
271                                        }
272
273                                        @Override
274                                        public void callImpl() {
275                                                requestIdEnabled = true;
276                                        }
277                                });
278
279                        }
280                });
281        }
282
283        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
284                if (id == null) {
285                        if (requestIdEnabled) {
286                                return null;
287                        }
288                        SentQuery<?> result = currentlySentQuery;
289                        if (remove) {
290                                currentlySentQuery = null;
291                                notifyAll();
292                        }
293                        return result;
294                }
295                if (queryMap.containsKey(id)) {
296                        SentQuery<?> result = queryMap.get(id);
297                        if (remove) {
298                                queryMap.remove(id);
299                        }
300                        return result;
301                }
302                return null;
303        }
304
305        private int nextQueryId = 0;
306
307        protected void processMessage(InboundMessage inboundMessage) {
308                if (inboundMessage == null) {
309                        log.error("failed to use any inbound message");
310                        return;
311                }
312
313                String line;
314                while (!(line = getLine()).startsWith("eof")) {
315                        // log.debug("line: " + line);
316                        inboundMessage.addLine(line);
317                }
318                inboundMessage.eof();
319        }
320
321        protected void processEvent(String rest) {
322                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
323                if (!matcher.matches()) {
324                        log.error("invalid event line: " + rest);
325                        return;
326                }
327                Subscription<?> subscription = subscriptions.get(matcher.group(1));
328                if (subscription == null) {
329                        log.error("non subscribed event: " + matcher.group(1));
330                        return;
331                }
332                EventFire event = new EventFire(subscription);
333                event.startFile(null);
334                processMessage(event);
335        }
336
337        protected void processMessageStartingWith(String line) {
338                try {
339                        Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line);
340                        if (command.first.equals("event")) {
341                                processEvent(command.second.toString());
342                                return;
343                        }
344                        Pair<Integer, CharSequence> rest = takeRequestId(command.second);
345
346                        if (command.first.equals("file")) {
347                                SentQuery<?> sentQuery = fetchQuery(rest.first, false);
348                                sentQuery.startFile(rest.second.toString());
349                                processMessage(sentQuery);
350                                return;
351                        }
352
353                        SentQuery<?> sentQuery = fetchQuery(rest.first, true);
354                        if (sentQuery == null) {
355                                return;
356                        }
357                        log.debug("parsing response for request " + sentQuery);
358
359                        sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
360                } catch (FramsticksException e) {
361                        throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);
362                }
363        }
364
365        @Override
366        protected void receiverThreadRoutine() {
367                while (isRunning() && !isConnected()) {
368                        log.debug("connecting to " + address);
369                        try {
370                                socket = new Socket(hostName, port);
371                        } catch (IOException e) {
372                                log.info(this + " failed to connect (retrying): " + e);
373                                Dispatching.sleep(0.5);
374                        }
375                }
376
377                log.debug(this + " connected");
378                try {
379                        socket.setSoTimeout(500);
380                        setupStreams();
381                } catch (Exception e) {
382                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", this);
383                }
384
385                connectedFunctor.call();
386
387                while (isRunning() && isConnected()) {
388                        try {
389                                processMessageStartingWith(getLine());
390                        } catch (Exception e) {
391                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
392                                break;
393                        }
394                }
395                interrupt();
396                finish();
397        }
398}
Note: See TracBrowser for help on using the repository browser.