source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 99

Last change on this file since 99 was 99, checked in by psniegowski, 11 years ago

HIGHLIGTS:

  • complete events implementation
  • add CLI in Java Framsticks server
  • add automatic registration for events in GUI
  • improve objects fetching (object are never overwritten with new instances)
  • properly react for ListChange? events
  • add ListPanel? with table view
    • columns to be shown may be statically specified in configuration
    • currently modyfying data through tables is not available
  • improve maven configuration
    • configuration file may be specified without touching pom.xml

CHANGELOG:
Extract constants from Flags into ParamFlags? and SetStateFlags?.

Extract flags I/O to FlagsUtils? class.

Configured maven to exec given resource configuration.

For example:
mvn exec:exec -Dframsticks.config=/configs/managed-console.xml

Cleanup pom.xml

Rename ObjectTree? to LocalTree? (also make LocalTree? and RemoteTree? final).

Minor change.

Add maximum number of columns in ListPanelProvider?.

Improve ColumnsConfig? interpretation.

Automatically fill FramsClass?.name if trying to construct empty.

Improve identitifer case mangling in XmlLoader?.

Introduce configurable ColumnsConfig?.

Draft working version of ListPanel?.

Table is being shown (although empty).

More improvements to table building.

Move some functionality from Frame to TreeModel?.

Move tree classes in gui to separate package.

Remove old table related classes.

Add draft implementation of TableModel?.

Redirect ParamBuilder?.forAccess to AccessInterface?.

Optimize ParamBuilder?.forAccess()

Do not clear list when loading.

Do not load fetched values directly.

Implement different AccessInterface? copying policy.

Optimize fetching values routine.

Remove Mode enum (work out get semantics).

Some improvements to ListChange? handling.

Improve UniqueListAccess?.

Add reaction for ListChanges? in the TreeNode?.

EventListeners? are being added in the TreeNode?.

Listeners for ListParams? are now very naive (they download
whole list).

Automatially register on events in GUI.

Events are working in RemoteTree? and Server.

Move listeners to the ClientSideManagedConnection?.

Remove old classes responsible for event subscriptions.

Improve event reading.

Improve events handling at server side.

Add register attribute in FramsClassAnnotation?
to automatically also register other classes.

Registering events works.

Setup for remote listeners registration.

More improvements.

Minor changes.

Add rootTree to the ClientAtServer?.

Moving CLI to the ClientAtServer?.

Fix bug: use Void.TYPE instead of Void.class

More development around CLI.

  • Improve Path resolving.

Add synthetic root to ObjectTree?.

It is needed to allow sybling for the original root
that would containg CLI.

Some work with registering events in RemoteTree?.

Draft implementation of listener registering in RemoteTree?.

Support events registration in the ObjectTree?.

Add events support to ReflectionAccess?.

EventParam? is recognized by ParamCandidate?.

Prepare interface for Events across project.

Add EventListener? and API for listeners in Tree.

