source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 97

Last change on this file since 97 was 97, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • add proper exception passing between communication sides:

if exception occur during handling client request, it is
automatically passed as comment to error response.

it may be used to snoop communication between peers

  • fix algorithm choosing text controls in GUI
  • allow GUI testing in virtual frame buffer (xvfb)

FEST had some problem with xvfb but workaround was found

supports tab-completion based on requests history

CHANGELOG:
Further improve handling of exceptions in GUI.

Add StatusBar? implementing ExceptionResultHandler?.

Make completion processing asynchronous.

Minor changes.

Improve completion in console.

Improve history in InteractiveConsole?.

First working version of DirectConsole?.

Minor changes.

Make Connection.address non final.

It is more suitable to use in configuration.

Improvement of consoles.

Improve PopupMenu? and closing of FrameJoinable?.

Fix BrowserTest?.

Found bug with FEST running under xvfb.

JButtonFixture.click() is not working under xvfb.
GuiTest? has wrapper which uses JButton.doClick() directly.

Store CompositeParam? param in TreeNode?.

Simplify ClientSideManagedConnection? connecting.

There is now connectedFunctor needed, ApplicationRequests? can be
send right after creation. They are buffered until the version
and features are negotiated.

Narow down interface of ClientSideManagedConnection?.

Allow that connection specialization send only
ApplicationRequests?.

Improve policy of text control choosing.

Change name of Genotype in BrowserTest?.

Make BrowserTest? change name of Genotype.

Minor change.

First working draft of TrackConsole?.

Simplify Consoles.

More improvements with gui joinables.

Unify initialization on gui joinables.

More rework of Frame based entities.

Refactorize structure of JFrames based entities.

Extract GuiTest? from BrowserBaseTest?.

Reorganize Console classes structure.

Add Collection view to JoinableCollection?.

Configure timeout in testing.

Minor changes.

Rework connections hierarchy.

Add Mode to the get operation.

Make get and set in Tree take PrimitiveParam?.

Unify naming of operations.

Make RunAt? use the given ExceptionHandler?.

It wraps the virtual runAt() method call with
try-catch passing exception to handler.

Force RunAt? to include ExceptionHandler?.

Improve ClientAtServer?.

Minor change.

Another sweep with FindBugs?.

Rename Instance to Tree.

Minor changes.

Minor changes.

Further clarify semantics of Futures.

Add FutureHandler?.

FutureHandler? is refinement of Future, that proxifies
exception handling to ExceptionResultHandler? given
at construction time.

Remove StateFunctor? (use Future<Void> instead).

Make Connection use Future<Void>.

Unparametrize *ResponseFuture?.

Remove StateCallback? not needed anymore.

Distinguish between sides of ResponseFuture?.

Base ResponseCallback? on Future (now ResponseFuture?).

Make asynchronous store taking Future for flags.

Implement storeValue in ObjectInstance?.

