source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 101

Last change on this file since 101 was 101, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • improve tree side notes
  • improve GUI layout
  • add foldable list of occurred events to EventControl
  • improve automatic type conversion in proxy listeners
  • implement several Access functionalities as algorithms independent of Access type
  • introduce draft base classes for distributed experiments
  • automatically register dependent Java classes to FramsClass registry
  • add testing prime experiment and configuration
  • simplify and improve task dispatching

CHANGELOG:
Improve task dispatching in RemoteTree?.

GUI no longer hangs on connection problems.

Make all dispatchers joinables.

Refactor Thread dispatcher.

Remove Task and PeriodicTask?.

Use Java utilities in those situations.

Reworking tasks dispatching.

Fix bug in EventControl? listener dispatching.

Minor improvements.

Add testing configuration for ExternalProcess? in GUI.

More improvement to prime.

Support for USERREADONLY in GUI.

Add that flag to various params in Java classes.

Remove redundant register clauses from several FramsClassAnnotations?.

Automatically gather and register dependent classes.

Add configuration for prime.

Improve Simulator class.

Add prime.xml configuration.

Introduce draft Experiment and Simulator classes.

Add prime experiment tests.

Enclose typical map with listeners into SimpleUniqueList?.

Needfile works in GUI.

Improve needfile handling in Browser.

More improvement with NeedFile?.

Implementing needfile.

Update test.

Rename ChangeEvent? to TestChangeEvent?.

Automatic argument type search in RemoteTree? listeners.

MultiParamLoader? uses AccessProvider?. By default old implementation
enclosed in AccessStash? or Registry.

Minor changes.

Rename SourceInterface? to Source.

Also improve toString of File and ListSource?.

Remove unused SimpleSource? class.

Add clearing in HistoryControl?.

Show entries in table at EventControl?.

Improve EventControl?.

Add listeners registration to EventControl?.

Add foldable table to HistoryControl?.

Add control row to Procedure and Event controls.

Improve layout of controls.

Another minor change to gui layout.

Minor improvement in the SliderControl?.

Minor changes.

Move ReflectionAccess?.Backend to separate file.

It was too cluttered.

Cleanup in ReflectionAccess?.

Move setMin, setMax, setDef to AccessOperations?.

Extract loading operation into AccessOperations?.

Append Framsticks to name of UnsupportedOperationException?.

The java.lang.UnsupportedOperationException? was shadowing this class.

Rename params.Util to params.ParamsUtil?.

Several improvements.

Minor changes.

Implement revert functionality.

Improve local changes management.

Minor improvement.

Remove methods rendered superfluous after SideNoteKey? improvement.

Improve SideNoteKey?.

It is now a generic type, so explicit type specification at
the call site is no longer needed.

Introduce SideNoteKey? interface.

Only Objects implementing that key may be used as side note keys.

Minor improvements.

Use strings instead of ValueControls? in several gui mappings.

