source:
java/main/src/main/java/com/framsticks/communication/Connection.java
@
96
Last change on this file since 96 was 96, checked in by , 11 years ago | |
---|---|
File size: 6.2 KB |
Rev | Line | |
---|---|---|
[77] | 1 | package com.framsticks.communication; |
2 | ||
[88] | 3 | import com.framsticks.util.FramsticksException; |
[85] | 4 | import com.framsticks.util.io.Encoding; |
[84] | 5 | import com.framsticks.util.lang.Pair; |
[77] | 6 | import org.apache.log4j.Logger; |
7 | import java.io.BufferedReader; | |
8 | import java.io.IOException; | |
9 | import java.io.InputStreamReader; | |
[85] | 10 | import java.io.OutputStreamWriter; |
[77] | 11 | import java.io.PrintWriter; |
12 | import java.net.Socket; | |
13 | import java.net.SocketTimeoutException; | |
14 | ||
[88] | 15 | import com.framsticks.util.dispatching.AbstractJoinable; |
16 | import com.framsticks.util.dispatching.Dispatching; | |
17 | import com.framsticks.util.dispatching.Joinable; | |
18 | import com.framsticks.util.dispatching.JoinableCollection; | |
19 | import com.framsticks.util.dispatching.JoinableParent; | |
20 | import com.framsticks.util.dispatching.JoinableState; | |
[85] | 21 | import com.framsticks.util.dispatching.RunAt; |
22 | import com.framsticks.util.dispatching.Thread; | |
23 | ||
[88] | 24 | public abstract class Connection extends AbstractJoinable implements JoinableParent { |
[77] | 25 | |
[84] | 26 | protected final static Logger log = Logger.getLogger(Connection.class); |
[77] | 27 | |
28 | protected PrintWriter output = null; | |
29 | protected BufferedReader input = null; | |
30 | ||
31 | protected Socket socket = null; | |
32 | ||
33 | public boolean requestIdEnabled = false; | |
34 | ||
35 | protected int protocolVersion = -1; | |
36 | ||
[88] | 37 | public String getAddress() { |
38 | return address; | |
39 | } | |
40 | protected final String address; | |
[96] | 41 | protected final String description; |
[88] | 42 | |
[85] | 43 | protected final Thread<Connection> senderThread = new Thread<>(); |
44 | protected final Thread<Connection> receiverThread = new Thread<>(); | |
[96] | 45 | protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(); |
[77] | 46 | |
[96] | 47 | protected void setUpThreadNames(String name) { |
48 | } | |
49 | ||
[88] | 50 | /** |
51 | * | |
52 | */ | |
[96] | 53 | public Connection(String address, String description) { |
[88] | 54 | this.address = address; |
[96] | 55 | this.description = description; |
56 | threads.setObservableName(address + " connection threads"); | |
[88] | 57 | threads.add(senderThread); |
58 | threads.add(receiverThread); | |
[96] | 59 | |
60 | senderThread.setName(description + " thread " + address + " sender"); | |
61 | receiverThread.setName(description + " thread " + address + " receiver"); | |
[88] | 62 | } |
[96] | 63 | |
64 | public synchronized boolean isConnected() { | |
65 | return socket != null && socket.isConnected(); | |
[77] | 66 | } |
67 | ||
68 | ||
[96] | 69 | // protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\\\"[^\"]*\\\"))"; |
70 | // protected static final Pattern REQUEST_ID_ENABLED_PATTERN = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?$"); | |
71 | // protected static final Pattern REQUEST_ID_DISABLED_PATTERN = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?$"); | |
[77] | 72 | |
[96] | 73 | // // protected final Pair<String, String> breakLine(String line) |
74 | // protected final Pair<Integer, String> parseRest(String rest) { | |
75 | // Matcher matcher = (requestIdEnabled ? REQUEST_ID_ENABLED_PATTERN : REQUEST_ID_DISABLED_PATTERN).matcher(rest); | |
76 | // if (!matcher.matches()) { | |
77 | // log.fatal("unmatched first line of input: '" + rest + "'"); | |
78 | // return null; | |
79 | // } | |
80 | // return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1)); | |
81 | // } | |
[77] | 82 | |
[96] | 83 | protected final Pair<Integer, CharSequence> takeRequestId(CharSequence line) { |
84 | return Request.takeRequestId(requestIdEnabled, line); | |
[84] | 85 | } |
[77] | 86 | |
[84] | 87 | static final int BUFFER_LENGTH = 1024; |
[77] | 88 | |
[84] | 89 | int readChars = 0; |
90 | int iterator = 0; | |
91 | int bufferStart = 0; | |
[96] | 92 | final char[] readBuffer = new char[BUFFER_LENGTH]; |
[77] | 93 | |
[88] | 94 | protected String getLine() { |
[96] | 95 | final StringBuilder lineBuffer = new StringBuilder(); |
[88] | 96 | try { |
97 | while (!Thread.interrupted()) { | |
98 | while (iterator < readChars) { | |
99 | if (readBuffer[iterator] != '\n') { | |
100 | ++iterator; | |
101 | continue; | |
102 | } | |
[96] | 103 | /** Do not append new line. */ |
104 | lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart); | |
[84] | 105 | ++iterator; |
[88] | 106 | bufferStart = iterator; |
107 | return lineBuffer.toString(); | |
[84] | 108 | } |
[96] | 109 | final int length = readChars - bufferStart; |
110 | if (length > 0) { | |
111 | assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH; | |
112 | assert bufferStart + length <= BUFFER_LENGTH; | |
113 | lineBuffer.append(readBuffer, bufferStart, length); | |
114 | } | |
[77] | 115 | |
[88] | 116 | readChars = 0; |
117 | while (readChars == 0) { | |
118 | try { | |
119 | readChars = input.read(readBuffer); | |
120 | } catch (SocketTimeoutException ignored) { | |
121 | //timeout - continue | |
122 | } | |
[84] | 123 | } |
[88] | 124 | iterator = 0; |
125 | bufferStart = 0; | |
[84] | 126 | } |
[88] | 127 | throw new InterruptedException(); |
128 | } catch (Exception e) { | |
129 | throw new FramsticksException().msg("failed to read line").cause(e); | |
[84] | 130 | } |
131 | } | |
[77] | 132 | |
[88] | 133 | protected abstract void receiverThreadRoutine(); |
[77] | 134 | |
[88] | 135 | |
[96] | 136 | protected void setupStreams() { |
[84] | 137 | try { |
[85] | 138 | output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true); |
139 | input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset())); | |
[96] | 140 | synchronized (this) { |
141 | this.notifyAll(); | |
142 | } | |
[84] | 143 | } catch (IOException e) { |
[96] | 144 | throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this); |
[84] | 145 | } |
146 | } | |
[77] | 147 | |
148 | /** | |
149 | * Returns Query associated with query getId. | |
150 | * | |
151 | * @return Query associated with query getId. | |
152 | */ | |
153 | public int getProtocolVersion() { | |
154 | return protocolVersion; | |
155 | } | |
156 | ||
157 | ||
[88] | 158 | @Override |
159 | protected void joinableFinish() { | |
160 | try { | |
161 | if (output != null) { | |
162 | output.close(); | |
163 | output = null; | |
164 | } | |
165 | ||
166 | if (input != null) { | |
167 | input.close(); | |
168 | input = null; | |
169 | } | |
170 | ||
171 | ||
172 | if (socket != null) { | |
173 | socket.close(); | |
174 | socket = null; | |
175 | } | |
176 | } catch (Exception e) { | |
177 | log.error("failed to stop connection: ", e); | |
178 | } | |
179 | log.debug("connection closed"); | |
180 | } | |
181 | ||
182 | @Override | |
183 | public void childChangedState(Joinable joinable, JoinableState state) { | |
184 | proceedToState(state); | |
185 | } | |
186 | ||
187 | @Override | |
[96] | 188 | public String getName() { |
189 | return description + " " + address; | |
190 | } | |
191 | ||
192 | @Override | |
193 | protected void joinableStart() { | |
194 | Dispatching.use(threads, this); | |
195 | ||
196 | senderThread.dispatch(new RunAt<Connection>() { | |
197 | @Override | |
198 | public void run() { | |
199 | synchronized (Connection.this) { | |
200 | while (state.equals(JoinableState.RUNNING) && output == null) { | |
201 | Dispatching.wait(Connection.this, 500); | |
202 | } | |
203 | } | |
204 | } | |
205 | }); | |
206 | ||
207 | receiverThread.dispatch(new RunAt<Connection>() { | |
208 | @Override | |
209 | public void run() { | |
210 | receiverThreadRoutine(); | |
211 | } | |
212 | }); | |
213 | } | |
214 | ||
215 | @Override | |
216 | protected void joinableInterrupt() { | |
217 | protocolVersion = -1; | |
218 | Dispatching.drop(threads, this); | |
219 | finish(); | |
220 | } | |
221 | ||
222 | ||
223 | @Override | |
[88] | 224 | protected void joinableJoin() throws InterruptedException { |
225 | Dispatching.join(threads); | |
226 | } | |
227 | ||
[77] | 228 | } |
Note: See TracBrowser
for help on using the repository browser.