source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 101

Last change on this file since 101 was 101, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • improve tree side notes
  • improve GUI layout
  • add foldable list of occured events to EventControl?
  • improve automatic type conversion in proxy listeners
  • implement several Access functionalities as algorithms independent of Access type
  • introduce draft base classes for distributed experiments
  • automatically register dependant Java classes to FramsClass? registry
  • add testing prime experiment and configuration
  • simplify and improve task dispatching

CHANGELOG:
Improve task dispatching in RemoteTree?.

GUI no longer hangs on connection problems.

Make all dispatchers joinables.

Refactorize Thread dispatcher.

Remove Task and PeriodicTask?.

Use Java utilities in those situations.

Reworking tasks dispatching.

Fix bug in EventControl? listener dispatching.

Minor improvements.

Add testing configuration for ExternalProcess? in GUI.

More improvement to prime.

Support for USERREADONLY in GUI.

Add that flag to various params in Java classes.

Remove redundant register clauses from several FramsClassAnnotations?.

Automatically gather and register dependant classes.

Add configuration for prime.

Improve Simulator class.

Add prime.xml configuration.

Introduce draft Experiment and Simulator classes.

Add prime experiment tests.

Enclose typical map with listeners into SimpleUniqueList?.

Needfile works in GUI.

Improve needfile handling in Browser.

More improvement with NeedFile?.

Implementing needfile.

Update test.

Rename ChangeEvent? to TestChangeEvent?.

Automatic argument type search in RemoteTree? listeners.

MultiParamLoader? uses AccessProvider?. By default old implementation
enclosed in AccessStash? or Registry.

Minor changes.

Rename SourceInterface? to Source.

Also improve toString of File and ListSource?.

Remove unused SimpleSource? class.

Add clearing in HistoryControl?.

Show entries in table at EventControl?.

Improve EventControl?.

Add listeners registration to EventControl?.

Add foldable table to HistoryControl?.

Add control row to Procedure and Event controls.

Improve layout of controls.

Another minor change to gui layout.

Minor improvement in the SliderControl?.

Minor changes.

Move ReflectionAccess?.Backend to separate file.

It was to cluttered.

Cleanup in ReflectionAccess?.

Move setMin, setMax, setDef to AccessOperations?.

Extract loading operation into AccessOperations?.

Append Framsticks to name of UnsupportedOperationException?.

The java.lang.UnsupportedOperationException? was shadowing this class.

Rename params.Util to params.ParamsUtil?.

Several improvements.

Minor changes.

Implement revert functionality.

Improve local changes management.

Minor improvement.

Remove methods rendered superfluous after SideNoteKey? improvement.

Improve SideNoteKey?.

It is now generic type, so explicit type specification at
call site is no more needed.

Introduce SideNoteKey? interface.

Only Objects implementing that key may be used as side note keys.

Minor improvements.

Use strings instead of ValueControls? in several gui mappings.

