source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 98

Last change on this file since 98 was 98, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

CHANGELOG:
Get data also on tree expansion.

Use nice framstick icon for empty nodes.

Update panel after reload if it is current.

Add shallow reload procedure.

Cut Gui prefix from several tree classes.

Bring back counter of GuiTreeNode?.

Use IdentityHashMap? were it is more appriopriate.

Remove TreeListener?.

Do not use TreeListener? in GUI.

Minor change.

Done migration to GuiTreeModel?.

BrowserTest? in that version always crashes frams.linux.

Move rendering implementation into GuiAbstractNode?.

Use hand-crafted list in GuiTreeNode?.

Generally, it would be a great place for WeakIdentityHashMap?
(but there is none in Java Collection Framework).

Remove superfluous logging.

Fix bug in GuiTreeNode?.

Use IdentityHashMap? instead of HashMap?.

Improve structure update.

Filter out invalid uids in UniqueListAccess?.

Improve TreeCellRenderer?.

Add filtering in TrackConsole?.

Improve TreeModel?.

More changes.

More improvements.

More changes.

Remove TreeNode?.

Support MetaNode? in the GuiTreeModel?.

Implement more in GuiTreeModel?.

Add CompositeParam? interface to FramsClass? and AccessInterface?.

Allow access by number to UniqueList?.

Add UidComparator?.

Use TreeMap? as a default accessee in unique list.

It keeps order of keys.

Introduce classes to use with new TreeModel?.

Another step.

Migrate from TreeNode? to Node in many places.

Remove some uses of TreeNode? as DefaultMutableTreeNode?.

Remove Path from TreeNode? interface.

Remove Path from TreeNode?.

Add Path recration from node feature.

Reworking TreeCellRenderer?.

Minor change of TreeOperations? interface.

Remove last methods from TreeNode?.

Another minor step.

Do not store reference to TreeAtFrame? in TreeNode?.

Add proxy exceptionHandler to StatusBar?.

Move panels management to TreeAtFrame?.

Store localChanges in the NodeAtFrame?.

More cleanup.

Move name computing to TreeCellRenderer?.

Move tooltip and icon computations to TreeCellRenderer?.

More dispatches removed.

Remove most dispatching from TreeNode?.

TreeNode? does not actually redispatch tasks.

Make Tree embedded in Browser use SwingDispatcher?.

Make lazy binding of Tree with Dispatcher.

Minor changes.

Organizational change in AbstractTree?.

Make AbstractTree? compose from Thread instead of inherit from it.

Make SwingDispatcher? and AtOnceDispatcher? Joinable compatible.

Add ListPanelProvider?.

Improve Controls readonly and enabled handling.

Properly pass ExceptionHandlers? in more places.

Make Tree.get accept ValueParam?.

  • This is to allow access number of list elements.

Remove not needed get redirection in ClientAtServer?.

Rename tryResolve to tryGet.

Unify tryResolveAndGet into tryResolve.

Remove resolveTop from Tree interface.

Make Tree.get accept Future<Path>.

Use get to implement resolveTop also in ObjectTree?.

Unify resolveTop and get in RemoteTree?.

Another minor step.

More minor changes in tree operations.

Minor organizational changes.

In RemoteTree? first fetch info for root.

Reworking resolving.

Minor changes.

Make ListAccess? return proxy iterators (instead of creating temporary collection).

Let AccessInterface? return Iterable<Param>.

Improve resolving.

More improvements.

First working completion in ManagedConsole?.

Rename resolve to resolveTop.

This reflects the actuall functionality.

Change semantic of tryResolve and tryResolveAndGet.

