source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 77

Last change on this file since 77 was 77, checked in by psniegowski, 11 years ago

Add new java codebase.

File size: 4.7 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.util.Pair;
4import org.apache.log4j.Logger;
5import java.io.BufferedReader;
6import java.io.IOException;
7import java.io.InputStreamReader;
8import java.io.PrintWriter;
9import java.lang.Thread;
10import java.net.Socket;
11import java.net.SocketTimeoutException;
12import java.util.regex.Matcher;
13import java.util.regex.Pattern;
14
15public abstract class Connection {
16
17        protected final static Logger LOGGER = Logger.getLogger(Connection.class);
18
19        protected PrintWriter output = null;
20        protected BufferedReader input = null;
21
22        protected Socket socket = null;
23
24        protected volatile boolean connected = false;
25
26        public boolean requestIdEnabled = false;
27
28        protected int protocolVersion = -1;
29
30        protected final com.framsticks.util.Thread senderThread = new com.framsticks.util.Thread();
31        protected Thread receiverThread;
32
33        public boolean isConnected() {
34                return connected;
35        }
36
37        public void close() {
38                protocolVersion = -1;
39                try {
40                        connected = false;
41
42            senderThread.interrupt();
43            senderThread.join();
44                        if (receiverThread != null) {
45                                receiverThread.interrupt();
46                                receiverThread.join();
47                                receiverThread = null;
48                        }
49
50                        if (output != null) {
51                                output.close();
52                                output = null;
53                        }
54
55                        if (input != null) {
56                                input.close();
57                                input = null;
58                        }
59
60
61                        if (socket != null) {
62                                socket.close();
63                                socket = null;
64                        }
65
66                        LOGGER.info("connection closed");
67                } catch (Exception e) {
68                        LOGGER.error(e);
69                }
70
71        }
72
73        protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
74        protected static Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
75        protected static Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
76        protected static Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");
77
78
79    protected final Pair<Integer, String> parseRest(String rest) {
80        Matcher matcher = (requestIdEnabled ? requestIdEnabledPattern : requestIDisabledPattern).matcher(rest);
81        if (!matcher.matches()) {
82            LOGGER.fatal("unmatched first line of input: " + rest);
83            return null;
84        }
85        return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
86    }
87
88    static final int BUFFER_LENGTH = 1024;
89
90    int readChars = 0;
91    int iterator = 0;
92    int bufferStart = 0;
93    char[] readBuffer = new char[BUFFER_LENGTH];
94
95    protected String getLine() throws Exception {
96        StringBuilder lineBuffer = new StringBuilder();
97        while (!Thread.interrupted()) {
98            while (iterator < readChars) {
99                if (readBuffer[iterator] != '\n') {
100                    ++iterator;
101                    continue;
102                }
103                lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
104                ++iterator;
105                bufferStart = iterator;
106                return lineBuffer.toString();
107            }
108            lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
109
110            readChars = 0;
111            while (readChars == 0) {
112                try {
113                    readChars = input.read(readBuffer);
114                } catch (SocketTimeoutException ignored) {
115                    //timeout - continue
116                }
117            }
118            iterator = 0;
119            bufferStart = 0;
120        }
121        throw new InterruptedException();
122    }
123
124    protected abstract void receiverThreadRoutine() throws Exception;
125
126    protected void runThreads() {
127        try {
128            output = new PrintWriter(socket.getOutputStream(), true);
129            input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
130        } catch (IOException e) {
131            LOGGER.error("buffer creation failure");
132            close();
133            return;
134        }
135
136        senderThread.setName(this + "-sender");
137        receiverThread = new Thread(new Runnable() {
138            @Override
139            public void run() {
140                try {
141                    receiverThreadRoutine();
142                } catch (InterruptedException ignored) {
143                    LOGGER.debug("receiver thread interrupted");
144                } catch (Exception e) {
145                    LOGGER.error("error: " + e);
146                    close();
147                }
148            }
149        });
150        receiverThread.setName(this + "-receiver");
151
152        senderThread.start();
153        receiverThread.start();
154    }
155
156
157        /**
158         * Returns Query associated with query getId.
159         *
160         * @return Query associated with query getId.
161         */
162
163        public int getProtocolVersion() {
164                return protocolVersion;
165        }
166
167
168
169}
Note: See TracBrowser for help on using the repository browser.