source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 107

Last change on this file since 107 was 107, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • add SimultorProviders? hierarchy
  • start Framsticks server over SSH
  • FJF compatible with Framsticks 4.0rc3
  • reading and writing of standard.expt
  • a proof-of-concept implementation of StandardExperiment?

CHANGELOG:
Optionally return FreeAccess? from registry.

Add SimulatorRange?.

StandardExperiment? with genotypes circulation.

Automate registration around StandardState?.

More improvements to StandardExperiment?.

Skeleton version of StandardExperiment?.

Test saving of StandardState?.

Standard experiment state is being loaded.

More development towards StandardState? reading.

Work on reading standard experiment state.

Add classes for standard experiment.

Update example standard.expt

Add FreeAccess? and FreeObject?.

Made compatible with version 4.0rc3

Change deserialization policy.

Improve SSH support.

Working running simulator over SSH.

Fix joining bug in Experiment.

Working version of SimulatorRunner?.

Add more SimulatorProviders?.

Working PrimeExperimentTest? with 4.0rc3

Add references to deserialization.

Add OpaqueObject? and it's serialization.

Add deserialization of dictionaries.

Partial implementation of deserialization.

Add more tests for deserialization.

Prepare tests for deserialization.

Add proper result to prime experiment test.

Minor fixes to simulators providers.

Draft version of SimulatorProvider?.

Add SimulatorProvider? interface.