File size: 9.2 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.params.Source;
4import com.framsticks.params.annotations.AutoAppendAnnotation;
5import com.framsticks.params.annotations.FramsClassAnnotation;
6import com.framsticks.params.annotations.ParamAnnotation;
7import com.framsticks.util.FramsticksException;
8import com.framsticks.util.io.Encoding;
9import com.framsticks.util.lang.Strings;
10
11import org.apache.logging.log4j.Level;
12import org.apache.logging.log4j.Logger;
13import org.apache.logging.log4j.LogManager;
14import java.io.BufferedReader;
15import java.io.IOException;
16import java.io.InputStreamReader;
17import java.io.OutputStreamWriter;
18import java.io.PrintWriter;
19import java.net.Socket;
20import java.net.SocketTimeoutException;
21import java.util.Collection;
22import java.util.HashSet;
23import java.util.Set;
24
25import com.framsticks.util.dispatching.AbstractJoinable;
26import com.framsticks.util.dispatching.Dispatcher;
27import com.framsticks.util.dispatching.Dispatching;
28import com.framsticks.util.dispatching.ExceptionResultHandler;
29import com.framsticks.util.dispatching.Joinable;
30import com.framsticks.util.dispatching.JoinableCollection;
31import com.framsticks.util.dispatching.JoinableParent;
32import com.framsticks.util.dispatching.JoinableState;
33import com.framsticks.util.dispatching.RunAt;
34import com.framsticks.util.dispatching.Thread;
35import com.framsticks.util.dispatching.ThrowExceptionHandler;
36
37@FramsClassAnnotation
38public abstract class Connection extends AbstractJoinable implements JoinableParent, ExceptionResultHandler {
39
40        protected final static Logger log = LogManager.getLogger(Connection.class);
41
42        private PrintWriter output = null;
43        private BufferedReader input = null;
44
45        protected Socket socket = null;
46
47        protected Address address;
48        protected String description = "connection";
49
50        protected final Thread<Connection> senderThread = new Thread<>();
51        protected final Thread<Connection> receiverThread = new Thread<>();
52        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();
53        protected final Set<ConnectionListener> listeners = new HashSet<>();
54
55        protected ExceptionResultHandler exceptionHandler = ThrowExceptionHandler.getInstance();
56
57        /**
58         *
59         */
60        public Connection() {
61                threads.add(senderThread);
62                threads.add(receiverThread);
63        }
64
65        protected void updateNames() {
66                if (address == null) {
67                        return;
68                }
69                senderThread.setName(description + " thread " + address + " sender");
70                receiverThread.setName(description + " thread " + address + " receiver");
71                threads.setObservableName(address + " connection threads");
72        }
73
74        public void setDescription(String description) {
75                this.description = description;
76                updateNames();
77        }
78
79        @AutoAppendAnnotation
80        public Connection setAddress(Address address) {
81                this.address = address;
82                updateNames();
83                return this;
84        }
85
86        @ParamAnnotation
87        public Connection setAddress(String address) {
88                return setAddress(new Address(address));
89        }
90
91        public synchronized boolean isConnected() {
92                return socket != null && socket.isConnected();
93        }
94
95        static final int BUFFER_LENGTH = 1024;
96
97        int readChars = 0;
98        int iterator = 0;
99        int bufferStart = 0;
100        final char[] readBuffer = new char[BUFFER_LENGTH];
101
102        protected String getLine() {
103                final StringBuilder lineBuffer = new StringBuilder();
104                try {
105                        while (!Thread.interrupted()) {
106                                while (iterator < readChars) {
107                                        if (readBuffer[iterator] != '\n') {
108                                                ++iterator;
109                                                continue;
110                                        }
111                                        /** Do not append new line. */
112                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart);
113                                        ++iterator;
114                                        bufferStart = iterator;
115                                        String line = lineBuffer.toString();
116
117                                        synchronized (listeners) {
118                                                for (ConnectionListener l : listeners) {
119                                                        l.connectionIncomming(line);
120                                                }
121                                        }
122
123                                        return line;
124                                }
125                                final int length = readChars - bufferStart;
126                                if (length > 0) {
127                                        assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH;
128                                        assert bufferStart + length <= BUFFER_LENGTH;
129                                        lineBuffer.append(readBuffer, bufferStart, length);
130                                }
131
132                                readChars = 0;
133                                while (readChars == 0) {
134                                        try {
135                                                readChars = input.read(readBuffer);
136                                        } catch (SocketTimeoutException ignored) {
137                                                //timeout - continue
138                                        }
139                                }
140                                iterator = 0;
141                                bufferStart = 0;
142                        }
143                        throw new InterruptedException();
144                } catch (Exception e) {
145                        throw new FramsticksException().msg("failed to read line").cause(e);
146                }
147        }
148
149        protected void putLine(String line) {
150                synchronized (listeners) {
151                        for (ConnectionListener l : listeners) {
152                                l.connectionOutgoing(line);
153                        }
154                }
155                output.println(line);
156        }
157
158        protected void flushOut() {
159                output.flush();
160        }
161
162        protected abstract void processNextInputBatch();
163
164        protected final void processInputBatchesUntilClosed() {
165                while (isRunning() && isConnected()) {
166                        try {
167                                processNextInputBatch();
168                        } catch (FramsticksException e) {
169                                handle(e);
170                        } catch (Exception e) {
171                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
172                                break;
173                        }
174                }
175        }
176
177        protected abstract void receiverThreadRoutine();
178
179        // @SuppressWarnings("NN_NAKED_NOTIFY")
180        protected void setupStreams() {
181                try {
182                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
183                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
184                        synchronized (this) {
185                                this.notifyAll();
186                        }
187                } catch (IOException e) {
188                        throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this);
189                }
190        }
191
192        @Override
193        protected void joinableFinish() {
194                try {
195                        if (output != null) {
196                                output.close();
197                                output = null;
198                        }
199
200                        if (input != null) {
201                                input.close();
202                                input = null;
203                        }
204
205                        if (socket != null) {
206                                socket.close();
207                                socket = null;
208                        }
209                } catch (Exception e) {
210                        log.error("failed to stop connection: ", e);
211                }
212                log.debug("connection closed");
213        }
214
215        @Override
216        public void childChangedState(Joinable joinable, JoinableState state) {
217                proceedToState(state);
218        }
219
220        @Override
221        public String getName() {
222                return address != null ? description + " " + address : description;
223        }
224
225        @Override
226        protected void joinableStart() {
227                Dispatching.use(threads, this);
228
229                senderThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) {
230                        @Override
231                        protected void runAt() {
232                                synchronized (Connection.this) {
233                                        while (state.equals(JoinableState.RUNNING) && output == null) {
234                                                Dispatching.wait(Connection.this, 500);
235                                        }
236                                }
237                        }
238                });
239
240                receiverThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) {
241                        @Override
242                        protected void runAt() {
243                                receiverThreadRoutine();
244                                interruptJoinable();
245                                finishJoinable();
246                        }
247                });
248        }
249
250        @Override
251        protected void joinableInterrupt() {
252                Dispatching.drop(threads, this);
253                finishJoinable();
254        }
255
256        @Override
257        protected void joinableJoin() throws InterruptedException {
258                Dispatching.join(threads);
259        }
260
261        protected static void startClientConnection(Connection connection) {
262                while (connection.isRunning() && !connection.isConnected()) {
263                        log.debug("connecting to {}", connection.address);
264                        try {
265                                connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort());
266                        } catch (IOException e) {
267                                log.info("{} failed to connect (retrying): ", connection, e);
268                                Dispatching.sleep(0.5);
269                        }
270                }
271
272                log.debug("{} connected", connection);
273                try {
274                        connection.socket.setSoTimeout(500);
275                        connection.setupStreams();
276                } catch (Exception e) {
277                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection);
278                }
279        }
280
281        /**
282         * @return the address
283         */
284        @ParamAnnotation
285        public String getAddress() {
286                return Strings.toStringNullProof(address, "?");
287        }
288
289        public Address getAddressObject() {
290                return address;
291        }
292
293        /**
294         * @return the listeners
295         */
296        public Collection<ConnectionListener> getListeners() {
297                return listeners;
298        }
299
300        /**
301         * @return the handler
302         */
303        public ExceptionResultHandler getExceptionHandler() {
304                return exceptionHandler;
305        }
306
307        /**
308         * @param handler the handler to set
309         */
310        public void setExceptionHandler(ExceptionResultHandler handler) {
311                this.exceptionHandler = handler;
312        }
313
314        public static <T extends Connection> T to(T connection, Address address) {
315                connection.setAddress(address);
316                return connection;
317        }
318
319        @Override
320        public void handle(FramsticksException exception) {
321                exceptionHandler.handle(exception);
322        }
323
324        public Dispatcher<Connection> getReceiverDispatcher() {
325                return receiverThread;
326        }
327
328        public Dispatcher<Connection> getSenderDispatcher() {
329                return senderThread;
330        }
331
332
333        protected static String idToString(Integer id) {
334                return id != null ? " " + id.toString() : "";
335        }
336
337        protected final void putFile(File file, Integer outId) {
338                putLine("file" + idToString(outId)/* + " " + f.getPath()*/);
339                Source content = file.getContent();
340                String line;
341                while ((line = content.readLine()) != null) {
342                        putLine(line);
343                }
344                putLine("eof");
345        }
346
347        public final void sendFile(final String header, final File file, final Integer id, ExceptionResultHandler handler) {
348                senderThread.dispatch(new RunAt<Connection>(handler) {
349                        @Override
350                        protected void runAt() {
351                                if (header != null) {
352                                        putLine(header);
353                                }
354                                putFile(file, id);
355                                flushOut();
356                        }
357                });
358        }
359
360}
Note: See TracBrowser for help on using the repository browser.