source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 100

Last change on this file since 100 was 100, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • add <include/> to configuration
  • add side notes to tree
    • used to store arbitrary information alongside the tree structure
  • migrate to log4j2
    • supports lazy string evaluation of passed arguments
  • improve GUI tree
    • it stays in synchronization with actual state (even in high load test scenario)
  • improve panel management in GUI
  • make loading objects in GUI more lazy
  • offload parsing to connection receiver thread
    • info parsing
    • first step of objects parsing
  • fix connection parsing bug (eof in long values)
  • support zero-arguments procedure in table view

CHANGELOG:
Implement procedure calls from table view.

Refactorization around procedures in tables.

Add table editor for buttons.

Render buttons in the the list view.

Further improve Columns.

Add Column class for TableModel?.

Accept also non-arguments ProcedureParams? in tableView.

Increase maximal TextAreaControl? size.

Add tooltip to ProcedureControl?.

Fix bug of interpreting eofs in long values by connection reader.

Further rework connection parsing.

Simplify client connection processing.

Test ListChange? modification.

Test ListChange? events with java server.

Add TestChild?.

Fix bug with fast deregistering when connecting to running server.

Another minor refactorization in TreeOperations?.

Fix bug in SimpleAbstractAccess? loading routine.

Another minor improvement.

Minor change.

Make reading of List objects two-phase.

Another minor change.

Dispatch parsing into receiver thread.

Another step.

Enclose passing value in ObjectParam? case in closure.

Minor step.

Minor change on way to offload parsing.

Temporarily comment out single ValueParam? get.

It will be generalized to multi ValueParam?.

Process info in receiver thread.

Add DispatchingExceptionHandler?.

Make waits in browser test longer.

Use FETCHED_MARK.

It is honored in GUI, where it used to decide whether to get values

after user action.

It is set in standard algorithm for processing fetched values.

Add remove operation to side notes.

Make loading more lazy.

Improve loading policy.

On node choose load itself, on node expansion, load children.

Minor improvement.

Fix bug with panel interleaving.

Minor improvements.

Improve panel management.

More cleaning around panels.

Reorganize panels.

Further improve tree.

Fix bug in TreeModel?.

Remove children from TreeNode?.

Implement TreeNode? hashCode and equals.

Make TreeNode? delegate equals and hashcode to internal reference.

Move listeners from TreeNode? to side notes.

Store path.textual as a side note.

Side note params instead of accesses for objects.

More refactorizations.

In TreeNode? bindAccess based on side notes.

Minor step.

Hide createAccess.

Rename AccessInterface? to Access.

Minor changes.

Several improvements in high load scenarios.

Change semantics of ArrayListAccess?.set(index, null);

It now removes the element, making list shorter
(it was set to null before).

Add path remove handler.

Handle exceptions in Connection.

Update .gitignore

Configure logging to file.

Move registration to TreeModel?.

Further refactorization.

Minor refactorization.

Minor improvements.

Use specialized event also for Modify action of ListChange?.

Use remove events.

Use the insertion events for tree.

Further improve tree refreshing.

Further improve reacting on events in GUI.

Fix problem with not adding objects on addition list change.

Migrate to log4j lazy String construction interface.

Migrate imports to log4j2.

Drop dependency on adapter to version 1.2.

Switch log4j implementation to log4j2.

Add dirty mark to the NodeAtFrame?.

Make selecting in AccessInterfaces? type safe.

Ignore containers size settings in Model and Genotype.

Use tree side notes to remember local changes and panels.

Add sideNotes to tree.

They will be used to store various accompanying information
right in the tree.

Use ReferenceIdentityMap? from apache in TreeNode?.

It suits the need perfectly (weak semantics on both key and value).

Make ArrayListParam? do not react size changes.

Guard in TableModel? before not yet loaded objects.

Add <include/> clause and AutoInjector?.

Extract common columns configuration to separate xml,
that can be included by other configurations.

