source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 97

Last change on this file since 97 was 97, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • add proper exception passing between communication sides:

If an exception occurs while handling a client request, it is
automatically passed as a comment in the error response.

it may be used to snoop communication between peers

  • fix algorithm choosing text controls in GUI
  • allow GUI testing in virtual frame buffer (xvfb)

FEST had some problem with xvfb but workaround was found

supports tab-completion based on requests history

CHANGELOG:
Further improve handling of exceptions in GUI.

Add StatusBar? implementing ExceptionResultHandler?.

Make completion processing asynchronous.

Minor changes.

Improve completion in console.

Improve history in InteractiveConsole?.

First working version of DirectConsole?.

Minor changes.

Make Connection.address non final.

It is more suitable to use in configuration.

Improvement of consoles.

Improve PopupMenu? and closing of FrameJoinable?.

Fix BrowserTest?.

Found bug with FEST running under xvfb.

JButtonFixture.click() is not working under xvfb.
GuiTest? has wrapper which uses JButton.doClick() directly.

Store CompositeParam? param in TreeNode?.

Simplify ClientSideManagedConnection? connecting.

There is no connectedFunctor needed anymore; ApplicationRequests? can be
sent right after creation. They are buffered until the version
and features are negotiated.

Narrow down the interface of ClientSideManagedConnection?.

Allow connection specializations to send only
ApplicationRequests?.

Improve policy of text control choosing.

Change name of Genotype in BrowserTest?.

Make BrowserTest? change name of Genotype.

Minor change.

First working draft of TrackConsole?.

Simplify Consoles.

More improvements with gui joinables.

Unify initialization on gui joinables.

More rework of Frame based entities.

Refactor the structure of JFrame-based entities.

Extract GuiTest? from BrowserBaseTest?.

Reorganize Console classes structure.

Add Collection view to JoinableCollection?.

Configure timeout in testing.

Minor changes.

Rework connections hierarchy.

Add Mode to the get operation.

Make get and set in Tree take PrimitiveParam?.

Unify naming of operations.

Make RunAt? use the given ExceptionHandler?.

It wraps the virtual runAt() method call in a
try-catch that passes the exception to the handler.

Force RunAt? to include ExceptionHandler?.

Improve ClientAtServer?.

Minor change.

Another sweep with FindBugs?.

Rename Instance to Tree.

Minor changes.

Minor changes.

Further clarify semantics of Futures.

Add FutureHandler?.

FutureHandler? is a refinement of Future that delegates
exception handling to the ExceptionResultHandler? given
at construction time.

Remove StateFunctor? (use Future<Void> instead).

Make Connection use Future<Void>.

Unparametrize *ResponseFuture?.

Remove StateCallback? not needed anymore.

Distinguish between sides of ResponseFuture?.

Base ResponseCallback? on Future (now ResponseFuture?).

Make asynchronous store taking Future for flags.

Implement storeValue in ObjectInstance?.

