Ignore:
Timestamp:
07/06/13 03:51:11 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • add proper exception passing between communication sides:

if exception occur during handling client request, it is
automatically passed as comment to error response.

it may be used to snoop communication between peers

  • fix algorithm choosing text controls in GUI
  • allow GUI testing in virtual frame buffer (xvfb)

FEST had some problem with xvfb but workaround was found

supports tab-completion based on requests history

CHANGELOG:
Further improve handling of exceptions in GUI.

Add StatusBar? implementing ExceptionResultHandler?.

Make completion processing asynchronous.

Minor changes.

Improve completion in console.

Improve history in InteractiveConsole?.

First working version of DirectConsole?.

Minor changes.

Make Connection.address non final.

It is more suitable to use in configuration.

Improvement of consoles.

Improve PopupMenu? and closing of FrameJoinable?.

Fix BrowserTest?.

Found bug with FEST running under xvfb.

JButtonFixture.click() is not working under xvfb.
GuiTest? has wrapper which uses JButton.doClick() directly.

Store CompositeParam? param in TreeNode?.

Simplify ClientSideManagedConnection? connecting.

There is now connectedFunctor needed, ApplicationRequests? can be
send right after creation. They are buffered until the version
and features are negotiated.

Narow down interface of ClientSideManagedConnection?.

Allow that connection specialization send only
ApplicationRequests?.

Improve policy of text control choosing.

Change name of Genotype in BrowserTest?.

Make BrowserTest? change name of Genotype.

Minor change.

First working draft of TrackConsole?.

Simplify Consoles.

More improvements with gui joinables.

Unify initialization on gui joinables.

More rework of Frame based entities.

Refactorize structure of JFrames based entities.

Extract GuiTest? from BrowserBaseTest?.

Reorganize Console classes structure.

Add Collection view to JoinableCollection?.

Configure timeout in testing.

Minor changes.

Rework connections hierarchy.

Add Mode to the get operation.

Make get and set in Tree take PrimitiveParam?.

Unify naming of operations.

Make RunAt? use the given ExceptionHandler?.

It wraps the virtual runAt() method call with
try-catch passing exception to handler.

Force RunAt? to include ExceptionHandler?.

Improve ClientAtServer?.

Minor change.

Another sweep with FindBugs?.

Rename Instance to Tree.

Minor changes.

Minor changes.

Further clarify semantics of Futures.

Add FutureHandler?.

FutureHandler? is refinement of Future, that proxifies
exception handling to ExceptionResultHandler? given
at construction time.

Remove StateFunctor? (use Future<Void> instead).

Make Connection use Future<Void>.

Unparametrize *ResponseFuture?.

Remove StateCallback? not needed anymore.

Distinguish between sides of ResponseFuture?.

Base ResponseCallback? on Future (now ResponseFuture?).

Make asynchronous store taking Future for flags.