File size: 13.7 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.NeedFile;
6import com.framsticks.communication.queries.NeedFileAcceptor;
7import com.framsticks.communication.queries.ProtocolRequest;
8import com.framsticks.communication.queries.RegisterRequest;
9import com.framsticks.communication.queries.UseRequest;
10import com.framsticks.communication.queries.VersionRequest;
11import com.framsticks.params.ListSource;
12import com.framsticks.structure.Path;
13import com.framsticks.util.*;
14import com.framsticks.util.dispatching.AtOnceDispatcher;
15import com.framsticks.util.dispatching.Dispatcher;
16import com.framsticks.util.dispatching.Dispatching;
17import com.framsticks.util.dispatching.FutureHandler;
18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.JoinableState;
20import com.framsticks.util.lang.Casting;
21import com.framsticks.util.lang.Pair;
22import com.framsticks.util.lang.Strings;
23import com.framsticks.params.EventListener;
24
25import org.apache.logging.log4j.Logger;
26import org.apache.logging.log4j.LogManager;
27
28import java.util.*;
29import java.util.regex.Matcher;
30
31import javax.annotation.Nonnull;
32import javax.annotation.Nullable;
33
34import com.framsticks.util.dispatching.RunAt;
35
36/**
37 * @author Piotr Sniegowski
38 */
39public class ClientSideManagedConnection extends ManagedConnection {
40
41        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
42
43        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
44        private boolean isHandshakeDone = false;
45
46        protected NeedFileAcceptor needFileAcceptor;
47
48        /**
49         * @return the needFileAcceptor
50         */
51        public NeedFileAcceptor getNeedFileAcceptor() {
52                return needFileAcceptor;
53        }
54
55        /**
56         * @param needFileAcceptor the needFileAcceptor to set
57         */
58        public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) {
59                this.needFileAcceptor = needFileAcceptor;
60        }
61
62        /**
63         * @return the requestedVersion
64         */
65        public int getRequestedVersion() {
66                return requestedVersion;
67        }
68
69        /**
70         * @param requestedVersion the requestedVersion to set
71         */
72        public void setRequestedVersion(int requestedVersion) {
73                this.requestedVersion = requestedVersion;
74        }
75
76        protected int requestedVersion = 4;
77
78        public ClientSideManagedConnection() {
79                setDescription("client connection");
80                protocolVersion = -1;
81                requestedFeatures.add("request_id");
82                // requestedFeatures.add("call_empty_result");
83                requestedFeatures.add("needfile_id");
84        }
85
86        protected List<String> readFileContent() {
87                List<String> content = new LinkedList<String>();
88                String line;
89                boolean longValue = false;
90                while (true) {
91                        line = getLine();
92                        if (longValue) {
93                                if (line.endsWith("~") && !line.endsWith("\\~")) {
94                                        longValue = false;
95                                }
96                        } else {
97                                if (line.equals("eof")) {
98                                        break;
99                                }
100                                if (line.endsWith(":~")) {
101                                        longValue = true;
102                                }
103                        }
104                        content.add(line);
105                }
106                return content;
107        }
108
109        private static class SentQuery<C> {
110
111                Request request;
112                ClientSideResponseFuture callback;
113                Dispatcher<C> dispatcher;
114                protected final List<File> files = new ArrayList<File>();
115
116                public List<File> getFiles() {
117                        return files;
118                }
119
120                @Override
121                public String toString() {
122                        return request.toString();
123                }
124
125                public void dispatchResponseProcess(final Response response) {
126                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
127                                @Override
128                                protected void runAt() {
129                                        callback.pass(response);
130                                }
131                        });
132                }
133        }
134
135        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
136                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
137        }
138
139
140
141        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
142                synchronized (applicationRequestsBuffer) {
143                        if (!isHandshakeDone) {
144                                applicationRequestsBuffer.add(new Runnable() {
145                                        @Override
146                                        public void run() {
147                                                sendImplementation(request, dispatcher, callback);
148                                        }
149                                });
150                                return;
151                        }
152                }
153                sendImplementation(request, dispatcher, callback);
154        }
155
156        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
157                callback.setRequest(request);
158
159                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
160                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
161                }
162
163                final SentQuery<C> sentQuery = new SentQuery<C>();
164                sentQuery.request = request;
165                sentQuery.callback = callback;
166                sentQuery.dispatcher = dispatcher;
167
168
169                senderThread.dispatch(new RunAt<Connection>(callback) {
170                        @Override
171                        protected void runAt() {
172                                Integer id = sentQueries.put(null, sentQuery);
173
174                                String command = sentQuery.request.getCommand();
175                                StringBuilder message = new StringBuilder();
176                                message.append(command);
177                                if (id != null) {
178                                        message.append(" ").append(id);
179                                }
180                                message.append(" ");
181                                sentQuery.request.construct(message);
182                                String out = message.toString();
183
184                                putLine(out);
185                                flushOut();
186                                log.debug("sending query: {}", out);
187                        }
188                });
189        }
190
191        @Override
192        public String toString() {
193                return "client connection " + address;
194        }
195
196        private void sendNextUseRequest(final Iterator<String> featuresIterator, final FutureHandler<Void> future) {
197                if (!featuresIterator.hasNext()) {
198                        future.pass(null);
199                        return;
200                }
201                final String feature = featuresIterator.next();
202
203                send(new UseRequest().feature(feature), new ClientSideResponseFuture(future) {
204
205                        @Override
206                        protected void processOk(Response response) {
207                                if (feature.equals("request_id")) {
208                                        requestIdEnabled = true;
209                                }
210                                sendNextUseRequest(featuresIterator, future);
211                        }
212                });
213        }
214
215        private void sendQueryVersion(final int version, final FutureHandler<Void> future) {
216                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
217                        @Override
218                        protected void processOk(Response response) {
219                                protocolVersion = version;
220                                if (version < requestedVersion) {
221                                        /** it is an implicit loop here*/
222                                        sendQueryVersion(version + 1, future);
223                                        return;
224                                }
225                                sendNextUseRequest(requestedFeatures.iterator(), future);
226
227                        }
228                });
229        }
230
231        protected class IdCollection<T> {
232
233
234                protected final Map<Integer, T> map = new HashMap<>();
235                protected T current;
236
237                public Integer put(Integer idProposition, T value) {
238                        synchronized (ClientSideManagedConnection.this) {
239                                while (!(requestIdEnabled || current == null)) {
240                                        try {
241                                                ClientSideManagedConnection.this.wait();
242                                        } catch (InterruptedException ignored) {
243                                                break;
244                                        }
245                                }
246                                if (!requestIdEnabled) {
247                                        current = value;
248                                        return null;
249                                }
250                                if (idProposition == null) {
251                                        idProposition = nextQueryId++;
252                                }
253                                map.put(idProposition, value);
254                                return idProposition;
255                        }
256                }
257
258                public void clear(Integer id) {
259                        if (requestIdEnabled) {
260                                current = null;
261                        } else {
262                                map.remove(id);
263                        }
264                }
265
266                public @Nonnull T fetch(@Nullable Integer id, boolean remove) {
267                        synchronized (ClientSideManagedConnection.this) {
268                                try {
269                                        if (id == null) {
270                                                if (requestIdEnabled) {
271                                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
272                                                }
273                                                T result = current;
274                                                current = null;
275                                                ClientSideManagedConnection.this.notifyAll();
276                                                return result;
277                                        }
278                                        if (!map.containsKey(id)) {
279                                                throw new FramsticksException().msg("id is unknown").arg("id", id);
280                                        }
281
282                                        T result = map.get(id);
283                                        if (remove) {
284                                                map.remove(id);
285                                        }
286                                        return result;
287
288                                } catch (FramsticksException e) {
289                                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
290                                }
291                        }
292                }
293        }
294
295        protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>();
296        protected IdCollection<NeedFile> needFiles = new IdCollection<>();
297
298        private int nextQueryId = 0;
299
300        protected void processEvent(String rest) {
301                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
302                if (!matcher.matches()) {
303                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
304                }
305                String fileLine = getLine();
306                if (!fileLine.equals("file")) {
307                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
308                }
309                String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString();
310                String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString();
311                final File file = new File("", new ListSource(readFileContent()));
312                log.debug("firing event {}", eventObjectPath);
313                EventListener<File> listener;
314                synchronized (registeredListeners) {
315                        listener = registeredListeners.get(eventObjectPath);
316                }
317                if (listener == null) {
318                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
319                }
320                listener.action(file);
321        }
322
323        protected void processNeedFile(Pair<Integer, CharSequence> rest) {
324                final Integer id = rest.first;
325                String suggestedName = null;
326                String description = null;
327                Pair<CharSequence, CharSequence> s = Request.takeString(rest.second);
328                if (s != null) {
329                        suggestedName = s.first.toString();
330                        Pair<CharSequence, CharSequence> d = Request.takeString(s.second);
331                        if (d != null) {
332                                description = d.first.toString();
333                        }
334                }
335
336                final FutureHandler<File> future = new FutureHandler<File>() {
337
338                        protected void send(final File result) {
339                                log.debug("sending file: " + result);
340                                needFiles.clear(id);
341                                sendFile(null, result, id, ClientSideManagedConnection.this);
342                        }
343
344                        @Override
345                        protected void result(File result) {
346                                send(result);
347                        }
348
349                        @Override
350                        public void handle(FramsticksException exception) {
351                                send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage())));
352                        }
353                };
354
355                NeedFile needFile = new NeedFile(suggestedName, description, future);
356
357                if (needFileAcceptor.acceptNeed(needFile)) {
358                        return;
359                }
360
361                future.handle(new FramsticksException().msg("acceptor did not accepted need"));
362        }
363
364        protected void processFile(Pair<Integer, CharSequence> rest) {
365                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false);
366
367                String currentFilePath = rest.second.toString();
368                if (!Strings.notEmpty(currentFilePath)) {
369                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
370                }
371
372                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
373        }
374
375        protected void processMessageStartingWith(final String header) {
376                try {
377                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
378                        if (command == null) {
379                                throw new FramsticksException().msg("failed to parse command");
380                        }
381                        final CharSequence keyword = command.first;
382                        if (keyword.equals("event")) {
383                                processEvent(command.second.toString());
384                                return;
385                        }
386
387                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
388                        if (rest == null) {
389                                throw new FramsticksException().msg("failed to parse optional id and remainder");
390                        }
391
392                        if (keyword.equals("file")) {
393                                processFile(rest);
394                                return;
395                        }
396                        if (keyword.equals("ok") || keyword.equals("error")) {
397
398                                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true);
399
400                                log.debug("parsing response for request {}", sentQuery);
401
402                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
403                                return;
404                        }
405                        if (keyword.equals("needfile")) {
406                                processNeedFile(rest);
407                                return;
408                        }
409
410                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
411                } catch (FramsticksException e) {
412                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
413                }
414        }
415
416        protected final ExceptionHandler closeOnFailure = new ExceptionHandler() {
417
418                @Override
419                public void handle(FramsticksException exception) {
420                        interruptJoinable();
421                        // finish();
422                }
423        };
424
425        @Override
426        protected void receiverThreadRoutine() {
427                startClientConnection(this);
428
429                sendQueryVersion(1, new Future<Void>(closeOnFailure) {
430
431                        @Override
432                        protected void result(Void result) {
433                                synchronized (applicationRequestsBuffer) {
434                                        isHandshakeDone = true;
435                                        for (Runnable r : applicationRequestsBuffer) {
436                                                r.run();
437                                        }
438                                        applicationRequestsBuffer.clear();
439                                }
440                        }
441                });
442
443                processInputBatchesUntilClosed();
444        }
445
446        protected void processNextInputBatch() {
447                processMessageStartingWith(getLine());
448        }
449
450        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
451
452        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final FutureHandler<Void> future) {
453                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
454                        @Override
455                        protected void processOk(Response response) {
456                                synchronized (registeredListeners) {
457                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
458                                }
459                                future.pass(null);
460                        }
461                });
462        }
463
464        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final FutureHandler<Void> future) {
465                String eventPath = null;
466                synchronized (registeredListeners) {
467                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
468                                if (e.getValue() == listener) {
469                                        eventPath = e.getKey();
470                                        break;
471                                }
472                        }
473                }
474                if (eventPath == null) {
475                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
476                        return;
477                }
478
479                final String finalEventPath = eventPath;
480                //TODO add arguments to the exception
481                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
482
483                        @Override
484                        protected void processOk(Response response) {
485                                synchronized (registeredListeners) {
486                                        registeredListeners.remove(finalEventPath);
487                                }
488                                future.pass(null);
489                        }
490                });
491        }
492}
Note: See TracBrowser for help on using the repository browser.