File size: 10.5 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.ProtocolRequest;
5import com.framsticks.communication.queries.RegistrationRequest;
6import com.framsticks.communication.queries.UseRequest;
7import com.framsticks.communication.queries.VersionRequest;
8import com.framsticks.communication.util.LoggingStateCallback;
9import com.framsticks.params.ListSource;
10import com.framsticks.util.*;
11import com.framsticks.util.dispatching.AtOnceDispatcher;
12import com.framsticks.util.dispatching.Dispatcher;
13import com.framsticks.util.dispatching.Dispatching;
14import com.framsticks.util.dispatching.ExceptionResultHandler;
15import com.framsticks.util.dispatching.Future;
16import com.framsticks.util.dispatching.FutureHandler;
17import com.framsticks.util.dispatching.JoinableState;
18import com.framsticks.util.lang.Pair;
19import com.framsticks.util.lang.Strings;
20
21import org.apache.log4j.Logger;
22
23import java.util.*;
24import java.util.regex.Matcher;
25import com.framsticks.util.dispatching.RunAt;
26
27/**
28 * @author Piotr Sniegowski
29 */
30public class ClientSideManagedConnection extends ManagedConnection {
31
32        private final static Logger log = Logger.getLogger(ClientSideManagedConnection.class);
33
34        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
35
36        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
37        private boolean isHandshakeDone = false;
38
39        /**
40         * @return the requestedVersion
41         */
42        public int getRequestedVersion() {
43                return requestedVersion;
44        }
45
46        /**
47         * @param requestedVersion the requestedVersion to set
48         */
49        public void setRequestedVersion(int requestedVersion) {
50                this.requestedVersion = requestedVersion;
51        }
52
53        protected int requestedVersion = 4;
54
55        public ClientSideManagedConnection() {
56                setDescription("client connection");
57                protocolVersion = -1;
58        }
59
60
61        private static abstract class InboundMessage {
62                protected String currentFilePath;
63                protected List<String> currentFileContent;
64                protected final List<File> files = new ArrayList<File>();
65
66                public abstract void eof();
67
68                protected void initCurrentFile(String path) {
69                        currentFileContent = new LinkedList<String>();
70                        currentFilePath = path;
71                }
72
73                protected void finishCurrentFile() {
74                        if (currentFileContent == null) {
75                                return;
76                        }
77                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
78                        currentFilePath = null;
79                        currentFileContent = null;
80                }
81
82                public abstract void startFile(String path);
83
84                public final void addLine(String line) {
85                        assert line != null;
86                        assert currentFileContent != null;
87                        currentFileContent.add(line);
88                }
89
90                public List<File> getFiles() {
91                        return files;
92                }
93        }
94
95        private static class EventFire extends InboundMessage {
96                public final Subscription<?> subscription;
97
98                private EventFire(Subscription<?> subscription) {
99                        this.subscription = subscription;
100                }
101
102                public void startFile(String path) {
103                        assert path == null;
104                        initCurrentFile(null);
105                }
106
107                @Override
108                public void eof() {
109                        finishCurrentFile();
110
111                        subscription.dispatchCall(getFiles());
112                }
113        }
114
115        private static class SentQuery<C> extends InboundMessage {
116                Request request;
117                ClientSideResponseFuture callback;
118                Dispatcher<C> dispatcher;
119
120                public void startFile(String path) {
121                        finishCurrentFile();
122                        if (!Strings.notEmpty(path)) {
123                                assert request instanceof ApplicationRequest;
124                                path = ((ApplicationRequest) request).getPath();
125                        }
126                        Strings.assureNotEmpty(path);
127                        initCurrentFile(path);
128                }
129
130                public void eof() {
131                        assert Strings.notEmpty(currentFilePath);
132                        finishCurrentFile();
133                        //no-operation
134                }
135
136                @Override
137                public String toString() {
138                        return request.toString();
139                }
140
141                public void dispatchResponseProcess(final Response response) {
142                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
143                                @Override
144                                protected void runAt() {
145                                        callback.pass(response);
146                                }
147                        });
148                }
149        }
150
151        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
152
153        private SentQuery<?> currentlySentQuery;
154
155        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
156                //TODO RunAt
157                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
158        }
159
160
161
162        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
163                synchronized (applicationRequestsBuffer) {
164                        if (!isHandshakeDone) {
165                                applicationRequestsBuffer.add(new Runnable() {
166                                        @Override
167                                        public void run() {
168                                                sendImplementation(request, dispatcher, callback);
169                                        }
170                                });
171                                return;
172                        }
173                }
174                sendImplementation(request, dispatcher, callback);
175        }
176
177        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
178
179                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
180                        log.fatal("not connected");
181                        return;
182                }
183
184                final SentQuery<C> sentQuery = new SentQuery<C>();
185                sentQuery.request = request;
186                sentQuery.callback = callback;
187                sentQuery.dispatcher = dispatcher;
188
189                senderThread.dispatch(new RunAt<Connection>(callback) {
190                        @Override
191                        protected void runAt() {
192                                Integer id;
193                                synchronized (ClientSideManagedConnection.this) {
194
195                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
196                                                try {
197                                                        ClientSideManagedConnection.this.wait();
198                                                } catch (InterruptedException ignored) {
199                                                        break;
200                                                }
201                                        }
202                                        if (requestIdEnabled) {
203                                                queryMap.put(nextQueryId, sentQuery);
204                                                id = nextQueryId++;
205                                        } else {
206                                                currentlySentQuery = sentQuery;
207                                                id = null;
208                                        }
209                                }
210                                String command = sentQuery.request.getCommand();
211                                StringBuilder message = new StringBuilder();
212                                message.append(command);
213                                if (id != null) {
214                                        message.append(" ").append(id);
215                                }
216                                message.append(" ");
217                                sentQuery.request.construct(message);
218                                String out = message.toString();
219
220                                putLine(out);
221                                flushOut();
222                                log.debug("sending query: " + out);
223
224                        }
225                });
226                /*
227                synchronized (this) {
228                        log.debug("queueing query: " + query);
229                        queryQueue.offer(sentQuery);
230                        notifyAll();
231                }
232                 */
233        }
234
235        @Override
236        public String toString() {
237                return "client connection " + address;
238        }
239
240        public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
241                send(new RegistrationRequest().path(path), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(callback) {
242                        @Override
243                        protected void processOk(Response response) {
244                                assert response.getFiles().isEmpty();
245                                Subscription<C> subscription = new Subscription<C>(ClientSideManagedConnection.this, path, response.getComment(), dispatcher);
246                                log.debug("registered on event: " + subscription);
247                                synchronized (subscriptions) {
248                                        subscriptions.put(subscription.getRegisteredPath(), subscription);
249                                }
250                                subscription.setEventCallback(callback.subscribed(subscription));
251                                if (subscription.getEventCallback() == null) {
252                                        log.info("subscription for " + path + " aborted");
253                                        subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription"));
254                                }
255                        }
256                });
257        }
258
259        private void sendQueryVersion(final int version, final Future<Void> future) {
260                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
261                        @Override
262                        protected void processOk(Response response) {
263                                protocolVersion = version;
264                                if (version < requestedVersion) {
265                                        /** it is an implicit loop here*/
266                                        sendQueryVersion(version + 1, future);
267                                        return;
268                                }
269                                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
270
271                                        @Override
272                                        protected void processOk(Response response) {
273                                                requestIdEnabled = true;
274                                                future.pass(null);
275                                        }
276                                });
277
278                        }
279                });
280        }
281
282        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
283                if (id == null) {
284                        if (requestIdEnabled) {
285                                return null;
286                        }
287                        SentQuery<?> result = currentlySentQuery;
288                        if (remove) {
289                                currentlySentQuery = null;
290                                notifyAll();
291                        }
292                        return result;
293                }
294                if (queryMap.containsKey(id)) {
295                        SentQuery<?> result = queryMap.get(id);
296                        if (remove) {
297                                queryMap.remove(id);
298                        }
299                        return result;
300                }
301                return null;
302        }
303
304        private int nextQueryId = 0;
305
306        protected void processMessage(InboundMessage inboundMessage) {
307                if (inboundMessage == null) {
308                        log.error("failed to use any inbound message");
309                        return;
310                }
311
312                String line;
313                while (!(line = getLine()).startsWith("eof")) {
314                        // log.debug("line: " + line);
315                        inboundMessage.addLine(line);
316                }
317                inboundMessage.eof();
318        }
319
320        protected void processEvent(String rest) {
321                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
322                if (!matcher.matches()) {
323                        log.error("invalid event line: " + rest);
324                        return;
325                }
326                Subscription<?> subscription = subscriptions.get(matcher.group(1));
327                if (subscription == null) {
328                        log.error("non subscribed event: " + matcher.group(1));
329                        return;
330                }
331                EventFire event = new EventFire(subscription);
332                event.startFile(null);
333                processMessage(event);
334        }
335
336        protected void processMessageStartingWith(String line) {
337                try {
338                        Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line);
339                        if (command.first.equals("event")) {
340                                processEvent(command.second.toString());
341                                return;
342                        }
343                        Pair<Integer, CharSequence> rest = takeRequestId(command.second);
344
345                        if (command.first.equals("file")) {
346                                SentQuery<?> sentQuery = fetchQuery(rest.first, false);
347                                sentQuery.startFile(rest.second.toString());
348                                processMessage(sentQuery);
349                                return;
350                        }
351
352                        SentQuery<?> sentQuery = fetchQuery(rest.first, true);
353                        if (sentQuery == null) {
354                                return;
355                        }
356                        log.debug("parsing response for request " + sentQuery);
357
358                        sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
359                } catch (FramsticksException e) {
360                        throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);
361                }
362        }
363
364        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
365
366                @Override
367                public void handle(FramsticksException exception) {
368                        interrupt();
369                        // finish();
370                }
371        };
372
373        @Override
374        protected void receiverThreadRoutine() {
375                startClientConnection(this);
376
377                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
378
379                        @Override
380                        protected void result(Void result) {
381                                synchronized (applicationRequestsBuffer) {
382                                        isHandshakeDone = true;
383                                        for (Runnable r : applicationRequestsBuffer) {
384                                                r.run();
385                                        }
386                                        applicationRequestsBuffer.clear();
387                                }
388                        }
389                });
390
391                processInputBatchesUntilClosed();
392        }
393
394        protected void processNextInputBatch() {
395                processMessageStartingWith(getLine());
396        }
397
398}
Note: See TracBrowser for help on using the repository browser.