source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 96

Last change on this file since 96 was 96, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils
  • fix closing issues: Ctrl+C or window close button properly shut down the whole program by the Java Framsticks framework
  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exceptions in asynchronous closures (a thrown exception is transported back to the caller)
  • implement call request on both sides
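
The closure item in the highlights (exception passing hidden in a handle method, with the thrown exception transported back to the caller) can be illustrated with a minimal sketch. The names `ClosureSketch`, `FailureHandler`, `SafeCallback`, `result` and `pass` are hypothetical and are not taken from the Framsticks sources; the sketch only shows the general pattern, not the framework's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

public class ClosureSketch {

    /** Receives failures raised inside a callback (hypothetical interface). */
    interface FailureHandler {
        void handle(RuntimeException exception);
    }

    /** Callback whose body may throw; the wrapper forwards the exception to the handler. */
    abstract static class SafeCallback<T> {
        private final FailureHandler handler;

        SafeCallback(FailureHandler handler) {
            this.handler = handler;
        }

        /** User code goes here and is free to throw. */
        protected abstract void result(T value);

        /** Called by the asynchronous side; no exception escapes from here. */
        public final void pass(T value) {
            try {
                result(value);
            } catch (RuntimeException e) {
                handler.handle(e);
            }
        }
    }

    /** Runs a callback with the given value and returns the messages of reported failures. */
    static List<String> demo(final int value) {
        final List<String> failures = new ArrayList<>();
        SafeCallback<Integer> callback = new SafeCallback<Integer>(new FailureHandler() {
            @Override
            public void handle(RuntimeException exception) {
                failures.add(exception.getMessage());
            }
        }) {
            @Override
            protected void result(Integer v) {
                if (v < 0) {
                    throw new IllegalArgumentException("negative value: " + v);
                }
            }
        };
        callback.pass(value);
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(demo(1));
        System.out.println(demo(-1));
    }
}
```

The callback body contains no try/catch of its own, which is the readability gain the highlight refers to; the caller decides what to do with the failure through the handler it supplied.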

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supersede other application exceptions being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronous tests.

Extract more algorithms to InstanceUtils.

Extract Instance.resolve to InstanceUtils.

Improve naming.

Improve passing exception in InstanceClient.

Hide calling of passed functor in StateCallback.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils.

Extract common things to InstanceUtils.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException.

Several changes.

Extract resolveAndFetch to InstanceUtils algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess.

Add test of FramsClass printing.

Improve FramsticksException messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance from RemoteInstance.

Remove PrivateJoinableCollection.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest, the client connects to the testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance.

Extract more algorithms to InstanceUtils.

Extract some common functionality from AbstractInstance.

Functions were placed in InstanceUtils.

Hide registry of Instance.

Use ValueParam in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance.

File size: 6.2 KB
package com.framsticks.communication;

import com.framsticks.util.FramsticksException;
import com.framsticks.util.io.Encoding;
import com.framsticks.util.lang.Pair;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketTimeoutException;

import com.framsticks.util.dispatching.AbstractJoinable;
import com.framsticks.util.dispatching.Dispatching;
import com.framsticks.util.dispatching.Joinable;
import com.framsticks.util.dispatching.JoinableCollection;
import com.framsticks.util.dispatching.JoinableParent;
import com.framsticks.util.dispatching.JoinableState;
import com.framsticks.util.dispatching.RunAt;
import com.framsticks.util.dispatching.Thread;

public abstract class Connection extends AbstractJoinable implements JoinableParent {

        protected static final Logger log = Logger.getLogger(Connection.class);

        protected PrintWriter output = null;
        protected BufferedReader input = null;

        protected Socket socket = null;

        public boolean requestIdEnabled = false;

        protected int protocolVersion = -1;

        public String getAddress() {
                return address;
        }
        protected final String address;
        protected final String description;

        protected final Thread<Connection> senderThread = new Thread<>();
        protected final Thread<Connection> receiverThread = new Thread<>();
        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();

        protected void setUpThreadNames(String name) {
        }

        /**
         * Creates a connection for the given address and registers the
         * sender and receiver threads in the thread collection.
         */
        public Connection(String address, String description) {
                this.address = address;
                this.description = description;
                threads.setObservableName(address + " connection threads");
                threads.add(senderThread);
                threads.add(receiverThread);

                senderThread.setName(description + " thread " + address + " sender");
                receiverThread.setName(description + " thread " + address + " receiver");
        }

        public synchronized boolean isConnected() {
                return socket != null && socket.isConnected();
        }


        // protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\\\"[^\"]*\\\"))";
        // protected static final Pattern REQUEST_ID_ENABLED_PATTERN = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?$");
        // protected static final Pattern REQUEST_ID_DISABLED_PATTERN = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?$");

        // // protected final Pair<String, String> breakLine(String line)
        // protected final Pair<Integer, String> parseRest(String rest) {
        //      Matcher matcher = (requestIdEnabled ? REQUEST_ID_ENABLED_PATTERN : REQUEST_ID_DISABLED_PATTERN).matcher(rest);
        //      if (!matcher.matches()) {
        //              log.fatal("unmatched first line of input: '" + rest + "'");
        //              return null;
        //      }
        //      return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
        // }

        protected final Pair<Integer, CharSequence> takeRequestId(CharSequence line) {
                return Request.takeRequestId(requestIdEnabled, line);
        }

        static final int BUFFER_LENGTH = 1024;

        int readChars = 0;
        int iterator = 0;
        int bufferStart = 0;
        final char[] readBuffer = new char[BUFFER_LENGTH];

        /**
         * Reads the next line from the input, without the terminating new line.
         * Characters left in the buffer after the new line are kept for the next call.
         */
        protected String getLine() {
                final StringBuilder lineBuffer = new StringBuilder();
                try {
                        while (!Thread.interrupted()) {
                                // Scan the buffered characters for a new line.
                                while (iterator < readChars) {
                                        if (readBuffer[iterator] != '\n') {
                                                ++iterator;
                                                continue;
                                        }
                                        // Do not append the new line character.
                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart);
                                        ++iterator;
                                        bufferStart = iterator;
                                        return lineBuffer.toString();
                                }
                                // No new line found: keep the partial line and refill the buffer.
                                final int length = readChars - bufferStart;
                                if (length > 0) {
                                        assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH;
                                        assert bufferStart + length <= BUFFER_LENGTH;
                                        lineBuffer.append(readBuffer, bufferStart, length);
                                }

                                readChars = 0;
                                while (readChars == 0) {
                                        try {
                                                readChars = input.read(readBuffer);
                                        } catch (SocketTimeoutException ignored) {
                                                // timeout - continue
                                        }
                                }
                                // read() returns -1 at end of stream; without this check the loop would spin.
                                if (readChars < 0) {
                                        readChars = 0;
                                        throw new IOException("end of stream reached");
                                }
                                iterator = 0;
                                bufferStart = 0;
                        }
                        throw new InterruptedException();
                } catch (Exception e) {
                        throw new FramsticksException().msg("failed to read line").cause(e);
                }
        }

        protected abstract void receiverThreadRoutine();


        protected void setupStreams() {
                try {
                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
                        // Wake up the sender thread waiting for the output stream.
                        synchronized (this) {
                                this.notifyAll();
                        }
                } catch (IOException e) {
                        throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this);
                }
        }

        /**
         * Returns the protocol version of this connection.
         *
         * @return the protocol version, or -1 when it has not been set.
         */
        public int getProtocolVersion() {
                return protocolVersion;
        }


        @Override
        protected void joinableFinish() {
                try {
                        if (output != null) {
                                output.close();
                                output = null;
                        }

                        if (input != null) {
                                input.close();
                                input = null;
                        }


                        if (socket != null) {
                                socket.close();
                                socket = null;
                        }
                } catch (Exception e) {
                        log.error("failed to stop connection: ", e);
                }
                log.debug("connection closed");
        }

        @Override
        public void childChangedState(Joinable joinable, JoinableState state) {
                proceedToState(state);
        }

        @Override
        public String getName() {
                return description + " " + address;
        }

        @Override
        protected void joinableStart() {
                Dispatching.use(threads, this);

                // The sender thread waits until the output stream is ready.
                senderThread.dispatch(new RunAt<Connection>() {
                        @Override
                        public void run() {
                                synchronized (Connection.this) {
                                        while (state.equals(JoinableState.RUNNING) && output == null) {
                                                Dispatching.wait(Connection.this, 500);
                                        }
                                }
                        }
                });

                receiverThread.dispatch(new RunAt<Connection>() {
                        @Override
                        public void run() {
                                receiverThreadRoutine();
                        }
                });
        }

        @Override
        protected void joinableInterrupt() {
                protocolVersion = -1;
                Dispatching.drop(threads, this);
                finish();
        }


        @Override
        protected void joinableJoin() throws InterruptedException {
                Dispatching.join(threads);
        }

}
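
The chunked line reading done by `getLine` can be tried in isolation with a minimal sketch like the one below. It assumes a plain `java.io.Reader` in place of the socket stream and, unlike `Connection`, returns `null` at end of stream; the names `ChunkedLineReader`, `readLine` and `readAll` are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class ChunkedLineReader {
    private final Reader input;
    private final char[] buffer;
    private int filled = 0;   // number of valid characters in the buffer
    private int position = 0; // next character to examine

    ChunkedLineReader(Reader input, int bufferLength) {
        this.input = input;
        this.buffer = new char[bufferLength];
    }

    /** Returns the next line without '\n', or null when the stream has ended. */
    String readLine() throws IOException {
        StringBuilder line = new StringBuilder();
        while (true) {
            int start = position;
            while (position < filled) {
                if (buffer[position++] == '\n') {
                    // Append everything before the new line character.
                    line.append(buffer, start, position - 1 - start);
                    return line.toString();
                }
            }
            // No new line in the rest of the buffer: keep the partial line and refill.
            line.append(buffer, start, filled - start);
            filled = input.read(buffer);
            position = 0;
            if (filled < 0) {
                filled = 0;
                return line.length() > 0 ? line.toString() : null;
            }
        }
    }

    /** Convenience helper: splits a string into lines using a small buffer. */
    static List<String> readAll(String text, int bufferLength) {
        ChunkedLineReader reader = new ChunkedLineReader(new StringReader(text), bufferLength);
        List<String> lines = new ArrayList<>();
        try {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new IllegalStateException("failed to read line", e);
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(readAll("ab\ncd\ne", 4));
    }
}
```

A buffer shorter than the input forces lines to span several `read` calls, which exercises the partial-line path.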