source: java/main/src/main/java/com/framsticks/communication/ClientConnection.java @ 88

Last change on this file since 88 was 88, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • loading f0 schema with XmlLoader?
  • use XmlLoader? to load configuration
  • introduce unified fork-join model of various entities

(Instances, Connections, GUI Frames, etc.),
all those entities clean up gracefully on
shutdown, which may be initialized by user
or by some entity

  • basing on above, simplify several organizing classes

(Observer, main class)

(to host native frams server process from Java level)

CHANGELOG:
Remove redundant Observer class.

Clean up in AbstractJoinable?.

Update ExternalProcess? class to changes in joining model.

Another sweep through code with FindBugs?.

Find bug with not joining RemoteInstance?.

Joining almost works.

Much improved joining model.

More improvement to joining model.

Add logging messages around joinable operations.

Rename methods in AbstractJoinable?.

Improve Joinable.

Rewrite of entity structure.

More simplifications with entities.

Further improve joinables.

Let Frame compose from JFrame instead of inheriting.

Add join classes.

Improvements of closing.

Add Builder interface.

Add FramsServerTest?.xml

FramsServer? may be configured through xml.

Make Framsticks main class an Observer of Entities.

Make Observer a generic type.

Remove variables regarding to removed endpoint.

Simplify observer (remove endpoints).

More changes to Observer and Endpoint.

Minor improvements.

Add OutputListener? to ExternalProcess?.

Improve testing of ExternalProcess?.

Add ExternalProcess? runner.

Rename the Program class to Framsticks.

Migrate Program to use XmlLoader? configuration.

First steps with configuration using XmlLoader?.

Fix several bugs.

Move all f0 classes to apriopriate package.

XmlLoader? is able to load Schema.

XmlLoader? is loading classes and props.

Add GroupBuilder?.

