source: java/main/src/main/java/com/framsticks/communication/ClientConnection.java @ 84

Last change on this file since 84 was 84, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • simplification of entities management model
  • cleanup around params (improve hierarchy)
  • migrate from JUnit to TestNG
  • introduce FEST to automatically test GUI
  • improve slider control
  • loosen synchronization between gui tree and backend representation
  • and many other bug fixes

NOTICE:

  • a great many of lines is changed only because of substituting spaces with tabs

CHANGELOG (oldest changes at the bottom):

Some cleaning after fix found.

Fix bug with tree.

More changes with TreeNodes?.

Finally fix issue with tree.

Improve gui tree management.

Decouple update of values from fetch request in gui.

Minor changes.

Minor changes.

Minor change.

Change Path construction wording.

More fixes to SliderControl?.

Fix SliderControl?.

Fix SliderControl?.

Minor improvement.

Several changes.

Make NumberParam? a generic class.

Add robot to the gui test.

Setup common testing logging configuration.

Remove Parameters class.

Remove entityOwner from Parameters.

Move name out from Parameters class.

Move configuration to after the construction.

Simplify observers and endpoints.

Remove superfluous configureEntity overrides.

Add dependency on fest-swing-testng.

Use FEST for final print test.

Use FEST for more concise and readable assertions.

Divide test of F0Parser into multiple methods.

Migrate to TestNG

Minor change.

Change convention from LOGGER to log.

Fix reporting of errors during controls filling.

Bound maximal height of SliderControl?.

Minor improvements.

Improve tooltips for controls.

Also use Delimeted in more places.

Move static control utilities to Gui.

Rename package gui.components to controls.

Some cleaning in controls.

Improve Param classes placing.

Move ValueParam?, PrimitiveParam? and CompositeParam? one package up.

Improve ParamBuilder?.

Move getDef to ValueParam? and PrimitiveParam?.

Move getMax and getDef to ValueParam?.

Move getMin to ValueParam?.

Upgrade to laters apache commons versions.

Use filterInstanceof extensively.

Add instanceof filters.

Make ValueParam? in many places of Param.

Place assertions about ValueParam?.

Add ValueParam?

Rename ValueParam? to PrimitiveParam?

Minor changes.

Several improvements to params types.

Add NumberParam?.

Add TextControl? component.

Add .swp files to .gitignore

Greatly improved slider component.

Some improvements.

Make Param.reassign return also a state.

Add IterableIterator?.

Several changes.

  • Move util classes to better packages.
  • Remove warnings from eclim.

Several improvements.

Fix bug with BooleanParam?.

Some experiments with visualization.

Another fix to panel management.

Improve panel management.

Some refactorization around panels.

Add root class for panel.