File size: 10.5 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.ProtocolRequest;
5import com.framsticks.communication.queries.RegistrationRequest;
6import com.framsticks.communication.queries.UseRequest;
7import com.framsticks.communication.queries.VersionRequest;
8import com.framsticks.communication.util.LoggingStateCallback;
9import com.framsticks.params.ListSource;
10import com.framsticks.util.*;
11import com.framsticks.util.dispatching.AtOnceDispatcher;
12import com.framsticks.util.dispatching.Dispatcher;
13import com.framsticks.util.dispatching.Dispatching;
14import com.framsticks.util.dispatching.ExceptionResultHandler;
15import com.framsticks.util.dispatching.Future;
16import com.framsticks.util.dispatching.FutureHandler;
17import com.framsticks.util.dispatching.JoinableState;
18import com.framsticks.util.lang.Pair;
19import com.framsticks.util.lang.Strings;
20
21import org.apache.log4j.Logger;
22
23import java.util.*;
24import java.util.regex.Matcher;
25import com.framsticks.util.dispatching.RunAt;
26
27/**
28 * @author Piotr Sniegowski
29 */
30public class ClientSideManagedConnection extends ManagedConnection {
31
32        private final static Logger log = Logger.getLogger(ClientSideManagedConnection.class);
33
34        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
35
36        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
37        private boolean isHandshakeDone = false;
38
39        /**
40         * @return the requestedVersion
41         */
42        public int getRequestedVersion() {
43                return requestedVersion;
44        }
45
46        /**
47         * @param requestedVersion the requestedVersion to set
48         */
49        public void setRequestedVersion(int requestedVersion) {
50                this.requestedVersion = requestedVersion;
51        }
52
53        protected int requestedVersion = 4;
54
55        public ClientSideManagedConnection() {
56                setDescription("client connection");
57                protocolVersion = -1;
58        }
59
60
61        private static abstract class InboundMessage {
62                protected String currentFilePath;
63                protected List<String> currentFileContent;
64                protected final List<File> files = new ArrayList<File>();
65
66                public abstract void eof();
67
68                protected void initCurrentFile(String path) {
69                        currentFileContent = new LinkedList<String>();
70                        currentFilePath = path;
71                }
72
73                protected void finishCurrentFile() {
74                        if (currentFileContent == null) {
75                                return;
76                        }
77                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
78                        currentFilePath = null;
79                        currentFileContent = null;
80                }
81
82                public abstract void startFile(String path);
83
84                public final void addLine(String line) {
85                        assert line != null;
86                        assert currentFileContent != null;
87                        currentFileContent.add(line);
88                }
89
90                public List<File> getFiles() {
91                        return files;
92                }
93        }
94
95        private static class EventFire extends InboundMessage {
96                public final Subscription<?> subscription;
97
98                private EventFire(Subscription<?> subscription) {
99                        this.subscription = subscription;
100                }
101
102                public void startFile(String path) {
103                        assert path == null;
104                        initCurrentFile(null);
105                }
106
107                @Override
108                public void eof() {
109                        finishCurrentFile();
110
111                        subscription.dispatchCall(getFiles());
112                }
113        }
114
115        private static class SentQuery<C> extends InboundMessage {
116                Request request;
117                ClientSideResponseFuture callback;
118                Dispatcher<C> dispatcher;
119
120                public void startFile(String path) {
121                        finishCurrentFile();
122                        if (!Strings.notEmpty(path)) {
123                                assert request instanceof ApplicationRequest;
124                                path = ((ApplicationRequest) request).getPath();
125                        }
126                        Strings.assureNotEmpty(path);
127                        initCurrentFile(path);
128                }
129
130                public void eof() {
131                        assert Strings.notEmpty(currentFilePath);
132                        finishCurrentFile();
133                        //no-operation
134                }
135
136                @Override
137                public String toString() {
138                        return request.toString();
139                }
140
141                public void dispatchResponseProcess(final Response response) {
142                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
143                                @Override
144                                protected void runAt() {
145                                        callback.pass(response);
146                                }
147                        });
148                }
149        }
150
151        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
152
153        private SentQuery<?> currentlySentQuery;
154
155        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
156                //TODO RunAt
157                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
158        }
159
160
161
162        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
163                synchronized (applicationRequestsBuffer) {
164                        if (!isHandshakeDone) {
165                                applicationRequestsBuffer.add(new Runnable() {
166                                        @Override
167                                        public void run() {
168                                                sendImplementation(request, dispatcher, callback);
169                                        }
170                                });
171                                return;
172                        }
173                }
174                sendImplementation(request, dispatcher, callback);
175        }
176
177        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
178                callback.setRequest(request);
179
180                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
181                        log.fatal("not connected");
182                        return;
183                }
184
185                final SentQuery<C> sentQuery = new SentQuery<C>();
186                sentQuery.request = request;
187                sentQuery.callback = callback;
188                sentQuery.dispatcher = dispatcher;
189
190                senderThread.dispatch(new RunAt<Connection>(callback) {
191                        @Override
192                        protected void runAt() {
193                                Integer id;
194                                synchronized (ClientSideManagedConnection.this) {
195
196                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
197                                                try {
198                                                        ClientSideManagedConnection.this.wait();
199                                                } catch (InterruptedException ignored) {
200                                                        break;
201                                                }
202                                        }
203                                        if (requestIdEnabled) {
204                                                queryMap.put(nextQueryId, sentQuery);
205                                                id = nextQueryId++;
206                                        } else {
207                                                currentlySentQuery = sentQuery;
208                                                id = null;
209                                        }
210                                }
211                                String command = sentQuery.request.getCommand();
212                                StringBuilder message = new StringBuilder();
213                                message.append(command);
214                                if (id != null) {
215                                        message.append(" ").append(id);
216                                }
217                                message.append(" ");
218                                sentQuery.request.construct(message);
219                                String out = message.toString();
220
221                                putLine(out);
222                                flushOut();
223                                log.debug("sending query: " + out);
224
225                        }
226                });
227                /*
228                synchronized (this) {
229                        log.debug("queueing query: " + query);
230                        queryQueue.offer(sentQuery);
231                        notifyAll();
232                }
233                 */
234        }
235
236        @Override
237        public String toString() {
238                return "client connection " + address;
239        }
240
241        public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
242                send(new RegistrationRequest().path(path), AtOnceDispatcher.getInstance(), new ClientSideResponseFuture(callback) {
243                        @Override
244                        protected void processOk(Response response) {
245                                assert response.getFiles().isEmpty();
246                                Subscription<C> subscription = new Subscription<C>(ClientSideManagedConnection.this, path, response.getComment(), dispatcher);
247                                log.debug("registered on event: " + subscription);
248                                synchronized (subscriptions) {
249                                        subscriptions.put(subscription.getRegisteredPath(), subscription);
250                                }
251                                subscription.setEventCallback(callback.subscribed(subscription));
252                                if (subscription.getEventCallback() == null) {
253                                        log.info("subscription for " + path + " aborted");
254                                        subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription"));
255                                }
256                        }
257                });
258        }
259
260        private void sendQueryVersion(final int version, final Future<Void> future) {
261                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
262                        @Override
263                        protected void processOk(Response response) {
264                                protocolVersion = version;
265                                if (version < requestedVersion) {
266                                        /** it is an implicit loop here*/
267                                        sendQueryVersion(version + 1, future);
268                                        return;
269                                }
270                                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
271
272                                        @Override
273                                        protected void processOk(Response response) {
274                                                requestIdEnabled = true;
275                                                future.pass(null);
276                                        }
277                                });
278
279                        }
280                });
281        }
282
283        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
284                if (id == null) {
285                        if (requestIdEnabled) {
286                                return null;
287                        }
288                        SentQuery<?> result = currentlySentQuery;
289                        if (remove) {
290                                currentlySentQuery = null;
291                                notifyAll();
292                        }
293                        return result;
294                }
295                if (queryMap.containsKey(id)) {
296                        SentQuery<?> result = queryMap.get(id);
297                        if (remove) {
298                                queryMap.remove(id);
299                        }
300                        return result;
301                }
302                return null;
303        }
304
305        private int nextQueryId = 0;
306
307        protected void processMessage(InboundMessage inboundMessage) {
308                if (inboundMessage == null) {
309                        log.error("failed to use any inbound message");
310                        return;
311                }
312
313                String line;
314                while (!(line = getLine()).startsWith("eof")) {
315                        // log.debug("line: " + line);
316                        inboundMessage.addLine(line);
317                }
318                inboundMessage.eof();
319        }
320
321        protected void processEvent(String rest) {
322                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
323                if (!matcher.matches()) {
324                        log.error("invalid event line: " + rest);
325                        return;
326                }
327                Subscription<?> subscription = subscriptions.get(matcher.group(1));
328                if (subscription == null) {
329                        log.error("non subscribed event: " + matcher.group(1));
330                        return;
331                }
332                EventFire event = new EventFire(subscription);
333                event.startFile(null);
334                processMessage(event);
335        }
336
337        protected void processMessageStartingWith(String line) {
338                try {
339                        Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line);
340                        if (command.first.equals("event")) {
341                                processEvent(command.second.toString());
342                                return;
343                        }
344                        Pair<Integer, CharSequence> rest = takeRequestId(command.second);
345
346                        if (command.first.equals("file")) {
347                                SentQuery<?> sentQuery = fetchQuery(rest.first, false);
348                                sentQuery.startFile(rest.second.toString());
349                                processMessage(sentQuery);
350                                return;
351                        }
352
353                        SentQuery<?> sentQuery = fetchQuery(rest.first, true);
354                        if (sentQuery == null) {
355                                return;
356                        }
357                        log.debug("parsing response for request " + sentQuery);
358
359                        sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
360                } catch (FramsticksException e) {
361                        throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);
362                }
363        }
364
365        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
366
367                @Override
368                public void handle(FramsticksException exception) {
369                        interrupt();
370                        // finish();
371                }
372        };
373
374        @Override
375        protected void receiverThreadRoutine() {
376                startClientConnection(this);
377
378                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
379
380                        @Override
381                        protected void result(Void result) {
382                                synchronized (applicationRequestsBuffer) {
383                                        isHandshakeDone = true;
384                                        for (Runnable r : applicationRequestsBuffer) {
385                                                r.run();
386                                        }
387                                        applicationRequestsBuffer.clear();
388                                }
389                        }
390                });
391
392                processInputBatchesUntilClosed();
393        }
394
395        protected void processNextInputBatch() {
396                processMessageStartingWith(getLine());
397        }
398
399}
Note: See TracBrowser for help on using the repository browser.