File size: 11.3 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.ProtocolRequest;
6import com.framsticks.communication.queries.RegisterRequest;
7import com.framsticks.communication.queries.UseRequest;
8import com.framsticks.communication.queries.VersionRequest;
9import com.framsticks.core.Path;
10import com.framsticks.params.ListSource;
11import com.framsticks.util.*;
12import com.framsticks.util.dispatching.AtOnceDispatcher;
13import com.framsticks.util.dispatching.Dispatcher;
14import com.framsticks.util.dispatching.Dispatching;
15import com.framsticks.util.dispatching.ExceptionResultHandler;
16import com.framsticks.util.dispatching.Future;
17import com.framsticks.util.dispatching.FutureHandler;
18import com.framsticks.util.dispatching.JoinableState;
19import com.framsticks.util.lang.Pair;
20import com.framsticks.util.lang.Strings;
21import com.framsticks.params.EventListener;
22
23import org.apache.log4j.Logger;
24
25import java.util.*;
26import java.util.regex.Matcher;
27import com.framsticks.util.dispatching.RunAt;
28
29/**
30 * @author Piotr Sniegowski
31 */
32public class ClientSideManagedConnection extends ManagedConnection {
33
34        private final static Logger log = Logger.getLogger(ClientSideManagedConnection.class);
35
36        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
37        private boolean isHandshakeDone = false;
38
39
40        /**
41         * @return the requestedVersion
42         */
43        public int getRequestedVersion() {
44                return requestedVersion;
45        }
46
47        /**
48         * @param requestedVersion the requestedVersion to set
49         */
50        public void setRequestedVersion(int requestedVersion) {
51                this.requestedVersion = requestedVersion;
52        }
53
54        protected int requestedVersion = 4;
55
56        public ClientSideManagedConnection() {
57                setDescription("client connection");
58                protocolVersion = -1;
59        }
60
61
62        private static abstract class InboundMessage {
63                protected String currentFilePath;
64                protected List<String> currentFileContent;
65                protected final List<File> files = new ArrayList<File>();
66
67                public abstract void eof();
68
69                protected void initCurrentFile(String path) {
70                        currentFileContent = new LinkedList<String>();
71                        currentFilePath = path;
72                }
73
74                protected void finishCurrentFile() {
75                        if (currentFileContent == null) {
76                                return;
77                        }
78                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
79                        currentFilePath = null;
80                        currentFileContent = null;
81                }
82
83                public abstract void startFile(String path);
84
85                public final void addLine(String line) {
86                        assert line != null;
87                        assert currentFileContent != null;
88                        currentFileContent.add(line);
89                }
90
91                public List<File> getFiles() {
92                        return files;
93                }
94        }
95
96        protected List<String> readFileContent() {
97                List<String> content = new LinkedList<String>();
98                String line;
99                while (!(line = getLine()).startsWith("eof")) {
100                        content.add(line);
101                }
102                return content;
103        }
104
105        private static class SentQuery<C> extends InboundMessage {
106                Request request;
107                ClientSideResponseFuture callback;
108                Dispatcher<C> dispatcher;
109
110                public void startFile(String path) {
111                        finishCurrentFile();
112                        if (!Strings.notEmpty(path)) {
113                                assert request instanceof ApplicationRequest;
114                                path = ((ApplicationRequest) request).getPath();
115                        }
116                        Strings.assureNotEmpty(path);
117                        initCurrentFile(path);
118                }
119
120                public void eof() {
121                        assert Strings.notEmpty(currentFilePath);
122                        finishCurrentFile();
123                        //no-operation
124                }
125
126                @Override
127                public String toString() {
128                        return request.toString();
129                }
130
131                public void dispatchResponseProcess(final Response response) {
132                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
133                                @Override
134                                protected void runAt() {
135                                        callback.pass(response);
136                                }
137                        });
138                }
139        }
140
141        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
142
143        private SentQuery<?> currentlySentQuery;
144
145        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
146                //TODO RunAt
147                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
148        }
149
150
151
152        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
153                synchronized (applicationRequestsBuffer) {
154                        if (!isHandshakeDone) {
155                                applicationRequestsBuffer.add(new Runnable() {
156                                        @Override
157                                        public void run() {
158                                                sendImplementation(request, dispatcher, callback);
159                                        }
160                                });
161                                return;
162                        }
163                }
164                sendImplementation(request, dispatcher, callback);
165        }
166
167        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
168                callback.setRequest(request);
169
170                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
171                        log.fatal("not connected");
172                        return;
173                }
174
175                final SentQuery<C> sentQuery = new SentQuery<C>();
176                sentQuery.request = request;
177                sentQuery.callback = callback;
178                sentQuery.dispatcher = dispatcher;
179
180                senderThread.dispatch(new RunAt<Connection>(callback) {
181                        @Override
182                        protected void runAt() {
183                                Integer id;
184                                synchronized (ClientSideManagedConnection.this) {
185
186                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
187                                                try {
188                                                        ClientSideManagedConnection.this.wait();
189                                                } catch (InterruptedException ignored) {
190                                                        break;
191                                                }
192                                        }
193                                        if (requestIdEnabled) {
194                                                queryMap.put(nextQueryId, sentQuery);
195                                                id = nextQueryId++;
196                                        } else {
197                                                currentlySentQuery = sentQuery;
198                                                id = null;
199                                        }
200                                }
201                                String command = sentQuery.request.getCommand();
202                                StringBuilder message = new StringBuilder();
203                                message.append(command);
204                                if (id != null) {
205                                        message.append(" ").append(id);
206                                }
207                                message.append(" ");
208                                sentQuery.request.construct(message);
209                                String out = message.toString();
210
211                                putLine(out);
212                                flushOut();
213                                log.debug("sending query: " + out);
214
215                        }
216                });
217                /*
218                synchronized (this) {
219                        log.debug("queueing query: " + query);
220                        queryQueue.offer(sentQuery);
221                        notifyAll();
222                }
223                 */
224        }
225
226        @Override
227        public String toString() {
228                return "client connection " + address;
229        }
230
231
232        private void sendQueryVersion(final int version, final Future<Void> future) {
233                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
234                        @Override
235                        protected void processOk(Response response) {
236                                protocolVersion = version;
237                                if (version < requestedVersion) {
238                                        /** it is an implicit loop here*/
239                                        sendQueryVersion(version + 1, future);
240                                        return;
241                                }
242                                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
243
244                                        @Override
245                                        protected void processOk(Response response) {
246                                                requestIdEnabled = true;
247                                                future.pass(null);
248                                        }
249                                });
250
251                        }
252                });
253        }
254
255        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
256                if (id == null) {
257                        if (requestIdEnabled) {
258                                return null;
259                        }
260                        SentQuery<?> result = currentlySentQuery;
261                        if (remove) {
262                                currentlySentQuery = null;
263                                notifyAll();
264                        }
265                        return result;
266                }
267                if (queryMap.containsKey(id)) {
268                        SentQuery<?> result = queryMap.get(id);
269                        if (remove) {
270                                queryMap.remove(id);
271                        }
272                        return result;
273                }
274                return null;
275        }
276
277        private int nextQueryId = 0;
278
279        protected void processMessage(InboundMessage inboundMessage) {
280                if (inboundMessage == null) {
281                        log.error("failed to use any inbound message");
282                        return;
283                }
284
285                String line;
286                while (!(line = getLine()).startsWith("eof")) {
287                        // log.debug("line: " + line);
288                        inboundMessage.addLine(line);
289                }
290                inboundMessage.eof();
291        }
292
293        protected void processEvent(String rest) {
294                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
295                if (!matcher.matches()) {
296                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
297                }
298                String fileLine = getLine();
299                if (!fileLine.equals("file")) {
300                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
301                }
302                String eventObjectPath = Request.takeGroup(rest, matcher, 1).toString();
303                String eventCalleePath = Request.takeGroup(rest, matcher, 2).toString();
304                final File file = new File("", new ListSource(readFileContent()));
305                log.debug("firing event " + eventObjectPath);
306                EventListener<File> listener;
307                synchronized (registeredListeners) {
308                        listener = registeredListeners.get(eventObjectPath);
309                }
310                if (listener  == null) {
311                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
312                }
313                listener.action(file);
314        }
315
316        protected void processMessageStartingWith(String line) {
317                try {
318                        Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line);
319                        if (command.first.equals("event")) {
320                                processEvent(command.second.toString());
321                                return;
322                        }
323                        Pair<Integer, CharSequence> rest = takeRequestId(command.second);
324
325                        if (command.first.equals("file")) {
326                                SentQuery<?> sentQuery = fetchQuery(rest.first, false);
327                                sentQuery.startFile(rest.second.toString());
328                                processMessage(sentQuery);
329                                return;
330                        }
331
332                        SentQuery<?> sentQuery = fetchQuery(rest.first, true);
333                        if (sentQuery == null) {
334                                return;
335                        }
336                        log.debug("parsing response for request " + sentQuery);
337
338                        sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
339                } catch (FramsticksException e) {
340                        throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);
341                }
342        }
343
344        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
345
346                @Override
347                public void handle(FramsticksException exception) {
348                        interrupt();
349                        // finish();
350                }
351        };
352
353        @Override
354        protected void receiverThreadRoutine() {
355                startClientConnection(this);
356
357                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
358
359                        @Override
360                        protected void result(Void result) {
361                                synchronized (applicationRequestsBuffer) {
362                                        isHandshakeDone = true;
363                                        for (Runnable r : applicationRequestsBuffer) {
364                                                r.run();
365                                        }
366                                        applicationRequestsBuffer.clear();
367                                }
368                        }
369                });
370
371                processInputBatchesUntilClosed();
372        }
373
374        protected void processNextInputBatch() {
375                processMessageStartingWith(getLine());
376        }
377
378        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
379
380        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
381                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
382                        @Override
383                        protected void processOk(Response response) {
384                                synchronized (registeredListeners) {
385                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
386                                }
387                                future.pass(null);
388                        }
389                });
390        }
391
392        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
393                String eventPath = null;
394                synchronized (registeredListeners) {
395                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
396                                if (e.getValue() == listener) {
397                                        eventPath = e.getKey();
398                                        break;
399                                }
400                        }
401                }
402                if (eventPath == null) {
403                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
404                        return;
405                }
406
407                final String finalEventPath = eventPath;
408                                //TODO add arguments to the exception
409                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
410
411                        @Override
412                        protected void processOk(Response response) {
413                                synchronized (registeredListeners) {
414                                        registeredListeners.remove(finalEventPath);
415                                }
416                                future.pass(null);
417                        }
418                });
419        }
420}
Note: See TracBrowser for help on using the repository browser.