File size: 10.0 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.RegistrationRequest;
5import com.framsticks.communication.queries.UseRequest;
6import com.framsticks.communication.queries.VersionRequest;
7import com.framsticks.communication.util.LoggingStateCallback;
8import com.framsticks.params.ListSource;
9import com.framsticks.util.*;
10import com.framsticks.util.dispatching.AtOnceDispatcher;
11import com.framsticks.util.dispatching.Dispatcher;
12import com.framsticks.util.dispatching.Dispatching;
13import com.framsticks.util.lang.Pair;
14import com.framsticks.util.lang.Strings;
15import org.apache.log4j.Logger;
16
17import java.io.IOException;
18import java.net.Socket;
19import java.net.SocketException;
20import java.util.*;
21import java.util.regex.Matcher;
22import java.util.regex.Pattern;
23
24/**
25 * @author Piotr Sniegowski
26 */
27public class ClientConnection extends Connection {
28
29        private final static Logger log = Logger.getLogger(ClientConnection.class);
30
31        protected final Map<String, Subscription> subscriptions = new HashMap<String, Subscription>();
32
33        public String getAddress() {
34                return address;
35        }
36
37        public void connect(StateFunctor connectedFunctor) {
38                try {
39                        log.info("connecting to " + address);
40
41                        socket = new Socket(hostName, port);
42
43                        socket.setSoTimeout(500);
44
45                        log.info("connected to " + hostName + ":" + port);
46                        connected = true;
47
48                        runThreads();
49
50                        connectedFunctor.call(null);
51                } catch (SocketException e) {
52                        log.error("failed to connect: " + e);
53                        connectedFunctor.call(e);
54                } catch (IOException e) {
55                        log.error("buffer creation failure");
56                        connectedFunctor.call(e);
57                        close();
58                }
59        }
60
61        private static abstract class InboundMessage {
62                protected String currentFilePath;
63                protected List<String> currentFileContent;
64                protected final List<File> files = new ArrayList<File>();
65
66                public abstract void eof();
67
68                protected void initCurrentFile(String path) {
69                        currentFileContent = new LinkedList<String>();
70                        currentFilePath = path;
71                }
72                protected void finishCurrentFile() {
73                        if (currentFileContent == null) {
74                                return;
75                        }
76                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
77                        currentFilePath = null;
78                        currentFileContent= null;
79                }
80
81                public abstract void startFile(String path);
82
83                public void addLine(String line) {
84                        assert line != null;
85                        assert currentFileContent != null;
86                        currentFileContent.add(line.substring(0, line.length() - 1));
87                }
88
89                public List<File> getFiles() {
90                        return files;
91                }
92        }
93
94        private static class EventFire extends InboundMessage {
95                public final Subscription subscription;
96
97                private EventFire(Subscription subscription) {
98                        this.subscription = subscription;
99                }
100
101                public void startFile(String path) {
102                        assert path == null;
103                        initCurrentFile(null);
104                }
105
106                @Override
107                public void eof() {
108                        finishCurrentFile();
109                        Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() {
110                                @Override
111                                public void run() {
112                                        subscription.getEventCallback().call(getFiles());
113                                }
114                        });
115                }
116        }
117
118        private static class SentQuery extends InboundMessage {
119                Request request;
120                ResponseCallback callback;
121                Dispatcher dispatcher;
122
123                public void startFile(String path) {
124                        finishCurrentFile();
125                        if (path == null) {
126                                assert request instanceof ApplicationRequest;
127                                path = ((ApplicationRequest)request).getPath();
128                        }
129                        initCurrentFile(path);
130                }
131
132                public void eof() {
133                        finishCurrentFile();
134                        //no-operation
135                }
136
137                @Override
138                public String toString() {
139                        return request.toString();
140                }
141        }
142        private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>();
143
144
145        protected final String address;
146        protected final String hostName;
147        protected final int port;
148
149        private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$");
150
151        public ClientConnection(String address) {
152                assert address != null;
153                this.address = address;
154                Matcher matcher = addressPattern.matcher(address);
155                if (!matcher.matches()) {
156                        log.fatal("invalid address: " + address);
157                        hostName = null;
158                        port = 0;
159                        return;
160                }
161                hostName = matcher.group(1);
162                port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
163        }
164
165        private SentQuery currentlySentQuery;
166
167        public void send(Request request, ResponseCallback callback) {
168                send(request, AtOnceDispatcher.instance, callback);
169        }
170
171        public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) {
172
173                if (!isConnected()) {
174                        log.fatal("not connected");
175                        return;
176                }
177                final SentQuery sentQuery = new SentQuery();
178                sentQuery.request = request;
179                sentQuery.callback = callback;
180                sentQuery.dispatcher = dispatcher;
181
182                senderThread.invokeLater(new Runnable(){
183                        @Override
184                        public void run() {
185                                synchronized (ClientConnection.this) {
186
187                                        while (!(requestIdEnabled || currentlySentQuery == null)) {
188                                                try {
189                                                        ClientConnection.this.wait();
190                                                } catch (InterruptedException ignored) {
191                                                        break;
192                                                }
193                                        }
194                                }
195                                Integer id = stashQuery(sentQuery);
196                                String command = sentQuery.request.getCommand();
197                                StringBuilder message = new StringBuilder();
198                                message.append(command);
199                                if (id != null) {
200                                        message.append(" ").append(id);
201                                }
202                                sentQuery.request.construct(message);
203                                String out = message.toString();
204
205                                output.println(out);
206                                log.debug("sending query: " + out);
207
208                        }
209                });
210                /*
211                synchronized (this) {
212                        log.debug("queueing query: " + query);
213                        queryQueue.offer(sentQuery);
214                        notifyAll();
215                }
216                */
217        }
218
219
220        @Override
221        public String toString() {
222                return address;
223        }
224
225        public void subscribe(final String path, final SubscriptionCallback callback) {
226                send(new RegistrationRequest().setPath(path), new ResponseCallback() {
227                        @Override
228                        public void process(Response response) {
229                                if (!response.getOk()) {
230                                        log.error("failed to register on event: " + path);
231                                        callback.subscribed(null);
232                                        return;
233                                }
234                                assert response.getFiles().isEmpty();
235                                Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment());
236                                log.debug("registered on event: " + subscription);
237                                synchronized (subscriptions) {
238                                        subscriptions.put(subscription.getRegisteredPath(), subscription);
239                                }
240                                subscription.setEventCallback(callback.subscribed(subscription));
241                                if (subscription.getEventCallback() == null) {
242                                        log.info("subscription for " + path + " aborted");
243                                        subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription"));
244                                }
245                        }
246                });
247        }
248
249        public void negotiateProtocolVersion(StateFunctor stateFunctor) {
250                protocolVersion = -1;
251                sendQueryVersion(1, stateFunctor);
252        }
253
254        public void sendQueryVersion(final int version, final StateFunctor stateFunctor) {
255                send(new VersionRequest().version(version), new StateCallback() {
256                        @Override
257                        public void call(Exception e) {
258                                if (e != null) {
259                                        log.fatal("failed to upgrade protocol to version: " + version);
260                                        return;
261                                }
262                                protocolVersion = version;
263                                if (version < 4) {
264                                        /** it is an implicit loop here*/
265                                        sendQueryVersion(version + 1, stateFunctor);
266                                        return;
267                                }
268                                send(new UseRequest().feature("request_id"), new StateCallback() {
269                                        @Override
270                                        public void call(Exception e) {
271                                                requestIdEnabled = e == null;
272                                                /*
273                                                synchronized (ClientConnection.this) {
274                                                        ClientConnection.this.notifyAll();
275                                                }
276                                                */
277                                                if (!requestIdEnabled) {
278                                                        log.fatal("protocol negotiation failed");
279                                                        stateFunctor.call(new Exception("protocol negotiation failed", e));
280                                                        return;
281                                                }
282                                                stateFunctor.call(null);
283                                        }
284                                });
285
286                        }
287                });
288        }
289
290
291        private synchronized SentQuery fetchQuery(Integer id, boolean remove) {
292                if (id == null) {
293                        if (requestIdEnabled) {
294                                return null;
295                        }
296                        SentQuery result = currentlySentQuery;
297                        if (remove) {
298                                currentlySentQuery = null;
299                                notifyAll();
300                        }
301                        return result;
302                }
303                if (queryMap.containsKey(id)) {
304                        SentQuery result = queryMap.get(id);
305                        if (remove) {
306                                queryMap.remove(id);
307                        }
308                        return result;
309                }
310                return null;
311        }
312
313        private int nextQueryId = 0;
314
315        private Integer stashQuery(SentQuery sentQuery) {
316                if (!requestIdEnabled) {
317                        currentlySentQuery = sentQuery;
318                        return null;
319                }
320                queryMap.put(nextQueryId, sentQuery);
321                return nextQueryId++;
322        }
323
324        protected void processMessage(InboundMessage inboundMessage) throws Exception {
325                if (inboundMessage == null) {
326                        log.error("failed to use any inbound message");
327                        return;
328                }
329
330                String line;
331                while (!(line = getLine()).startsWith("eof")) {
332                        // log.debug("line: " + line);
333                        inboundMessage.addLine(line);
334                }
335                inboundMessage.eof();
336        }
337
338        protected void processEvent(String rest) throws Exception {
339                Matcher matcher = eventPattern.matcher(rest);
340                if (!matcher.matches()) {
341                        log.error("invalid event line: " + rest);
342                        return;
343                }
344                Subscription subscription = subscriptions.get(matcher.group(1));
345                if (subscription == null) {
346                        log.error("non subscribed event: " + matcher.group(1));
347                        return;
348                }
349                EventFire event = new EventFire(subscription);
350                event.startFile(null);
351                processMessage(event);
352        }
353
354
355        protected void processMessageStartingWith(String line) throws Exception {
356                Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
357                if (command.first.equals("event")) {
358                        processEvent(command.second);
359                        return;
360                }
361                Pair<Integer, String> rest = parseRest(command.second);
362
363                if (command.first.equals("file")) {
364                        SentQuery sentQuery = fetchQuery(rest.first, false);
365                        sentQuery.startFile(rest.second);
366                        processMessage(sentQuery);
367                        return;
368                }
369
370                SentQuery sentQuery = fetchQuery(rest.first, true);
371                if (sentQuery == null) {
372                        return;
373                }
374                log.debug("parsing response for request " + sentQuery);
375
376                final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles());
377                final ResponseCallback callback = sentQuery.callback;
378
379                Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() {
380                        @Override
381                        public void run() {
382                                callback.process(response);
383                        }
384                });
385        }
386
387        @Override
388        protected void receiverThreadRoutine() throws Exception {
389                while (connected) {
390                        processMessageStartingWith(getLine());
391                }
392        }
393
394}
Note: See TracBrowser for help on using the repository browser.