source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 13.7 KB
RevLine 
[97]1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
[99]4import com.framsticks.communication.queries.CallRequest;
[101]5import com.framsticks.communication.queries.NeedFile;
6import com.framsticks.communication.queries.NeedFileAcceptor;
[97]7import com.framsticks.communication.queries.ProtocolRequest;
[99]8import com.framsticks.communication.queries.RegisterRequest;
[97]9import com.framsticks.communication.queries.UseRequest;
10import com.framsticks.communication.queries.VersionRequest;
11import com.framsticks.params.ListSource;
[105]12import com.framsticks.structure.Path;
[97]13import com.framsticks.util.*;
14import com.framsticks.util.dispatching.AtOnceDispatcher;
15import com.framsticks.util.dispatching.Dispatcher;
16import com.framsticks.util.dispatching.Dispatching;
[105]17import com.framsticks.util.dispatching.FutureHandler;
[97]18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.JoinableState;
[100]20import com.framsticks.util.lang.Casting;
[97]21import com.framsticks.util.lang.Pair;
22import com.framsticks.util.lang.Strings;
[99]23import com.framsticks.params.EventListener;
[97]24
[100]25import org.apache.logging.log4j.Logger;
26import org.apache.logging.log4j.LogManager;
[97]27
28import java.util.*;
29import java.util.regex.Matcher;
[100]30
31import javax.annotation.Nonnull;
32import javax.annotation.Nullable;
33
[97]34import com.framsticks.util.dispatching.RunAt;
35
36/**
37 * @author Piotr Sniegowski
38 */
39public class ClientSideManagedConnection extends ManagedConnection {
40
[100]41        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
[97]42
43        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
44        private boolean isHandshakeDone = false;
45
[101]46        protected NeedFileAcceptor needFileAcceptor;
[99]47
[97]48        /**
[101]49         * @return the needFileAcceptor
50         */
51        public NeedFileAcceptor getNeedFileAcceptor() {
52                return needFileAcceptor;
53        }
54
55        /**
56         * @param needFileAcceptor the needFileAcceptor to set
57         */
58        public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) {
59                this.needFileAcceptor = needFileAcceptor;
60        }
61
62        /**
[97]63         * @return the requestedVersion
64         */
65        public int getRequestedVersion() {
66                return requestedVersion;
67        }
68
69        /**
70         * @param requestedVersion the requestedVersion to set
71         */
72        public void setRequestedVersion(int requestedVersion) {
73                this.requestedVersion = requestedVersion;
74        }
75
76        protected int requestedVersion = 4;
77
78        public ClientSideManagedConnection() {
79                setDescription("client connection");
80                protocolVersion = -1;
[103]81                requestedFeatures.add("request_id");
[107]82                // requestedFeatures.add("call_empty_result");
[103]83                requestedFeatures.add("needfile_id");
[97]84        }
85
[99]86        protected List<String> readFileContent() {
87                List<String> content = new LinkedList<String>();
88                String line;
[100]89                boolean longValue = false;
90                while (true) {
91                        line = getLine();
92                        if (longValue) {
93                                if (line.endsWith("~") && !line.endsWith("\\~")) {
94                                        longValue = false;
95                                }
96                        } else {
97                                if (line.equals("eof")) {
98                                        break;
99                                }
100                                if (line.endsWith(":~")) {
101                                        longValue = true;
102                                }
103                        }
[99]104                        content.add(line);
[97]105                }
[99]106                return content;
[97]107        }
108
[100]109        private static class SentQuery<C> {
110
[97]111                Request request;
112                ClientSideResponseFuture callback;
113                Dispatcher<C> dispatcher;
[100]114                protected final List<File> files = new ArrayList<File>();
[97]115
[100]116                public List<File> getFiles() {
117                        return files;
[97]118                }
119
120                @Override
121                public String toString() {
122                        return request.toString();
123                }
124
125                public void dispatchResponseProcess(final Response response) {
126                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
127                                @Override
128                                protected void runAt() {
129                                        callback.pass(response);
130                                }
131                        });
132                }
133        }
134
135        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
136                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
137        }
138
139
140
141        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
142                synchronized (applicationRequestsBuffer) {
143                        if (!isHandshakeDone) {
144                                applicationRequestsBuffer.add(new Runnable() {
145                                        @Override
146                                        public void run() {
147                                                sendImplementation(request, dispatcher, callback);
148                                        }
149                                });
150                                return;
151                        }
152                }
153                sendImplementation(request, dispatcher, callback);
154        }
155
156        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
[98]157                callback.setRequest(request);
[97]158
159                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
[100]160                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
[97]161                }
162
163                final SentQuery<C> sentQuery = new SentQuery<C>();
164                sentQuery.request = request;
165                sentQuery.callback = callback;
166                sentQuery.dispatcher = dispatcher;
167
[101]168
[97]169                senderThread.dispatch(new RunAt<Connection>(callback) {
170                        @Override
171                        protected void runAt() {
[101]172                                Integer id = sentQueries.put(null, sentQuery);
[97]173
174                                String command = sentQuery.request.getCommand();
175                                StringBuilder message = new StringBuilder();
176                                message.append(command);
177                                if (id != null) {
178                                        message.append(" ").append(id);
179                                }
180                                message.append(" ");
181                                sentQuery.request.construct(message);
182                                String out = message.toString();
183
184                                putLine(out);
185                                flushOut();
[100]186                                log.debug("sending query: {}", out);
[97]187                        }
188                });
189        }
190
191        @Override
192        public String toString() {
193                return "client connection " + address;
194        }
195
[105]196        private void sendNextUseRequest(final Iterator<String> featuresIterator, final FutureHandler<Void> future) {
[103]197                if (!featuresIterator.hasNext()) {
198                        future.pass(null);
199                        return;
200                }
201                final String feature = featuresIterator.next();
202
203                send(new UseRequest().feature(feature), new ClientSideResponseFuture(future) {
204
205                        @Override
206                        protected void processOk(Response response) {
207                                if (feature.equals("request_id")) {
208                                        requestIdEnabled = true;
209                                }
210                                sendNextUseRequest(featuresIterator, future);
211                        }
212                });
213        }
214
[105]215        private void sendQueryVersion(final int version, final FutureHandler<Void> future) {
[97]216                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
217                        @Override
218                        protected void processOk(Response response) {
219                                protocolVersion = version;
220                                if (version < requestedVersion) {
221                                        /** it is an implicit loop here*/
222                                        sendQueryVersion(version + 1, future);
223                                        return;
224                                }
[103]225                                sendNextUseRequest(requestedFeatures.iterator(), future);
[97]226
227                        }
228                });
229        }
230
[101]231        protected class IdCollection<T> {
232
233
234                protected final Map<Integer, T> map = new HashMap<>();
235                protected T current;
236
237                public Integer put(Integer idProposition, T value) {
238                        synchronized (ClientSideManagedConnection.this) {
239                                while (!(requestIdEnabled || current == null)) {
240                                        try {
241                                                ClientSideManagedConnection.this.wait();
242                                        } catch (InterruptedException ignored) {
243                                                break;
244                                        }
[100]245                                }
[101]246                                if (!requestIdEnabled) {
247                                        current = value;
248                                        return null;
[100]249                                }
[101]250                                if (idProposition == null) {
251                                        idProposition = nextQueryId++;
252                                }
253                                map.put(idProposition, value);
254                                return idProposition;
[97]255                        }
[101]256                }
[100]257
[101]258                public void clear(Integer id) {
259                        if (requestIdEnabled) {
260                                current = null;
261                        } else {
262                                map.remove(id);
[97]263                        }
[101]264                }
[100]265
[101]266                public @Nonnull T fetch(@Nullable Integer id, boolean remove) {
267                        synchronized (ClientSideManagedConnection.this) {
268                                try {
269                                        if (id == null) {
270                                                if (requestIdEnabled) {
271                                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
272                                                }
273                                                T result = current;
274                                                current = null;
275                                                ClientSideManagedConnection.this.notifyAll();
276                                                return result;
277                                        }
278                                        if (!map.containsKey(id)) {
279                                                throw new FramsticksException().msg("id is unknown").arg("id", id);
280                                        }
281
282                                        T result = map.get(id);
283                                        if (remove) {
284                                                map.remove(id);
285                                        }
286                                        return result;
287
288                                } catch (FramsticksException e) {
289                                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
290                                }
[97]291                        }
292                }
293        }
294
[101]295        protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>();
296        protected IdCollection<NeedFile> needFiles = new IdCollection<>();
297
[97]298        private int nextQueryId = 0;
299
300        protected void processEvent(String rest) {
301                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
302                if (!matcher.matches()) {
[99]303                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
[97]304                }
[99]305                String fileLine = getLine();
306                if (!fileLine.equals("file")) {
307                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
[97]308                }
[101]309                String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString();
310                String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString();
[99]311                final File file = new File("", new ListSource(readFileContent()));
[100]312                log.debug("firing event {}", eventObjectPath);
[99]313                EventListener<File> listener;
314                synchronized (registeredListeners) {
315                        listener = registeredListeners.get(eventObjectPath);
316                }
[101]317                if (listener == null) {
[99]318                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
319                }
320                listener.action(file);
[97]321        }
322
[101]323        protected void processNeedFile(Pair<Integer, CharSequence> rest) {
324                final Integer id = rest.first;
325                String suggestedName = null;
326                String description = null;
327                Pair<CharSequence, CharSequence> s = Request.takeString(rest.second);
328                if (s != null) {
329                        suggestedName = s.first.toString();
330                        Pair<CharSequence, CharSequence> d = Request.takeString(s.second);
331                        if (d != null) {
332                                description = d.first.toString();
333                        }
334                }
335
[105]336                final FutureHandler<File> future = new FutureHandler<File>() {
[101]337
338                        protected void send(final File result) {
[103]339                                log.debug("sending file: " + result);
[101]340                                needFiles.clear(id);
341                                sendFile(null, result, id, ClientSideManagedConnection.this);
342                        }
343
344                        @Override
345                        protected void result(File result) {
346                                send(result);
347                        }
348
349                        @Override
350                        public void handle(FramsticksException exception) {
351                                send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage())));
352                        }
353                };
354
355                NeedFile needFile = new NeedFile(suggestedName, description, future);
356
357                if (needFileAcceptor.acceptNeed(needFile)) {
358                        return;
359                }
360
361                future.handle(new FramsticksException().msg("acceptor did not accepted need"));
362        }
363
[100]364        protected void processFile(Pair<Integer, CharSequence> rest) {
[101]365                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false);
[100]366
367                String currentFilePath = rest.second.toString();
368                if (!Strings.notEmpty(currentFilePath)) {
369                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
370                }
371
372                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
373        }
374
375        protected void processMessageStartingWith(final String header) {
[97]376                try {
[100]377                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
378                        if (command == null) {
379                                throw new FramsticksException().msg("failed to parse command");
380                        }
381                        final CharSequence keyword = command.first;
382                        if (keyword.equals("event")) {
[97]383                                processEvent(command.second.toString());
384                                return;
385                        }
386
[100]387                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
388                        if (rest == null) {
389                                throw new FramsticksException().msg("failed to parse optional id and remainder");
390                        }
391
392                        if (keyword.equals("file")) {
393                                processFile(rest);
[97]394                                return;
395                        }
[100]396                        if (keyword.equals("ok") || keyword.equals("error")) {
[97]397
[101]398                                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true);
[100]399
400                                log.debug("parsing response for request {}", sentQuery);
401
402                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
[97]403                                return;
404                        }
[101]405                        if (keyword.equals("needfile")) {
406                                processNeedFile(rest);
407                                return;
408                        }
[97]409
[100]410                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
[97]411                } catch (FramsticksException e) {
[100]412                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
[97]413                }
414        }
415
[105]416        protected final ExceptionHandler closeOnFailure = new ExceptionHandler() {
[97]417
418                @Override
419                public void handle(FramsticksException exception) {
[101]420                        interruptJoinable();
[97]421                        // finish();
422                }
423        };
424
425        @Override
426        protected void receiverThreadRoutine() {
427                startClientConnection(this);
428
[105]429                sendQueryVersion(1, new Future<Void>(closeOnFailure) {
[97]430
431                        @Override
432                        protected void result(Void result) {
433                                synchronized (applicationRequestsBuffer) {
434                                        isHandshakeDone = true;
435                                        for (Runnable r : applicationRequestsBuffer) {
436                                                r.run();
437                                        }
438                                        applicationRequestsBuffer.clear();
439                                }
440                        }
441                });
442
443                processInputBatchesUntilClosed();
444        }
445
446        protected void processNextInputBatch() {
447                processMessageStartingWith(getLine());
448        }
449
[99]450        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
451
[105]452        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final FutureHandler<Void> future) {
[99]453                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
454                        @Override
455                        protected void processOk(Response response) {
456                                synchronized (registeredListeners) {
457                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
458                                }
459                                future.pass(null);
460                        }
461                });
462        }
463
[105]464        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final FutureHandler<Void> future) {
[99]465                String eventPath = null;
466                synchronized (registeredListeners) {
467                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
468                                if (e.getValue() == listener) {
469                                        eventPath = e.getKey();
470                                        break;
471                                }
472                        }
473                }
474                if (eventPath == null) {
475                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
476                        return;
477                }
478
479                final String finalEventPath = eventPath;
[102]480                //TODO add arguments to the exception
[99]481                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
482
483                        @Override
484                        protected void processOk(Response response) {
485                                synchronized (registeredListeners) {
486                                        registeredListeners.remove(finalEventPath);
487                                }
488                                future.pass(null);
489                        }
490                });
491        }
[97]492}
Note: See TracBrowser for help on using the repository browser.