Implement storeValue in ObjectInstance?.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/communication/Connection.java

    r96 r97  
    11package com.framsticks.communication;
    22
     3import com.framsticks.params.annotations.AutoAppendAnnotation;
     4import com.framsticks.params.annotations.FramsClassAnnotation;
     5import com.framsticks.params.annotations.ParamAnnotation;
    36import com.framsticks.util.FramsticksException;
    47import com.framsticks.util.io.Encoding;
    5 import com.framsticks.util.lang.Pair;
     8import com.framsticks.util.lang.Strings;
     9
     10import org.apache.log4j.Level;
    611import org.apache.log4j.Logger;
    712import java.io.BufferedReader;
     
    1217import java.net.Socket;
    1318import java.net.SocketTimeoutException;
     19import java.util.Collection;
     20import java.util.HashSet;
     21import java.util.Set;
    1422
    1523import com.framsticks.util.dispatching.AbstractJoinable;
     
    2129import com.framsticks.util.dispatching.RunAt;
    2230import com.framsticks.util.dispatching.Thread;
    23 
     31import com.framsticks.util.dispatching.ThrowExceptionHandler;
     32
     33@FramsClassAnnotation
    2434public abstract class Connection extends AbstractJoinable implements JoinableParent {
    2535
    2636        protected final static Logger log = Logger.getLogger(Connection.class);
    2737
    28         protected PrintWriter output = null;
    29         protected BufferedReader input = null;
     38        private PrintWriter output = null;
     39        private BufferedReader input = null;
    3040
    3141        protected Socket socket = null;
    3242
    33         public boolean requestIdEnabled = false;
    34 
    35         protected int protocolVersion = -1;
    36 
    37         public String getAddress() {
    38                 return address;
    39         }
    40         protected final String address;
    41         protected final String description;
     43        protected Address address;
     44        protected String description = "connection";
    4245
    4346        protected final Thread<Connection> senderThread = new Thread<>();
    4447        protected final Thread<Connection> receiverThread = new Thread<>();
    4548        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();
    46 
    47         protected void setUpThreadNames(String name) {
    48         }
     49        protected final Set<ConnectionListener> listeners = new HashSet<>();
    4950
    5051        /**
    5152         *
    5253         */
    53         public Connection(String address, String description) {
    54                 this.address = address;
    55                 this.description = description;
    56                 threads.setObservableName(address + " connection threads");
     54        public Connection() {
    5755                threads.add(senderThread);
    5856                threads.add(receiverThread);
    5957
     58        }
     59
     60        protected void updateNames() {
     61                if (address == null) {
     62                        return;
     63                }
    6064                senderThread.setName(description + " thread " + address + " sender");
    6165                receiverThread.setName(description + " thread " + address + " receiver");
     66                threads.setObservableName(address + " connection threads");
     67        }
     68
     69        public void setDescription(String description) {
     70                this.description = description;
     71                updateNames();
     72        }
     73
     74        @AutoAppendAnnotation
     75        public Connection setAddress(Address address) {
     76                this.address = address;
     77                updateNames();
     78                return this;
     79        }
     80
     81        @ParamAnnotation
     82        public Connection setAddress(String address) {
     83                return setAddress(new Address(address));
    6284        }
    6385
    6486        public synchronized boolean isConnected() {
    6587                return socket != null && socket.isConnected();
    66         }
    67 
    68 
    69         // protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\\\"[^\"]*\\\"))";
    70         // protected static final Pattern REQUEST_ID_ENABLED_PATTERN = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?$");
    71         // protected static final Pattern REQUEST_ID_DISABLED_PATTERN = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?$");
    72 
    73         // // protected final Pair<String, String> breakLine(String line)
    74         // protected final Pair<Integer, String> parseRest(String rest) {
    75         //      Matcher matcher = (requestIdEnabled ? REQUEST_ID_ENABLED_PATTERN : REQUEST_ID_DISABLED_PATTERN).matcher(rest);
    76         //      if (!matcher.matches()) {
    77         //              log.fatal("unmatched first line of input: '" + rest + "'");
    78         //              return null;
    79         //      }
    80         //      return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
    81         // }
    82 
    83         protected final Pair<Integer, CharSequence> takeRequestId(CharSequence line) {
    84                 return Request.takeRequestId(requestIdEnabled, line);
    8588        }
    8689
     
    105108                                        ++iterator;
    106109                                        bufferStart = iterator;
    107                                         return lineBuffer.toString();
     110                                        String line = lineBuffer.toString();
     111
     112                                        synchronized (listeners) {
     113                                                for (ConnectionListener l : listeners) {
     114                                                        l.connectionIncomming(line);
     115                                                }
     116                                        }
     117
     118                                        return line;
    108119                                }
    109120                                final int length = readChars - bufferStart;
     
    131142        }
    132143
     144        protected void putLine(String line) {
     145                synchronized (listeners) {
     146                        for (ConnectionListener l : listeners) {
     147                                l.connectionOutgoing(line);
     148                        }
     149                }
     150                output.println(line);
     151        }
     152
     153        protected void flushOut() {
     154                output.flush();
     155        }
     156
     157        protected abstract void processNextInputBatch();
     158
     159        protected final void processInputBatchesUntilClosed() {
     160                while (isRunning() && isConnected()) {
     161                        try {
     162                                processNextInputBatch();
     163                        } catch (Exception e) {
     164                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
     165                                break;
     166                        }
     167                }
     168        }
     169
    133170        protected abstract void receiverThreadRoutine();
    134171
    135 
     172        // @SuppressWarnings("NN_NAKED_NOTIFY")
    136173        protected void setupStreams() {
    137174                try {
     
    146183        }
    147184
    148         /**
    149          * Returns Query associated with query getId.
    150          *
    151          * @return Query associated with query getId.
    152          */
    153         public int getProtocolVersion() {
    154                 return protocolVersion;
    155         }
    156185
    157186
     
    187216        @Override
    188217        public String getName() {
    189                 return description + " " + address;
     218                return address != null ? description + " " + address : description;
    190219        }
    191220
     
    194223                Dispatching.use(threads, this);
    195224
    196                 senderThread.dispatch(new RunAt<Connection>() {
     225                senderThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) {
    197226                        @Override
    198                         public void run() {
     227                        protected void runAt() {
    199228                                synchronized (Connection.this) {
    200229                                        while (state.equals(JoinableState.RUNNING) && output == null) {
     
    205234                });
    206235
    207                 receiverThread.dispatch(new RunAt<Connection>() {
     236                receiverThread.dispatch(new RunAt<Connection>(ThrowExceptionHandler.getInstance()) {
    208237                        @Override
    209                         public void run() {
     238                        protected void runAt() {
    210239                                receiverThreadRoutine();
     240                                interrupt();
     241                                finish();
    211242                        }
    212243                });
     
    215246        @Override
    216247        protected void joinableInterrupt() {
    217                 protocolVersion = -1;
    218248                Dispatching.drop(threads, this);
    219249                finish();
     
    226256        }
    227257
     258        protected static void startClientConnection(Connection connection) {
     259                while (connection.isRunning() && !connection.isConnected()) {
     260                        log.debug("connecting to " + connection.address);
     261                        try {
     262                                connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort());
     263                        } catch (IOException e) {
     264                                log.info(connection + " failed to connect (retrying): " + e);
     265                                Dispatching.sleep(0.5);
     266                        }
     267                }
     268
     269                log.debug(connection + " connected");
     270                try {
     271                        connection.socket.setSoTimeout(500);
     272                        connection.setupStreams();
     273                } catch (Exception e) {
     274                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection);
     275                }
     276        }
     277
     278        /**
     279         * @return the address
     280         */
     281        @ParamAnnotation
     282        public String getAddress() {
     283                return Strings.toStringNullProof(address, "?");
     284        }
     285
     286        public Address getAddressObject() {
     287                return address;
     288        }
     289
     290
     291        /**
     292         * @return the listeners
     293         */
     294        public Collection<ConnectionListener> getListeners() {
     295                return listeners;
     296        }
     297
     298        public static <T extends Connection> T to(T connection, Address address) {
     299                connection.setAddress(address);
     300                return connection;
     301        }
     302
    228303}
Note: See TracChangeset for help on using the changeset viewer.