File size: 13.3 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.NeedFile;
6import com.framsticks.communication.queries.NeedFileAcceptor;
7import com.framsticks.communication.queries.ProtocolRequest;
8import com.framsticks.communication.queries.RegisterRequest;
9import com.framsticks.communication.queries.UseRequest;
10import com.framsticks.communication.queries.VersionRequest;
11import com.framsticks.core.Path;
12import com.framsticks.params.ListSource;
13import com.framsticks.util.*;
14import com.framsticks.util.dispatching.AtOnceDispatcher;
15import com.framsticks.util.dispatching.Dispatcher;
16import com.framsticks.util.dispatching.Dispatching;
17import com.framsticks.util.dispatching.ExceptionResultHandler;
18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.FutureHandler;
20import com.framsticks.util.dispatching.JoinableState;
21import com.framsticks.util.lang.Casting;
22import com.framsticks.util.lang.Pair;
23import com.framsticks.util.lang.Strings;
24import com.framsticks.params.EventListener;
25
26import org.apache.logging.log4j.Logger;
27import org.apache.logging.log4j.LogManager;
28
29import java.util.*;
30import java.util.regex.Matcher;
31
32import javax.annotation.Nonnull;
33import javax.annotation.Nullable;
34
35import com.framsticks.util.dispatching.RunAt;
36
37/**
38 * @author Piotr Sniegowski
39 */
40public class ClientSideManagedConnection extends ManagedConnection {
41
42        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
43
44        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
45        private boolean isHandshakeDone = false;
46
47        protected NeedFileAcceptor needFileAcceptor;
48
49        /**
50         * @return the needFileAcceptor
51         */
52        public NeedFileAcceptor getNeedFileAcceptor() {
53                return needFileAcceptor;
54        }
55
56        /**
57         * @param needFileAcceptor the needFileAcceptor to set
58         */
59        public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) {
60                this.needFileAcceptor = needFileAcceptor;
61        }
62
63        /**
64         * @return the requestedVersion
65         */
66        public int getRequestedVersion() {
67                return requestedVersion;
68        }
69
70        /**
71         * @param requestedVersion the requestedVersion to set
72         */
73        public void setRequestedVersion(int requestedVersion) {
74                this.requestedVersion = requestedVersion;
75        }
76
77        protected int requestedVersion = 4;
78
79        public ClientSideManagedConnection() {
80                setDescription("client connection");
81                protocolVersion = -1;
82        }
83
84        protected List<String> readFileContent() {
85                List<String> content = new LinkedList<String>();
86                String line;
87                boolean longValue = false;
88                while (true) {
89                        line = getLine();
90                        if (longValue) {
91                                if (line.endsWith("~") && !line.endsWith("\\~")) {
92                                        longValue = false;
93                                }
94                        } else {
95                                if (line.equals("eof")) {
96                                        break;
97                                }
98                                if (line.endsWith(":~")) {
99                                        longValue = true;
100                                }
101                        }
102                        content.add(line);
103                }
104                return content;
105        }
106
107        private static class SentQuery<C> {
108
109                Request request;
110                ClientSideResponseFuture callback;
111                Dispatcher<C> dispatcher;
112                protected final List<File> files = new ArrayList<File>();
113
114                public List<File> getFiles() {
115                        return files;
116                }
117
118                @Override
119                public String toString() {
120                        return request.toString();
121                }
122
123                public void dispatchResponseProcess(final Response response) {
124                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
125                                @Override
126                                protected void runAt() {
127                                        callback.pass(response);
128                                }
129                        });
130                }
131        }
132
133        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
134                //TODO RunAt
135                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
136        }
137
138
139
140        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
141                synchronized (applicationRequestsBuffer) {
142                        if (!isHandshakeDone) {
143                                applicationRequestsBuffer.add(new Runnable() {
144                                        @Override
145                                        public void run() {
146                                                sendImplementation(request, dispatcher, callback);
147                                        }
148                                });
149                                return;
150                        }
151                }
152                sendImplementation(request, dispatcher, callback);
153        }
154
155        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
156                callback.setRequest(request);
157
158                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
159                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
160                }
161
162                final SentQuery<C> sentQuery = new SentQuery<C>();
163                sentQuery.request = request;
164                sentQuery.callback = callback;
165                sentQuery.dispatcher = dispatcher;
166
167
168                senderThread.dispatch(new RunAt<Connection>(callback) {
169                        @Override
170                        protected void runAt() {
171                                Integer id = sentQueries.put(null, sentQuery);
172
173                                String command = sentQuery.request.getCommand();
174                                StringBuilder message = new StringBuilder();
175                                message.append(command);
176                                if (id != null) {
177                                        message.append(" ").append(id);
178                                }
179                                message.append(" ");
180                                sentQuery.request.construct(message);
181                                String out = message.toString();
182
183                                putLine(out);
184                                flushOut();
185                                log.debug("sending query: {}", out);
186                        }
187                });
188        }
189
190        @Override
191        public String toString() {
192                return "client connection " + address;
193        }
194
195        private void sendQueryVersion(final int version, final Future<Void> future) {
196                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
197                        @Override
198                        protected void processOk(Response response) {
199                                protocolVersion = version;
200                                if (version < requestedVersion) {
201                                        /** it is an implicit loop here*/
202                                        sendQueryVersion(version + 1, future);
203                                        return;
204                                }
205                                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
206
207                                        @Override
208                                        protected void processOk(Response response) {
209                                                requestIdEnabled = true;
210                                                future.pass(null);
211                                        }
212                                });
213
214                        }
215                });
216        }
217
218        protected class IdCollection<T> {
219
220
221                protected final Map<Integer, T> map = new HashMap<>();
222                protected T current;
223
224                public Integer put(Integer idProposition, T value) {
225                        synchronized (ClientSideManagedConnection.this) {
226                                while (!(requestIdEnabled || current == null)) {
227                                        try {
228                                                ClientSideManagedConnection.this.wait();
229                                        } catch (InterruptedException ignored) {
230                                                break;
231                                        }
232                                }
233                                if (!requestIdEnabled) {
234                                        current = value;
235                                        return null;
236                                }
237                                if (idProposition == null) {
238                                        idProposition = nextQueryId++;
239                                }
240                                map.put(idProposition, value);
241                                return idProposition;
242                        }
243                }
244
245                public void clear(Integer id) {
246                        if (requestIdEnabled) {
247                                current = null;
248                        } else {
249                                map.remove(id);
250                        }
251                }
252
253                public @Nonnull T fetch(@Nullable Integer id, boolean remove) {
254                        synchronized (ClientSideManagedConnection.this) {
255                                try {
256                                        if (id == null) {
257                                                if (requestIdEnabled) {
258                                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
259                                                }
260                                                T result = current;
261                                                current = null;
262                                                ClientSideManagedConnection.this.notifyAll();
263                                                return result;
264                                        }
265                                        if (!map.containsKey(id)) {
266                                                throw new FramsticksException().msg("id is unknown").arg("id", id);
267                                        }
268
269                                        T result = map.get(id);
270                                        if (remove) {
271                                                map.remove(id);
272                                        }
273                                        return result;
274
275                                } catch (FramsticksException e) {
276                                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
277                                }
278                        }
279                }
280        }
281
282        protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>();
283        protected IdCollection<NeedFile> needFiles = new IdCollection<>();
284
285        private int nextQueryId = 0;
286
287        protected void processEvent(String rest) {
288                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
289                if (!matcher.matches()) {
290                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
291                }
292                String fileLine = getLine();
293                if (!fileLine.equals("file")) {
294                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
295                }
296                String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString();
297                String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString();
298                final File file = new File("", new ListSource(readFileContent()));
299                log.debug("firing event {}", eventObjectPath);
300                EventListener<File> listener;
301                synchronized (registeredListeners) {
302                        listener = registeredListeners.get(eventObjectPath);
303                }
304                if (listener == null) {
305                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
306                }
307                listener.action(file);
308        }
309
310        protected void processNeedFile(Pair<Integer, CharSequence> rest) {
311                final Integer id = rest.first;
312                String suggestedName = null;
313                String description = null;
314                Pair<CharSequence, CharSequence> s = Request.takeString(rest.second);
315                if (s != null) {
316                        suggestedName = s.first.toString();
317                        Pair<CharSequence, CharSequence> d = Request.takeString(s.second);
318                        if (d != null) {
319                                description = d.first.toString();
320                        }
321                }
322
323                final Future<File> future = new Future<File>() {
324
325                        protected void send(final File result) {
326                                log.info("sending file: " + result);
327                                needFiles.clear(id);
328                                sendFile(null, result, id, ClientSideManagedConnection.this);
329
330                        }
331
332                        @Override
333                        protected void result(File result) {
334                                send(result);
335                        }
336
337                        @Override
338                        public void handle(FramsticksException exception) {
339                                send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage())));
340                        }
341                };
342
343                NeedFile needFile = new NeedFile(suggestedName, description, future);
344
345                if (needFileAcceptor.acceptNeed(needFile)) {
346                        return;
347                }
348
349                future.handle(new FramsticksException().msg("acceptor did not accepted need"));
350        }
351
352        protected void processFile(Pair<Integer, CharSequence> rest) {
353                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false);
354
355                String currentFilePath = rest.second.toString();
356                if (!Strings.notEmpty(currentFilePath)) {
357                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
358                }
359
360                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
361        }
362
363        protected void processMessageStartingWith(final String header) {
364                try {
365                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
366                        if (command == null) {
367                                throw new FramsticksException().msg("failed to parse command");
368                        }
369                        final CharSequence keyword = command.first;
370                        if (keyword.equals("event")) {
371                                processEvent(command.second.toString());
372                                return;
373                        }
374
375                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
376                        if (rest == null) {
377                                throw new FramsticksException().msg("failed to parse optional id and remainder");
378                        }
379
380                        if (keyword.equals("file")) {
381                                processFile(rest);
382                                return;
383                        }
384                        if (keyword.equals("ok") || keyword.equals("error")) {
385
386                                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true);
387
388                                log.debug("parsing response for request {}", sentQuery);
389
390                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
391                                return;
392                        }
393                        if (keyword.equals("needfile")) {
394                                processNeedFile(rest);
395                                return;
396                        }
397
398                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
399                } catch (FramsticksException e) {
400                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
401                }
402        }
403
404        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
405
406                @Override
407                public void handle(FramsticksException exception) {
408                        interruptJoinable();
409                        // finish();
410                }
411        };
412
413        @Override
414        protected void receiverThreadRoutine() {
415                startClientConnection(this);
416
417                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
418
419                        @Override
420                        protected void result(Void result) {
421                                synchronized (applicationRequestsBuffer) {
422                                        isHandshakeDone = true;
423                                        for (Runnable r : applicationRequestsBuffer) {
424                                                r.run();
425                                        }
426                                        applicationRequestsBuffer.clear();
427                                }
428                        }
429                });
430
431                processInputBatchesUntilClosed();
432        }
433
434        protected void processNextInputBatch() {
435                processMessageStartingWith(getLine());
436        }
437
438        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
439
440        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
441                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
442                        @Override
443                        protected void processOk(Response response) {
444                                synchronized (registeredListeners) {
445                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
446                                }
447                                future.pass(null);
448                        }
449                });
450        }
451
452        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
453                String eventPath = null;
454                synchronized (registeredListeners) {
455                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
456                                if (e.getValue() == listener) {
457                                        eventPath = e.getKey();
458                                        break;
459                                }
460                        }
461                }
462                if (eventPath == null) {
463                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
464                        return;
465                }
466
467                final String finalEventPath = eventPath;
468                                //TODO add arguments to the exception
469                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
470
471                        @Override
472                        protected void processOk(Response response) {
473                                synchronized (registeredListeners) {
474                                        registeredListeners.remove(finalEventPath);
475                                }
476                                future.pass(null);
477                        }
478                });
479        }
480}
Note: See TracBrowser for help on using the repository browser.