source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 102

Last change on this file since 102 was 102, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic? and classes representing prime experiment state.

Add classes for PrimeExperiment? state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment?.

More development with NetLoadSaveLogic? and PrimeExperiment?.

Improvement around prime.

Improve BufferedDispatcher?.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector? against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher?.

Make BufferedDispatcher? a Dispatcher (and Joinable).

Minor improvements.

Done StackedJoinable?, improve Experiment.

Develop StackedJoinable?.

Add StackedJoinable? utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange? on Simulators.

Improve hints in ListChange?.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException?.

Add SimulatorConnector?.

Work out conception of experiment composing of logics building blocks.

Rename SinkInterface? to Sink.

Move saving of Accesses into AccessOperations?.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

File size: 13.3 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.NeedFile;
6import com.framsticks.communication.queries.NeedFileAcceptor;
7import com.framsticks.communication.queries.ProtocolRequest;
8import com.framsticks.communication.queries.RegisterRequest;
9import com.framsticks.communication.queries.UseRequest;
10import com.framsticks.communication.queries.VersionRequest;
11import com.framsticks.core.Path;
12import com.framsticks.params.ListSource;
13import com.framsticks.util.*;
14import com.framsticks.util.dispatching.AtOnceDispatcher;
15import com.framsticks.util.dispatching.Dispatcher;
16import com.framsticks.util.dispatching.Dispatching;
17import com.framsticks.util.dispatching.ExceptionResultHandler;
18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.FutureHandler;
20import com.framsticks.util.dispatching.JoinableState;
21import com.framsticks.util.lang.Casting;
22import com.framsticks.util.lang.Pair;
23import com.framsticks.util.lang.Strings;
24import com.framsticks.params.EventListener;
25
26import org.apache.logging.log4j.Logger;
27import org.apache.logging.log4j.LogManager;
28
29import java.util.*;
30import java.util.regex.Matcher;
31
32import javax.annotation.Nonnull;
33import javax.annotation.Nullable;
34
35import com.framsticks.util.dispatching.RunAt;
36
37/**
38 * @author Piotr Sniegowski
39 */
40public class ClientSideManagedConnection extends ManagedConnection {
41
42        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
43
44        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
45        private boolean isHandshakeDone = false;
46
47        protected NeedFileAcceptor needFileAcceptor;
48
49        /**
50         * @return the needFileAcceptor
51         */
52        public NeedFileAcceptor getNeedFileAcceptor() {
53                return needFileAcceptor;
54        }
55
56        /**
57         * @param needFileAcceptor the needFileAcceptor to set
58         */
59        public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) {
60                this.needFileAcceptor = needFileAcceptor;
61        }
62
63        /**
64         * @return the requestedVersion
65         */
66        public int getRequestedVersion() {
67                return requestedVersion;
68        }
69
70        /**
71         * @param requestedVersion the requestedVersion to set
72         */
73        public void setRequestedVersion(int requestedVersion) {
74                this.requestedVersion = requestedVersion;
75        }
76
77        protected int requestedVersion = 4;
78
79        public ClientSideManagedConnection() {
80                setDescription("client connection");
81                protocolVersion = -1;
82        }
83
84        protected List<String> readFileContent() {
85                List<String> content = new LinkedList<String>();
86                String line;
87                boolean longValue = false;
88                while (true) {
89                        line = getLine();
90                        if (longValue) {
91                                if (line.endsWith("~") && !line.endsWith("\\~")) {
92                                        longValue = false;
93                                }
94                        } else {
95                                if (line.equals("eof")) {
96                                        break;
97                                }
98                                if (line.endsWith(":~")) {
99                                        longValue = true;
100                                }
101                        }
102                        content.add(line);
103                }
104                return content;
105        }
106
107        private static class SentQuery<C> {
108
109                Request request;
110                ClientSideResponseFuture callback;
111                Dispatcher<C> dispatcher;
112                protected final List<File> files = new ArrayList<File>();
113
114                public List<File> getFiles() {
115                        return files;
116                }
117
118                @Override
119                public String toString() {
120                        return request.toString();
121                }
122
123                public void dispatchResponseProcess(final Response response) {
124                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
125                                @Override
126                                protected void runAt() {
127                                        callback.pass(response);
128                                }
129                        });
130                }
131        }
132
133        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
134                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
135        }
136
137
138
139        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
140                synchronized (applicationRequestsBuffer) {
141                        if (!isHandshakeDone) {
142                                applicationRequestsBuffer.add(new Runnable() {
143                                        @Override
144                                        public void run() {
145                                                sendImplementation(request, dispatcher, callback);
146                                        }
147                                });
148                                return;
149                        }
150                }
151                sendImplementation(request, dispatcher, callback);
152        }
153
154        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
155                callback.setRequest(request);
156
157                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
158                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
159                }
160
161                final SentQuery<C> sentQuery = new SentQuery<C>();
162                sentQuery.request = request;
163                sentQuery.callback = callback;
164                sentQuery.dispatcher = dispatcher;
165
166
167                senderThread.dispatch(new RunAt<Connection>(callback) {
168                        @Override
169                        protected void runAt() {
170                                Integer id = sentQueries.put(null, sentQuery);
171
172                                String command = sentQuery.request.getCommand();
173                                StringBuilder message = new StringBuilder();
174                                message.append(command);
175                                if (id != null) {
176                                        message.append(" ").append(id);
177                                }
178                                message.append(" ");
179                                sentQuery.request.construct(message);
180                                String out = message.toString();
181
182                                putLine(out);
183                                flushOut();
184                                log.debug("sending query: {}", out);
185                        }
186                });
187        }
188
189        @Override
190        public String toString() {
191                return "client connection " + address;
192        }
193
194        private void sendQueryVersion(final int version, final Future<Void> future) {
195                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
196                        @Override
197                        protected void processOk(Response response) {
198                                protocolVersion = version;
199                                if (version < requestedVersion) {
200                                        /** it is an implicit loop here*/
201                                        sendQueryVersion(version + 1, future);
202                                        return;
203                                }
204                                send(new UseRequest().feature("request_id"), new ClientSideResponseFuture(future) {
205
206                                        @Override
207                                        protected void processOk(Response response) {
208                                                requestIdEnabled = true;
209                                                future.pass(null);
210                                        }
211                                });
212
213                        }
214                });
215        }
216
217        protected class IdCollection<T> {
218
219
220                protected final Map<Integer, T> map = new HashMap<>();
221                protected T current;
222
223                public Integer put(Integer idProposition, T value) {
224                        synchronized (ClientSideManagedConnection.this) {
225                                while (!(requestIdEnabled || current == null)) {
226                                        try {
227                                                ClientSideManagedConnection.this.wait();
228                                        } catch (InterruptedException ignored) {
229                                                break;
230                                        }
231                                }
232                                if (!requestIdEnabled) {
233                                        current = value;
234                                        return null;
235                                }
236                                if (idProposition == null) {
237                                        idProposition = nextQueryId++;
238                                }
239                                map.put(idProposition, value);
240                                return idProposition;
241                        }
242                }
243
244                public void clear(Integer id) {
245                        if (requestIdEnabled) {
246                                current = null;
247                        } else {
248                                map.remove(id);
249                        }
250                }
251
252                public @Nonnull T fetch(@Nullable Integer id, boolean remove) {
253                        synchronized (ClientSideManagedConnection.this) {
254                                try {
255                                        if (id == null) {
256                                                if (requestIdEnabled) {
257                                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
258                                                }
259                                                T result = current;
260                                                current = null;
261                                                ClientSideManagedConnection.this.notifyAll();
262                                                return result;
263                                        }
264                                        if (!map.containsKey(id)) {
265                                                throw new FramsticksException().msg("id is unknown").arg("id", id);
266                                        }
267
268                                        T result = map.get(id);
269                                        if (remove) {
270                                                map.remove(id);
271                                        }
272                                        return result;
273
274                                } catch (FramsticksException e) {
275                                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
276                                }
277                        }
278                }
279        }
280
281        protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>();
282        protected IdCollection<NeedFile> needFiles = new IdCollection<>();
283
284        private int nextQueryId = 0;
285
286        protected void processEvent(String rest) {
287                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
288                if (!matcher.matches()) {
289                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
290                }
291                String fileLine = getLine();
292                if (!fileLine.equals("file")) {
293                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
294                }
295                String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString();
296                String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString();
297                final File file = new File("", new ListSource(readFileContent()));
298                log.debug("firing event {}", eventObjectPath);
299                EventListener<File> listener;
300                synchronized (registeredListeners) {
301                        listener = registeredListeners.get(eventObjectPath);
302                }
303                if (listener == null) {
304                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
305                }
306                listener.action(file);
307        }
308
309        protected void processNeedFile(Pair<Integer, CharSequence> rest) {
310                final Integer id = rest.first;
311                String suggestedName = null;
312                String description = null;
313                Pair<CharSequence, CharSequence> s = Request.takeString(rest.second);
314                if (s != null) {
315                        suggestedName = s.first.toString();
316                        Pair<CharSequence, CharSequence> d = Request.takeString(s.second);
317                        if (d != null) {
318                                description = d.first.toString();
319                        }
320                }
321
322                final Future<File> future = new Future<File>() {
323
324                        protected void send(final File result) {
325                                log.info("sending file: " + result);
326                                needFiles.clear(id);
327                                sendFile(null, result, id, ClientSideManagedConnection.this);
328
329                        }
330
331                        @Override
332                        protected void result(File result) {
333                                send(result);
334                        }
335
336                        @Override
337                        public void handle(FramsticksException exception) {
338                                send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage())));
339                        }
340                };
341
342                NeedFile needFile = new NeedFile(suggestedName, description, future);
343
344                if (needFileAcceptor.acceptNeed(needFile)) {
345                        return;
346                }
347
348                future.handle(new FramsticksException().msg("acceptor did not accepted need"));
349        }
350
351        protected void processFile(Pair<Integer, CharSequence> rest) {
352                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false);
353
354                String currentFilePath = rest.second.toString();
355                if (!Strings.notEmpty(currentFilePath)) {
356                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
357                }
358
359                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
360        }
361
362        protected void processMessageStartingWith(final String header) {
363                try {
364                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
365                        if (command == null) {
366                                throw new FramsticksException().msg("failed to parse command");
367                        }
368                        final CharSequence keyword = command.first;
369                        if (keyword.equals("event")) {
370                                processEvent(command.second.toString());
371                                return;
372                        }
373
374                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
375                        if (rest == null) {
376                                throw new FramsticksException().msg("failed to parse optional id and remainder");
377                        }
378
379                        if (keyword.equals("file")) {
380                                processFile(rest);
381                                return;
382                        }
383                        if (keyword.equals("ok") || keyword.equals("error")) {
384
385                                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true);
386
387                                log.debug("parsing response for request {}", sentQuery);
388
389                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
390                                return;
391                        }
392                        if (keyword.equals("needfile")) {
393                                processNeedFile(rest);
394                                return;
395                        }
396
397                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
398                } catch (FramsticksException e) {
399                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
400                }
401        }
402
403        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
404
405                @Override
406                public void handle(FramsticksException exception) {
407                        interruptJoinable();
408                        // finish();
409                }
410        };
411
412        @Override
413        protected void receiverThreadRoutine() {
414                startClientConnection(this);
415
416                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
417
418                        @Override
419                        protected void result(Void result) {
420                                synchronized (applicationRequestsBuffer) {
421                                        isHandshakeDone = true;
422                                        for (Runnable r : applicationRequestsBuffer) {
423                                                r.run();
424                                        }
425                                        applicationRequestsBuffer.clear();
426                                }
427                        }
428                });
429
430                processInputBatchesUntilClosed();
431        }
432
433        protected void processNextInputBatch() {
434                processMessageStartingWith(getLine());
435        }
436
437        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
438
439        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
440                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
441                        @Override
442                        protected void processOk(Response response) {
443                                synchronized (registeredListeners) {
444                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
445                                }
446                                future.pass(null);
447                        }
448                });
449        }
450
451        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
452                String eventPath = null;
453                synchronized (registeredListeners) {
454                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
455                                if (e.getValue() == listener) {
456                                        eventPath = e.getKey();
457                                        break;
458                                }
459                        }
460                }
461                if (eventPath == null) {
462                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
463                        return;
464                }
465
466                final String finalEventPath = eventPath;
467                //TODO add arguments to the exception
468                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
469
470                        @Override
471                        protected void processOk(Response response) {
472                                synchronized (registeredListeners) {
473                                        registeredListeners.remove(finalEventPath);
474                                }
475                                future.pass(null);
476                        }
477                });
478        }
479}
Note: See TracBrowser for help on using the repository browser.