File size: 7.6 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.params.annotations.AutoAppendAnnotation;
4import com.framsticks.params.annotations.FramsClassAnnotation;
5import com.framsticks.params.annotations.ParamAnnotation;
6import com.framsticks.util.FramsticksException;
7import com.framsticks.util.io.Encoding;
8import com.framsticks.util.lang.Strings;
9
10import org.apache.log4j.Level;
11import org.apache.log4j.Logger;
12import java.io.BufferedReader;
13import java.io.IOException;
14import java.io.InputStreamReader;
15import java.io.OutputStreamWriter;
16import java.io.PrintWriter;
17import java.net.Socket;
18import java.net.SocketTimeoutException;
19import java.util.Collection;
20import java.util.HashSet;
21import java.util.Set;
22
23import com.framsticks.util.dispatching.AbstractJoinable;
24import com.framsticks.util.dispatching.Dispatching;
25import com.framsticks.util.dispatching.Joinable;
26import com.framsticks.util.dispatching.JoinableCollection;
27import com.framsticks.util.dispatching.JoinableParent;
28import com.framsticks.util.dispatching.JoinableState;
29import com.framsticks.util.dispatching.RunAt;
30import com.framsticks.util.dispatching.Thread;
31import com.framsticks.util.dispatching.ThrowExceptionHandler;
32
33@FramsClassAnnotation
34public abstract class Connection extends AbstractJoinable implements JoinableParent {
35
36        protected final static Logger log = Logger.getLogger(Connection.class);
37
38        private PrintWriter output = null;
39        private BufferedReader input = null;
40
41        protected Socket socket = null;
42
43        protected Address address;
44        protected String description = "connection";
45
46        protected final Thread<Connection> senderThread = new Thread<>();
47        protected final Thread<Connection> receiverThread = new Thread<>();
48        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();
49        protected final Set<ConnectionListener> listeners = new HashSet<>();
50
51        /**
52         *
53         */
54        public Connection() {
55                threads.add(senderThread);
56                threads.add(receiverThread);
57
58        }
59
60        protected void updateNames() {
61                if (address == null) {
62                        return;
63                }
64                senderThread.setName(description + " thread " + address + " sender");
65                receiverThread.setName(description + " thread " + address + " receiver");
66                threads.setObservableName(address + " connection threads");
67        }
68
69        public void setDescription(String description) {
70                this.description = description;
71                updateNames();
72        }
73
74        @AutoAppendAnnotation
75        public Connection setAddress(Address address) {
76                this.address = address;
77                updateNames();
78                return this;
79        }
80
81        @ParamAnnotation
82        public Connection setAddress(String address) {
83                return setAddress(new Address(address));
84        }
85
86        public synchronized boolean isConnected() {
87                return socket != null && socket.isConnected();
88        }
89
90        static final int BUFFER_LENGTH = 1024;
91
92        int readChars = 0;
93        int iterator = 0;
94        int bufferStart = 0;
95        final char[] readBuffer = new char[BUFFER_LENGTH];
96
97        protected String getLine() {
98                final StringBuilder lineBuffer = new StringBuilder();
99                try {
100                        while (!Thread.interrupted()) {
101                                while (iterator < readChars) {
102                                        if (readBuffer[iterator] != '\n') {
103                                                ++iterator;
104                                                continue;
105                                        }
106                                        /** Do not append new line. */
107                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart);
108                                        ++iterator;
109                                        bufferStart = iterator;
110                                        String line = lineBuffer.toString();
111
112                                        synchronized (listeners) {
113                                                for (ConnectionListener l : listeners) {
114                                                        l.connectionIncomming(line);
115                                                }
116                                        }
117
118                                        return line;
119                                }
120                                final int length = readChars - bufferStart;
121                                if (length > 0) {
122                                        assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH;
123                                        assert bufferStart + length <= BUFFER_LENGTH;
124                                        lineBuffer.append(readBuffer, bufferStart, length);
125                                }
126
127                                readChars = 0;
128                                while (readChars == 0) {
129                                        try {
130                                                readChars = input.read(readBuffer);
131                                        } catch (SocketTimeoutException ignored) {
132                                                //timeout - continue
133                                        }
134                                }
135                                iterator = 0;
136                                bufferStart = 0;
137                        }
138                        throw new InterruptedException();
139                } catch (Exception e) {
140                        throw new FramsticksException().msg("failed to read line").cause(e);
141                }
142        }
143
144        protected void putLine(String line) {
145                synchronized (listeners) {
146                        for (ConnectionListener l : listeners) {
147                                l.connectionOutgoing(line);
148                        }
149                }
150                output.println(line);
151        }
152
153        protected void flushOut() {
154                output.flush();
155        }
156
157        protected abstract void processNextInputBatch();
158
159        protected final void processInputBatchesUntilClosed() {
160                while (isRunning() && isConnected()) {
161                        try {
162                                processNextInputBatch();
163                        } catch (Exception e) {
164                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
165                                break;
166                        }
167                }
168        }
169
170        protected abstract void receiverThreadRoutine();
171
172        // @SuppressWarnings("NN_NAKED_NOTIFY")
173        protected void setupStreams() {
174                try {
175                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
176                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
177                        synchronized (this) {
178                                this.notifyAll();
179                        }
180                } catch (IOException e) {
181                        throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this);
182                }
183        }
184
185
186
187        @Override
188        protected void joinableFinish() {
189                try {
190                        if (output != null) {
191                                output.close();
192                                output = null;
193                        }
194
195                        if (input != null) {
196                                input.close();
197                                input = null;
198                        }
199
200
201                        if (socket != null) {
202                                socket.close();
203                                socket = null;
204                        }
205                } catch (Exception e) {
206                        log.error("failed to stop connection: ", e);
207                }
208                log.debug("connection closed");
209        }
210
211        @Override
212        public void childChangedState(Joinable joinable, JoinableState state) {
213                proceedToState(state);
214        }
215
216        @Override
217        public String getName() {
218                return address != null ? description + " " + address : description;
219        }
220
221        @Override
222        protected void joinableStart() {
223                Dispatching.use(threads, this);
224
225                senderThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) {
226                        @Override
227                        protected void runAt() {
228                                synchronized (Connection.this) {
229                                        while (state.equals(JoinableState.RUNNING) && output == null) {
230                                                Dispatching.wait(Connection.this, 500);
231                                        }
232                                }
233                        }
234                });
235
236                receiverThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) {
237                        @Override
238                        protected void runAt() {
239                                receiverThreadRoutine();
240                                interrupt();
241                                finish();
242                        }
243                });
244        }
245
246        @Override
247        protected void joinableInterrupt() {
248                Dispatching.drop(threads, this);
249                finish();
250        }
251
252
253        @Override
254        protected void joinableJoin() throws InterruptedException {
255                Dispatching.join(threads);
256        }
257
258        protected static void startClientConnection(Connection connection) {
259                while (connection.isRunning() && !connection.isConnected()) {
260                        log.debug("connecting to " + connection.address);
261                        try {
262                                connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort());
263                        } catch (IOException e) {
264                                log.info(connection + " failed to connect (retrying): " + e);
265                                Dispatching.sleep(0.5);
266                        }
267                }
268
269                log.debug(connection + " connected");
270                try {
271                        connection.socket.setSoTimeout(500);
272                        connection.setupStreams();
273                } catch (Exception e) {
274                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection);
275                }
276        }
277
278        /**
279         * @return the address
280         */
281        @ParamAnnotation
282        public String getAddress() {
283                return Strings.toStringNullProof(address, "?");
284        }
285
286        public Address getAddressObject() {
287                return address;
288        }
289
290
291        /**
292         * @return the listeners
293         */
294        public Collection<ConnectionListener> getListeners() {
295                return listeners;
296        }
297
298        public static <T extends Connection> T to(T connection, Address address) {
299                connection.setAddress(address);
300                return connection;
301        }
302
303}
Note: See TracBrowser for help on using the repository browser.