package com.framsticks.communication;

import com.framsticks.util.Pair;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.lang.Thread;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Base class for a line-oriented, socket-backed connection that is serviced by
 * a sender thread and a receiver thread.
 *
 * <p>This class owns the socket streams, provides line reading
 * ({@link #getLine()}) and first-line parsing ({@link #parseRest(String)}),
 * and starts/stops the two threads. The actual receive loop is supplied by
 * subclasses through {@link #receiverThreadRoutine()}. The {@code socket}
 * field is not assigned anywhere in this class; it must be set before
 * {@link #runThreads()} is called.
 */
public abstract class Connection {

    protected final static Logger LOGGER = Logger.getLogger(Connection.class);

    /** Writer over the socket output stream; created in {@link #runThreads()}, null otherwise. */
    protected PrintWriter output = null;
    /** Reader over the socket input stream; created in {@link #runThreads()}, null otherwise. */
    protected BufferedReader input = null;
    /** Underlying socket; closed and nulled by {@link #close()}. */
    protected Socket socket = null;

    /** Connection state flag; this class only ever resets it to false (in {@link #close()}). */
    protected volatile boolean connected = false;

    /** Selects which pattern {@link #parseRest(String)} uses: with or without a leading numeric id. */
    public boolean requestIdEnabled = false;

    /** Protocol version; -1 initially and again after {@link #close()}. */
    protected int protocolVersion = -1;

    protected final com.framsticks.util.Thread senderThread = new com.framsticks.util.Thread();
    protected Thread receiverThread;

    /**
     * @return current value of the {@code connected} flag.
     */
    public boolean isConnected() {
        return connected;
    }

    /**
     * Stops both threads and releases the streams and the socket.
     *
     * <p>Every step is isolated so that a failure in one of them (for example
     * an interrupted join) can no longer prevent the streams and the socket
     * from being closed. The method may be called from the receiver thread
     * itself (as {@link #runThreads()} does on a receiver error); in that case
     * the receiver is not joined, because a thread cannot wait for its own
     * termination. If the calling thread was interrupted while waiting, or is
     * the receiver thread, its interrupt status is set before returning.
     */
    public void close() {
        protocolVersion = -1;
        connected = false;
        boolean interrupted = false;

        try {
            senderThread.interrupt();
            senderThread.join();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                interrupted = true;
            }
            LOGGER.error(e);
        }

        Thread receiver = receiverThread;
        if (receiver != null) {
            if (receiver == Thread.currentThread()) {
                // Called from the receiver itself: joining would never succeed.
                // Defer the interrupt until the resources are released, so the
                // receive loop still observes it once close() returns.
                interrupted = true;
            } else {
                receiver.interrupt();
                try {
                    receiver.join();
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOGGER.error(e);
                }
            }
            receiverThread = null;
        }

        if (output != null) {
            output.close();
            output = null;
        }
        if (input != null) {
            try {
                input.close();
            } catch (IOException e) {
                LOGGER.error(e);
            }
            input = null;
        }
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                LOGGER.error(e);
            }
            socket = null;
        }
        LOGGER.info("connection closed");

        if (interrupted) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** Regex fragment capturing one argument: a run of non-whitespace or a double-quoted string. */
    protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
    /** First-line pattern with a leading numeric id followed by an optional argument. */
    protected static Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
    /** First-line pattern consisting of an optional argument only. */
    protected static Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
    /** Pattern capturing two whitespace-separated tokens followed by a newline; not used in this class. */
    protected static Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");

    /**
     * Parses the remainder of a first line into an (id, argument) pair.
     *
     * @param rest text to parse; must end with a newline to match.
     * @return pair whose first element is the parsed integer id (null when
     *         {@code requestIdEnabled} is false) and whose second element is
     *         the captured argument (null when absent); {@code null} when the
     *         text does not match the active pattern.
     */
    protected final Pair parseRest(String rest) {
        Pattern pattern = requestIdEnabled ? requestIdEnabledPattern : requestIDisabledPattern;
        Matcher matcher = pattern.matcher(rest);
        if (!matcher.matches()) {
            LOGGER.fatal("unmatched first line of input: " + rest);
            return null;
        }
        return new Pair(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
    }

    static final int BUFFER_LENGTH = 1024;

    /** Number of valid characters currently held in {@code readBuffer}. */
    int readChars = 0;
    /** Scan position inside {@code readBuffer}. */
    int iterator = 0;
    /** Index of the first character in {@code readBuffer} not yet handed out as part of a line. */
    int bufferStart = 0;
    char[] readBuffer = new char[BUFFER_LENGTH];

    /**
     * Reads the next line from {@code input}, including its terminating
     * {@code '\n'}. Characters following the newline stay in the internal
     * buffer for the next call.
     *
     * @return the line, newline included.
     * @throws InterruptedException if the current thread is interrupted; this
     *         is also checked between read attempts that time out, so a
     *         receiver idling on socket timeouts still reacts to an interrupt.
     * @throws EOFException if the stream ends before a complete line is read.
     * @throws Exception any other failure of the underlying read.
     */
    protected String getLine() throws Exception {
        StringBuilder lineBuffer = new StringBuilder();
        while (!Thread.interrupted()) {
            // Scan what is already buffered for a line terminator.
            while (iterator < readChars) {
                if (readBuffer[iterator] != '\n') {
                    ++iterator;
                    continue;
                }
                lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
                ++iterator;
                bufferStart = iterator;
                return lineBuffer.toString();
            }

            // No terminator yet: keep the partial line and refill the buffer.
            lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
            // Reset the cursors before reading so the state stays consistent
            // even if the read below throws.
            readChars = 0;
            iterator = 0;
            bufferStart = 0;

            while (readChars == 0) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                try {
                    readChars = input.read(readBuffer);
                } catch (SocketTimeoutException ignored) {
                    //timeout - continue
                }
            }

            if (readChars < 0) {
                // End of stream: without this check the negative count would be
                // used as a length in the append above on the next pass.
                readChars = 0;
                throw new EOFException("end of stream reached before end of line");
            }
        }
        throw new InterruptedException();
    }

    /**
     * Body of the receiver thread, supplied by subclasses. An
     * {@link InterruptedException} escaping from it is only logged at debug
     * level; any other exception is logged as an error and closes the
     * connection.
     *
     * @throws Exception on any receive failure.
     */
    protected abstract void receiverThreadRoutine() throws Exception;

    /**
     * Creates the reader and writer over the socket streams and starts the
     * sender and receiver threads. If the streams cannot be created, the
     * failure is logged and the connection is closed without starting any
     * thread.
     */
    protected void runThreads() {
        try {
            output = new PrintWriter(socket.getOutputStream(), true);
            input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        } catch (IOException e) {
            LOGGER.error("buffer creation failure", e);
            close();
            return;
        }

        senderThread.setName(this + "-sender");
        receiverThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    receiverThreadRoutine();
                } catch (InterruptedException ignored) {
                    LOGGER.debug("receiver thread interrupted");
                } catch (Exception e) {
                    LOGGER.error("error: " + e, e);
                    close();
                }
            }
        });
        receiverThread.setName(this + "-receiver");

        senderThread.start();
        receiverThread.start();
    }

    /**
     * Returns the protocol version recorded for this connection.
     *
     * @return the protocol version, or -1 if none is set (the initial value,
     *         and the value restored by {@link #close()}).
     */
    public int getProtocolVersion() {
        return protocolVersion;
    }
}