source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 85

Last change on this file since 85 was 85, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • upgrade to Java 7
    • use try-multi-catch clauses
    • use try-with-resources were appropriate
  • configure FindBugs? (use mvn site and then navigate in browser to the report)
    • remove most bugs found
  • parametrize Dispatching environment (Dispatcher, RunAt?) to enforce more control on the place of closures actual call

CHANGELOG:
Rework FavouritesXMLFactory.

FindBugs?. Thread start.

FindBugs?. Minor change.

FindBugs?. Iterate over entrySet.

FindBugs?. Various.

FindBug?.

FindBug?. Encoding.

FindBug?. Final fields.

FindBug?.

Remove synchronization bug in ClientConnection?.

Experiments with findbugs.

Finish parametrization.

Make RunAt? an abstract class.

More changes in parametrization.

More changes in parametrizing dispatching.

Several changes to parametrize tasks.

Rename Runnable to RunAt?.

Add specific framsticks Runnable.

Add JSR305 (annotations).

Add findbugs reporting.

More improvements to ParamBuilder? wording.

Make FramsClass? accept also ParamBuilder?.

Change wording of ParamBuilder?.

Change wording of Request creation.

Use Java 7 exception catch syntax.

Add ScopeEnd? class.

Upgrade to Java 7.

File size: 4.3 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.util.io.Encoding;
4import com.framsticks.util.lang.Pair;
5import org.apache.log4j.Logger;
6import java.io.BufferedReader;
7import java.io.IOException;
8import java.io.InputStreamReader;
9import java.io.OutputStreamWriter;
10import java.io.PrintWriter;
11import java.net.Socket;
12import java.net.SocketTimeoutException;
13import java.util.regex.Matcher;
14import java.util.regex.Pattern;
15
16import com.framsticks.util.dispatching.RunAt;
17import com.framsticks.util.dispatching.Thread;
18
19public abstract class Connection {
20
21        protected final static Logger log = Logger.getLogger(Connection.class);
22
23        protected PrintWriter output = null;
24        protected BufferedReader input = null;
25
26        protected Socket socket = null;
27
28        protected volatile boolean connected = false;
29
30        public boolean requestIdEnabled = false;
31
32        protected int protocolVersion = -1;
33
34        protected final Thread<Connection> senderThread = new Thread<>();
35        protected final Thread<Connection> receiverThread = new Thread<>();
36
37        public boolean isConnected() {
38                return connected;
39        }
40
41        public void close() {
42                protocolVersion = -1;
43                try {
44                        connected = false;
45
46                        senderThread.interrupt();
47                        senderThread.join();
48
49                        receiverThread.interrupt();
50                        receiverThread.join();
51
52                        if (output != null) {
53                                output.close();
54                                output = null;
55                        }
56
57                        if (input != null) {
58                                input.close();
59                                input = null;
60                        }
61
62
63                        if (socket != null) {
64                                socket.close();
65                                socket = null;
66                        }
67
68                        log.info("connection closed");
69                } catch (Exception e) {
70                        log.error(e);
71                }
72
73        }
74
75        protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
76        protected static final Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
77        protected static final Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
78        protected static final Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");
79
80
81        protected final Pair<Integer, String> parseRest(String rest) {
82                Matcher matcher = (requestIdEnabled ? requestIdEnabledPattern : requestIDisabledPattern).matcher(rest);
83                if (!matcher.matches()) {
84                        log.fatal("unmatched first line of input: " + rest);
85                        return null;
86                }
87                return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
88        }
89
90        static final int BUFFER_LENGTH = 1024;
91
92        int readChars = 0;
93        int iterator = 0;
94        int bufferStart = 0;
95        char[] readBuffer = new char[BUFFER_LENGTH];
96
97        protected String getLine() throws Exception {
98                StringBuilder lineBuffer = new StringBuilder();
99                while (!Thread.interrupted()) {
100                        while (iterator < readChars) {
101                                if (readBuffer[iterator] != '\n') {
102                                        ++iterator;
103                                        continue;
104                                }
105                                lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
106                                ++iterator;
107                                bufferStart = iterator;
108                                return lineBuffer.toString();
109                        }
110                        lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
111
112                        readChars = 0;
113                        while (readChars == 0) {
114                                try {
115                                        readChars = input.read(readBuffer);
116                                } catch (SocketTimeoutException ignored) {
117                                        //timeout - continue
118                                }
119                        }
120                        iterator = 0;
121                        bufferStart = 0;
122                }
123                throw new InterruptedException();
124        }
125
126        protected abstract void receiverThreadRoutine() throws Exception;
127
128        protected void runThreads() {
129                try {
130                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
131                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
132                } catch (IOException e) {
133                        log.error("buffer creation failure");
134                        close();
135                        return;
136                }
137
138                senderThread.setName(this + "-sender");
139                receiverThread.setName(this + "-receiver");
140
141                senderThread.start();
142                receiverThread.start();
143
144                receiverThread.invokeLater(new RunAt<Connection>() {
145                        @Override
146                        public void run() {
147                                try {
148                                        receiverThreadRoutine();
149                                } catch (InterruptedException ignored) {
150                                        log.debug("receiver thread interrupted");
151                                } catch (Exception e) {
152                                        log.error("error: " + e);
153                                        close();
154                                }
155                        }
156                });
157
158        }
159
160
161        /**
162         * Returns Query associated with query getId.
163         *
164         * @return Query associated with query getId.
165         */
166
167        public int getProtocolVersion() {
168                return protocolVersion;
169        }
170
171
172
173}
Note: See TracBrowser for help on using the repository browser.