source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 88

Last change on this file since 88 was 88, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • loading f0 schema with XmlLoader?
  • use XmlLoader? to load configuration
  • introduce unified fork-join model of various entities

(Instances, Connections, GUI Frames, etc.);
all those entities clean up gracefully on
shutdown, which may be initiated by the user
or by some entity

  • based on the above, simplify several organizing classes

(Observer, main class)

(to host native frams server process from Java level)

CHANGELOG:
Remove redundant Observer class.

Clean up in AbstractJoinable?.

Update ExternalProcess? class to changes in joining model.

Another sweep through code with FindBugs?.

Find bug with RemoteInstance not being joined.

Joining almost works.

Much improved joining model.

More improvement to joining model.

Add logging messages around joinable operations.

Rename methods in AbstractJoinable?.

Improve Joinable.

Rewrite of entity structure.

More simplifications with entities.

Further improve joinables.

Let Frame compose from JFrame instead of inheriting.

Add join classes.

Improvements of closing.

Add Builder interface.

Add FramsServerTest?.xml

FramsServer? may be configured through xml.

Make Framsticks main class an Observer of Entities.

Make Observer a generic type.

Remove variables related to the removed endpoint.

Simplify observer (remove endpoints).

More changes to Observer and Endpoint.

Minor improvements.

Add OutputListener? to ExternalProcess?.

Improve testing of ExternalProcess?.

Add ExternalProcess? runner.

Rename the Program class to Framsticks.

Migrate Program to use XmlLoader? configuration.

First steps with configuration using XmlLoader?.

Fix several bugs.

Move all f0 classes to the appropriate package.

XmlLoader? is able to load Schema.

XmlLoader? is loading classes and props.

Add GroupBuilder?.

File size: 5.3 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.util.FramsticksException;
4import com.framsticks.util.io.Encoding;
5import com.framsticks.util.lang.Pair;
6import org.apache.log4j.Logger;
7import java.io.BufferedReader;
8import java.io.IOException;
9import java.io.InputStreamReader;
10import java.io.OutputStreamWriter;
11import java.io.PrintWriter;
12import java.net.Socket;
13import java.net.SocketTimeoutException;
14import java.util.regex.Matcher;
15import java.util.regex.Pattern;
16
17import com.framsticks.util.dispatching.AbstractJoinable;
18import com.framsticks.util.dispatching.Dispatching;
19import com.framsticks.util.dispatching.Joinable;
20import com.framsticks.util.dispatching.JoinableCollection;
21import com.framsticks.util.dispatching.JoinableParent;
22import com.framsticks.util.dispatching.JoinableState;
23import com.framsticks.util.dispatching.RunAt;
24import com.framsticks.util.dispatching.Thread;
25
26public abstract class Connection extends AbstractJoinable implements JoinableParent {
27
28        protected final static Logger log = Logger.getLogger(Connection.class);
29
30        protected PrintWriter output = null;
31        protected BufferedReader input = null;
32
33        protected Socket socket = null;
34
35        protected volatile boolean connected = false;
36
37        public boolean requestIdEnabled = false;
38
39        protected int protocolVersion = -1;
40
41        public String getAddress() {
42                return address;
43        }
44        protected final String address;
45
46        protected final Thread<Connection> senderThread = new Thread<>();
47        protected final Thread<Connection> receiverThread = new Thread<>();
48        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true);
49
50        /**
51         *
52         */
53        public Connection(String address) {
54                this.address = address;
55                threads.add(senderThread);
56                threads.add(receiverThread);
57        }
58        public boolean isConnected() {
59                return connected;
60        }
61
62
63        protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
64        protected static final Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
65        protected static final Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
66        protected static final Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");
67
68
69        protected final Pair<Integer, String> parseRest(String rest) {
70                Matcher matcher = (requestIdEnabled ? requestIdEnabledPattern : requestIDisabledPattern).matcher(rest);
71                if (!matcher.matches()) {
72                        log.fatal("unmatched first line of input: " + rest);
73                        return null;
74                }
75                return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
76        }
77
78        static final int BUFFER_LENGTH = 1024;
79
80        int readChars = 0;
81        int iterator = 0;
82        int bufferStart = 0;
83        char[] readBuffer = new char[BUFFER_LENGTH];
84
85        protected String getLine() {
86                StringBuilder lineBuffer = new StringBuilder();
87                try {
88                        while (!Thread.interrupted()) {
89                                while (iterator < readChars) {
90                                        if (readBuffer[iterator] != '\n') {
91                                                ++iterator;
92                                                continue;
93                                        }
94                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
95                                        ++iterator;
96                                        bufferStart = iterator;
97                                        return lineBuffer.toString();
98                                }
99                                lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
100
101                                readChars = 0;
102                                while (readChars == 0) {
103                                        try {
104                                                readChars = input.read(readBuffer);
105                                        } catch (SocketTimeoutException ignored) {
106                                                //timeout - continue
107                                        }
108                                }
109                                iterator = 0;
110                                bufferStart = 0;
111                        }
112                        throw new InterruptedException();
113                } catch (Exception e) {
114                        throw new FramsticksException().msg("failed to read line").cause(e);
115                }
116        }
117
118        protected abstract void receiverThreadRoutine();
119
120        protected void setUpThread(Thread<Connection> thread, String name) {
121                thread.setName("connection thread " + address + " " + name);
122        }
123
124        protected void runThreads() {
125                try {
126                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
127                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
128                } catch (IOException e) {
129                        log.error("buffer creation failure");
130                        return;
131                }
132
133                setUpThread(senderThread, "sender");
134                setUpThread(receiverThread, "receiver");
135                Dispatching.use(threads, this);
136
137                receiverThread.invokeLater(new RunAt<Connection>() {
138                        @Override
139                        public void run() {
140                                receiverThreadRoutine();
141                        }
142                });
143
144        }
145
146        /**
147         * Returns Query associated with query getId.
148         *
149         * @return Query associated with query getId.
150         */
151        public int getProtocolVersion() {
152                return protocolVersion;
153        }
154
155        @Override
156        protected void joinableInterrupt() {
157                protocolVersion = -1;
158
159                connected = false;
160                Dispatching.drop(threads, this);
161
162                // finish();
163        }
164
165        @Override
166        protected void joinableFinish() {
167                try {
168                        if (output != null) {
169                                output.close();
170                                output = null;
171                        }
172
173                        if (input != null) {
174                                input.close();
175                                input = null;
176                        }
177
178
179                        if (socket != null) {
180                                socket.close();
181                                socket = null;
182                        }
183                } catch (Exception e) {
184                        log.error("failed to stop connection: ", e);
185                }
186                log.debug("connection closed");
187        }
188
189        @Override
190        public void childChangedState(Joinable joinable, JoinableState state) {
191                proceedToState(state);
192        }
193
194        @Override
195        protected void joinableJoin() throws InterruptedException {
196                Dispatching.join(threads);
197
198        }
199
200}
Note: See TracBrowser for help on using the repository browser.