File size: 11.4 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.ProtocolRequest;
6import com.framsticks.communication.queries.RegisterRequest;
7import com.framsticks.communication.queries.UseRequest;
8import com.framsticks.communication.queries.VersionRequest;
9import com.framsticks.core.Path;
10import com.framsticks.params.ListSource;
11import com.framsticks.util.*;
12import com.framsticks.util.dispatching.AtOnceDispatcher;
13import com.framsticks.util.dispatching.Dispatcher;
14import com.framsticks.util.dispatching.Dispatching;
15import com.framsticks.util.dispatching.ExceptionResultHandler;
16import com.framsticks.util.dispatching.Future;
17import com.framsticks.util.dispatching.FutureHandler;
18import com.framsticks.util.dispatching.JoinableState;
19import com.framsticks.util.lang.Casting;
20import com.framsticks.util.lang.Pair;
21import com.framsticks.util.lang.Strings;
22import com.framsticks.params.EventListener;
23
24import org.apache.logging.log4j.Logger;
25import org.apache.logging.log4j.LogManager;
26
27import java.util.*;
28import java.util.regex.Matcher;
29
30import javax.annotation.Nonnull;
31import javax.annotation.Nullable;
32
33import com.framsticks.util.dispatching.RunAt;
34
35/**
36 * @author Piotr Sniegowski
37 */
38public class ClientSideManagedConnection extends ManagedConnection {
39
40        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
41
42        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
43        private boolean isHandshakeDone = false;
44
45
46        /**
47         * @return the requestedVersion
48         */
49        public int getRequestedVersion() {
50                return requestedVersion;
51        }
52
53        /**
54         * @param requestedVersion the requestedVersion to set
55         */
56        public void setRequestedVersion(int requestedVersion) {
57                this.requestedVersion = requestedVersion;
58        }
59
60        protected int requestedVersion = 4;
61
62        public ClientSideManagedConnection() {
63                setDescription("client connection");
64                protocolVersion = -1;
65        }
66
67
68
69        protected List<String> readFileContent() {
70                List<String> content = new LinkedList<String>();
71                String line;
72                boolean longValue = false;
73                while (true) {
74                        line = getLine();
75                        if (longValue) {
76                                if (line.endsWith("~") && !line.endsWith("\\~")) {
77                                        longValue = false;
78                                }
79                        } else {
80                                if (line.equals("eof")) {
81                                        break;
82                                }
83                                if (line.endsWith(":~")) {
84                                        longValue = true;
85                                }
86                        }
87                        content.add(line);
88                }
89                return content;
90        }
91
92        private static class SentQuery<C> {
93
94                Request request;
95                ClientSideResponseFuture callback;
96                Dispatcher<C> dispatcher;
97                protected final List<File> files = new ArrayList<File>();
98
99                public List<File> getFiles() {
100                        return files;
101                }
102
103                @Override
104                public String toString() {
105                        return request.toString();
106                }
107
108                public void dispatchResponseProcess(final Response response) {
109                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
110                                @Override
111                                protected void runAt() {
112                                        callback.pass(response);
113                                }
114                        });
115                }
116        }
117
118        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
119
120        private SentQuery<?> currentlySentQuery;
121
122        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
123                //TODO RunAt
124                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
125        }
126
127
128
129        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
130                synchronized (applicationRequestsBuffer) {
131                        if (!isHandshakeDone) {
132                                applicationRequestsBuffer.add(new Runnable() {
133                                        @Override
134                                        public void run() {
135                                                sendImplementation(request, dispatcher, callback);
136                                        }
137                                });
138                                return;
139                        }
140                }
141                sendImplementation(request, dispatcher, callback);
142        }
143
144        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
145                callback.setRequest(request);
146
147                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
148                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
149                }
150
151                final SentQuery<C> sentQuery = new SentQuery<C>();
152                sentQuery.request = request;
153                sentQuery.callback = callback;
154                sentQuery.dispatcher = dispatcher;
155
156                senderThread.dispatch(new RunAt<Connection>(callback) {
157                        @Override
158                        protected void runAt() {
159                                Integer id;
160                                synchronized (ClientSideManagedConnection.this) {
161
162                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
163                                                try {
164                                                        ClientSideManagedConnection.this.wait();
165                                                } catch (InterruptedException ignored) {
166                                                        break;
167                                                }
168                                        }
169                                        if (requestIdEnabled) {
170                                                queryMap.put(nextQueryId, sentQuery);
171                                                id = nextQueryId++;
172                                        } else {
173                                                currentlySentQuery = sentQuery;
174                                                id = null;
175                                        }
176                                }
177                                String command = sentQuery.request.getCommand();
178                                StringBuilder message = new StringBuilder();
179                                message.append(command);
180                                if (id != null) {
181                                        message.append(" ").append(id);
182                                }
183                                message.append(" ");
184                                sentQuery.request.construct(message);
185                                String out = message.toString();
186
187                                putLine(out);
188                                flushOut();
189                                log.debug("sending query: {}", out);
190
191                        }
192                });
193                /*
194                synchronized (this) {
195                        log.debug("queueing query: {}", query);
196                        queryQueue.offer(sentQuery);
197                        notifyAll();
198                }
199                 */
200        }
201
202        @Override
203        public String toString() {
204                return "client connection " + address;
205        }
206
207
208        private void sendQueryVersion(final int version, final Future<Void> future) {
209                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
210                        @Override
211                        protected void processOk(Response response) {
212                                protocolVersion = version;
213                                if (version < requestedVersion) {
214                                        /** it is an implicit loop here*/
215                                        sendQueryVersion(version + 1, future);
216                                        return;
217                                }
218                                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
219
220                                        @Override
221                                        protected void processOk(Response response) {
222                                                requestIdEnabled = true;
223                                                future.pass(null);
224                                        }
225                                });
226
227                        }
228                });
229        }
230
231        private synchronized @Nonnull SentQuery<?> fetchQuery(@Nullable Integer id, boolean remove) {
232                try {
233                        if (id == null) {
234                                if (requestIdEnabled) {
235                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
236                                }
237                                SentQuery<?> result = currentlySentQuery;
238                                if (remove) {
239                                        currentlySentQuery = null;
240                                        notifyAll();
241                                }
242                                return result;
243                        }
244
245                        if (!queryMap.containsKey(id)) {
246                                throw new FramsticksException().msg("id is unknown").arg("id", id);
247                        }
248
249                        SentQuery<?> result = queryMap.get(id);
250                        if (remove) {
251                                queryMap.remove(id);
252                        }
253                        return result;
254
255                } catch (FramsticksException e) {
256                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
257                }
258        }
259
260        private int nextQueryId = 0;
261
262        protected void processEvent(String rest) {
263                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
264                if (!matcher.matches()) {
265                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
266                }
267                String fileLine = getLine();
268                if (!fileLine.equals("file")) {
269                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
270                }
271                String eventObjectPath = Request.takeGroup(rest, matcher, 1).toString();
272                String eventCalleePath = Request.takeGroup(rest, matcher, 2).toString();
273                final File file = new File("", new ListSource(readFileContent()));
274                log.debug("firing event {}", eventObjectPath);
275                EventListener<File> listener;
276                synchronized (registeredListeners) {
277                        listener = registeredListeners.get(eventObjectPath);
278                }
279                if (listener  == null) {
280                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
281                }
282                listener.action(file);
283        }
284
285        protected void processFile(Pair<Integer, CharSequence> rest) {
286                final SentQuery<?> sentQuery = fetchQuery(rest.first, false);
287
288                String currentFilePath = rest.second.toString();
289                if (!Strings.notEmpty(currentFilePath)) {
290                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
291                }
292
293                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
294
295        }
296
297        protected void processMessageStartingWith(final String header) {
298                try {
299                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
300                        if (command == null) {
301                                throw new FramsticksException().msg("failed to parse command");
302                        }
303                        final CharSequence keyword = command.first;
304                        if (keyword.equals("event")) {
305                                processEvent(command.second.toString());
306                                return;
307                        }
308
309                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
310                        if (rest == null) {
311                                throw new FramsticksException().msg("failed to parse optional id and remainder");
312                        }
313
314                        if (keyword.equals("file")) {
315                                processFile(rest);
316                                return;
317                        }
318                        if (keyword.equals("ok") || keyword.equals("error")) {
319
320                                final SentQuery<?> sentQuery = fetchQuery(rest.first, true);
321
322                                log.debug("parsing response for request {}", sentQuery);
323
324                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
325                                return;
326                        }
327
328                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
329                } catch (FramsticksException e) {
330                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
331                }
332        }
333
334        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
335
336                @Override
337                public void handle(FramsticksException exception) {
338                        interrupt();
339                        // finish();
340                }
341        };
342
343        @Override
344        protected void receiverThreadRoutine() {
345                startClientConnection(this);
346
347                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
348
349                        @Override
350                        protected void result(Void result) {
351                                synchronized (applicationRequestsBuffer) {
352                                        isHandshakeDone = true;
353                                        for (Runnable r : applicationRequestsBuffer) {
354                                                r.run();
355                                        }
356                                        applicationRequestsBuffer.clear();
357                                }
358                        }
359                });
360
361                processInputBatchesUntilClosed();
362        }
363
364        protected void processNextInputBatch() {
365                processMessageStartingWith(getLine());
366        }
367
368        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
369
370        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
371                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
372                        @Override
373                        protected void processOk(Response response) {
374                                synchronized (registeredListeners) {
375                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
376                                }
377                                future.pass(null);
378                        }
379                });
380        }
381
382        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
383                String eventPath = null;
384                synchronized (registeredListeners) {
385                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
386                                if (e.getValue() == listener) {
387                                        eventPath = e.getKey();
388                                        break;
389                                }
390                        }
391                }
392                if (eventPath == null) {
393                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
394                        return;
395                }
396
397                final String finalEventPath = eventPath;
398                                //TODO add arguments to the exception
399                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
400
401                        @Override
402                        protected void processOk(Response response) {
403                                synchronized (registeredListeners) {
404                                        registeredListeners.remove(finalEventPath);
405                                }
406                                future.pass(null);
407                        }
408                });
409        }
410}
Note: See TracBrowser for help on using the repository browser.