File size: 10.2 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.RegistrationRequest;
5import com.framsticks.communication.queries.UseRequest;
6import com.framsticks.communication.queries.VersionRequest;
7import com.framsticks.communication.util.LoggingStateCallback;
8import com.framsticks.params.ListSource;
9import com.framsticks.util.*;
10import com.framsticks.util.dispatching.AtOnceDispatcher;
11import com.framsticks.util.dispatching.Dispatcher;
12import com.framsticks.util.dispatching.Dispatching;
13import com.framsticks.util.lang.Pair;
14import com.framsticks.util.lang.Strings;
15import org.apache.log4j.Logger;
16
17import java.io.IOException;
18import java.net.Socket;
19import java.net.SocketException;
20import java.util.*;
21import java.util.regex.Matcher;
22import java.util.regex.Pattern;
23import com.framsticks.util.dispatching.RunAt;
24
25/**
26 * @author Piotr Sniegowski
27 */
28public class ClientConnection extends Connection {
29
30        private final static Logger log = Logger.getLogger(ClientConnection.class);
31
32        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
33
34        /**
35         * @param connectedFunctor the connectedFunctor to set
36         */
37        public void setConnectedFunctor(StateFunctor connectedFunctor) {
38                this.connectedFunctor = connectedFunctor;
39        }
40
41
42        protected StateFunctor connectedFunctor;
43
44        @Override
45        protected void joinableStart() {
46                try {
47                        log.info("connecting to " + address);
48
49                        socket = new Socket(hostName, port);
50
51                        socket.setSoTimeout(500);
52
53                        log.info("connected to " + hostName + ":" + port);
54                        connected = true;
55                        runThreads();
56
57                        connectedFunctor.call(null);
58                } catch (SocketException e) {
59                        log.error("failed to connect: " + e);
60                        connectedFunctor.call(e);
61                } catch (IOException e) {
62                        log.error("buffer creation failure");
63                        connectedFunctor.call(e);
64                        // close();
65                }
66        }
67
68
69        private static abstract class InboundMessage {
70                protected String currentFilePath;
71                protected List<String> currentFileContent;
72                protected final List<File> files = new ArrayList<File>();
73
74                public abstract void eof();
75
76                protected void initCurrentFile(String path) {
77                        currentFileContent = new LinkedList<String>();
78                        currentFilePath = path;
79                }
80                protected void finishCurrentFile() {
81                        if (currentFileContent == null) {
82                                return;
83                        }
84                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
85                        currentFilePath = null;
86                        currentFileContent= null;
87                }
88
89                public abstract void startFile(String path);
90
91                public void addLine(String line) {
92                        assert line != null;
93                        assert currentFileContent != null;
94                        currentFileContent.add(line.substring(0, line.length() - 1));
95                }
96
97                public List<File> getFiles() {
98                        return files;
99                }
100        }
101
102        private static class EventFire extends InboundMessage {
103                public final Subscription<?> subscription;
104
105                private EventFire(Subscription<?> subscription) {
106                        this.subscription = subscription;
107                }
108
109                public void startFile(String path) {
110                        assert path == null;
111                        initCurrentFile(null);
112                }
113
114                @Override
115                public void eof() {
116                        finishCurrentFile();
117
118                        subscription.dispatchCall(getFiles());
119                }
120        }
121
122        private static class SentQuery<C> extends InboundMessage {
123                Request request;
124                ResponseCallback<? extends C> callback;
125                Dispatcher<C> dispatcher;
126
127                public void startFile(String path) {
128                        finishCurrentFile();
129                        if (path == null) {
130                                assert request instanceof ApplicationRequest;
131                                path = ((ApplicationRequest)request).getPath();
132                        }
133                        initCurrentFile(path);
134                }
135
136                public void eof() {
137                        finishCurrentFile();
138                        //no-operation
139                }
140
141                @Override
142                public String toString() {
143                        return request.toString();
144                }
145
146                public void dispatchResponseProcess(final Response response) {
147                        Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() {
148                                @Override
149                                public void run() {
150                                        callback.process(response);
151                                }
152                        });
153                }
154        }
155        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
156
157
158        protected final String hostName;
159        protected final int port;
160
161        private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$");
162
163        public ClientConnection(String address) {
164                super(address);
165                Matcher matcher = addressPattern.matcher(address);
166                if (!matcher.matches()) {
167                        log.fatal("invalid address: " + address);
168                        hostName = null;
169                        port = 0;
170                        return;
171                }
172                hostName = matcher.group(1);
173                port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
174        }
175
176        private SentQuery<?> currentlySentQuery;
177
178
179        public <C extends Connection> void send(Request request, ResponseCallback<C> callback) {
180                //TODO RunAt
181                send(request, AtOnceDispatcher.getInstance(), callback);
182        }
183
184        public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) {
185
186                if (!isConnected()) {
187                        log.fatal("not connected");
188                        return;
189                }
190                final SentQuery<C> sentQuery = new SentQuery<C>();
191                sentQuery.request = request;
192                sentQuery.callback = callback;
193                sentQuery.dispatcher = dispatcher;
194
195                senderThread.invokeLater(new RunAt<Connection>(){
196                        @Override
197                        public void run() {
198                                Integer id;
199                                synchronized (ClientConnection.this) {
200
201                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
202                                                try {
203                                                        ClientConnection.this.wait();
204                                                } catch (InterruptedException ignored) {
205                                                        break;
206                                                }
207                                        }
208                                        if (requestIdEnabled) {
209                                                queryMap.put(nextQueryId, sentQuery);
210                                                id = nextQueryId++;
211                                        } else {
212                                                currentlySentQuery = sentQuery;
213                                                id = null;
214                                        }
215                                }
216                                String command = sentQuery.request.getCommand();
217                                StringBuilder message = new StringBuilder();
218                                message.append(command);
219                                if (id != null) {
220                                        message.append(" ").append(id);
221                                }
222                                sentQuery.request.construct(message);
223                                String out = message.toString();
224
225                                output.println(out);
226                                log.debug("sending query: " + out);
227
228                        }
229                });
230                /*
231                synchronized (this) {
232                        log.debug("queueing query: " + query);
233                        queryQueue.offer(sentQuery);
234                        notifyAll();
235                }
236                */
237        }
238
239
240        @Override
241        public String toString() {
242                return "client connection " + address;
243        }
244
245        public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
246                send(new RegistrationRequest().path(path), new ResponseCallback<Connection>() {
247                        @Override
248                        public void process(Response response) {
249                                if (!response.getOk()) {
250                                        log.error("failed to register on event: " + path);
251                                        callback.subscribed(null);
252                                        return;
253                                }
254                                assert response.getFiles().isEmpty();
255                                Subscription<C> subscription = new Subscription<C>(ClientConnection.this, path, response.getComment(), dispatcher);
256                                log.debug("registered on event: " + subscription);
257                                synchronized (subscriptions) {
258                                        subscriptions.put(subscription.getRegisteredPath(), subscription);
259                                }
260                                subscription.setEventCallback(callback.subscribed(subscription));
261                                if (subscription.getEventCallback() == null) {
262                                        log.info("subscription for " + path + " aborted");
263                                        subscription.unsubscribe(new LoggingStateCallback<C>(log, "abort subscription"));
264                                }
265                        }
266                });
267        }
268
269        public void negotiateProtocolVersion(StateFunctor stateFunctor) {
270                protocolVersion = -1;
271                sendQueryVersion(1, stateFunctor);
272        }
273
274        public void sendQueryVersion(final int version, final StateFunctor stateFunctor) {
275                send(new VersionRequest().version(version), new StateCallback<Connection>() {
276                        @Override
277                        public void call(Exception e) {
278                                if (e != null) {
279                                        log.fatal("failed to upgrade protocol to version: " + version);
280                                        return;
281                                }
282                                protocolVersion = version;
283                                if (version < 4) {
284                                        /** it is an implicit loop here*/
285                                        sendQueryVersion(version + 1, stateFunctor);
286                                        return;
287                                }
288                                send(new UseRequest().feature("request_id"), new StateCallback<Connection>() {
289                                        @Override
290                                        public void call(Exception e) {
291                                                requestIdEnabled = e == null;
292                                                /*
293                                                synchronized (ClientConnection.this) {
294                                                        ClientConnection.this.notifyAll();
295                                                }
296                                                */
297                                                if (!requestIdEnabled) {
298                                                        log.fatal("protocol negotiation failed");
299                                                        stateFunctor.call(new Exception("protocol negotiation failed", e));
300                                                        return;
301                                                }
302                                                stateFunctor.call(null);
303                                        }
304                                });
305
306                        }
307                });
308        }
309
310
311        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
312                if (id == null) {
313                        if (requestIdEnabled) {
314                                return null;
315                        }
316                        SentQuery<?> result = currentlySentQuery;
317                        if (remove) {
318                                currentlySentQuery = null;
319                                notifyAll();
320                        }
321                        return result;
322                }
323                if (queryMap.containsKey(id)) {
324                        SentQuery<?> result = queryMap.get(id);
325                        if (remove) {
326                                queryMap.remove(id);
327                        }
328                        return result;
329                }
330                return null;
331        }
332
333        private int nextQueryId = 0;
334
335        protected void processMessage(InboundMessage inboundMessage) {
336                if (inboundMessage == null) {
337                        log.error("failed to use any inbound message");
338                        return;
339                }
340
341                String line;
342                while (!(line = getLine()).startsWith("eof")) {
343                        // log.debug("line: " + line);
344                        inboundMessage.addLine(line);
345                }
346                inboundMessage.eof();
347        }
348
349        protected void processEvent(String rest) {
350                Matcher matcher = eventPattern.matcher(rest);
351                if (!matcher.matches()) {
352                        log.error("invalid event line: " + rest);
353                        return;
354                }
355                Subscription<?> subscription = subscriptions.get(matcher.group(1));
356                if (subscription == null) {
357                        log.error("non subscribed event: " + matcher.group(1));
358                        return;
359                }
360                EventFire event = new EventFire(subscription);
361                event.startFile(null);
362                processMessage(event);
363        }
364
365
366        protected void processMessageStartingWith(String line) {
367                Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
368                if (command.first.equals("event")) {
369                        processEvent(command.second);
370                        return;
371                }
372                Pair<Integer, String> rest = parseRest(command.second);
373
374                if (command.first.equals("file")) {
375                        SentQuery<?> sentQuery = fetchQuery(rest.first, false);
376                        sentQuery.startFile(rest.second);
377                        processMessage(sentQuery);
378                        return;
379                }
380
381                SentQuery<?> sentQuery = fetchQuery(rest.first, true);
382                if (sentQuery == null) {
383                        return;
384                }
385                log.debug("parsing response for request " + sentQuery);
386
387                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()));
388        }
389
390        @Override
391        protected void receiverThreadRoutine() {
392                while (connected) {
393                        try {
394                                processMessageStartingWith(getLine());
395                        } catch (Exception e) {
396                                break;
397                        }
398                }
399        }
400
401
402}
Note: See TracBrowser for help on using the repository browser.