Ignore:
Timestamp:
07/04/13 20:29:50 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supercede other apllication exception being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronours tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithm to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/communication/Connection.java

    r90 r96  
    1212import java.net.Socket;
    1313import java.net.SocketTimeoutException;
    14 import java.util.regex.Matcher;
    15 import java.util.regex.Pattern;
    1614
    1715import com.framsticks.util.dispatching.AbstractJoinable;
     
    3331        protected Socket socket = null;
    3432
    35         protected volatile boolean connected = false;
    36 
    3733        public boolean requestIdEnabled = false;
    3834
     
    4339        }
    4440        protected final String address;
     41        protected final String description;
    4542
    4643        protected final Thread<Connection> senderThread = new Thread<>();
    4744        protected final Thread<Connection> receiverThread = new Thread<>();
    48         protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true);
     45        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();
     46
     47        protected void setUpThreadNames(String name) {
     48        }
    4949
    5050        /**
    5151         *
    5252         */
    53         public Connection(String address) {
     53        public Connection(String address, String description) {
    5454                this.address = address;
     55                this.description = description;
     56                threads.setObservableName(address + " connection threads");
    5557                threads.add(senderThread);
    5658                threads.add(receiverThread);
    57         }
    58         public boolean isConnected() {
    59                 return connected;
    60         }
    61 
    62 
    63         protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
    64         protected static final Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
    65         protected static final Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
    66         protected static final Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");
    67 
    68 
    69         protected final Pair<Integer, String> parseRest(String rest) {
    70                 Matcher matcher = (requestIdEnabled ? requestIdEnabledPattern : requestIDisabledPattern).matcher(rest);
    71                 if (!matcher.matches()) {
    72                         log.fatal("unmatched first line of input: " + rest);
    73                         return null;
    74                 }
    75                 return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
     59
     60                senderThread.setName(description + " thread " + address + " sender");
     61                receiverThread.setName(description + " thread " + address + " receiver");
     62        }
     63
     64        public synchronized boolean isConnected() {
     65                return socket != null && socket.isConnected();
     66        }
     67
     68
     69        // protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\\\"[^\"]*\\\"))";
     70        // protected static final Pattern REQUEST_ID_ENABLED_PATTERN = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?$");
     71        // protected static final Pattern REQUEST_ID_DISABLED_PATTERN = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?$");
     72
     73        // // protected final Pair<String, String> breakLine(String line)
     74        // protected final Pair<Integer, String> parseRest(String rest) {
     75        //      Matcher matcher = (requestIdEnabled ? REQUEST_ID_ENABLED_PATTERN : REQUEST_ID_DISABLED_PATTERN).matcher(rest);
     76        //      if (!matcher.matches()) {
     77        //              log.fatal("unmatched first line of input: '" + rest + "'");
     78        //              return null;
     79        //      }
     80        //      return new Pair<Integer, String>(requestIdEnabled ? Integer.parseInt(matcher.group(1)) : null, matcher.group(requestIdEnabled ? 2 : 1));
     81        // }
     82
     83        protected final Pair<Integer, CharSequence> takeRequestId(CharSequence line) {
     84                return Request.takeRequestId(requestIdEnabled, line);
    7685        }
    7786
     
    8190        int iterator = 0;
    8291        int bufferStart = 0;
    83         char[] readBuffer = new char[BUFFER_LENGTH];
     92        final char[] readBuffer = new char[BUFFER_LENGTH];
    8493
    8594        protected String getLine() {
    86                 StringBuilder lineBuffer = new StringBuilder();
     95                final StringBuilder lineBuffer = new StringBuilder();
    8796                try {
    8897                        while (!Thread.interrupted()) {
     
    92101                                                continue;
    93102                                        }
    94                                         lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
     103                                        /** Do not append new line. */
     104                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart);
    95105                                        ++iterator;
    96106                                        bufferStart = iterator;
    97107                                        return lineBuffer.toString();
    98108                                }
    99                                 lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
     109                                final int length = readChars - bufferStart;
     110                                if (length > 0) {
     111                                        assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH;
     112                                        assert bufferStart + length <= BUFFER_LENGTH;
     113                                        lineBuffer.append(readBuffer, bufferStart, length);
     114                                }
    100115
    101116                                readChars = 0;
     
    118133        protected abstract void receiverThreadRoutine();
    119134
    120         protected void setUpThread(Thread<Connection> thread, String name) {
    121                 thread.setName("connection thread " + address + " " + name);
    122         }
    123 
    124         protected void runThreads() {
     135
     136        protected void setupStreams() {
    125137                try {
    126138                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
    127139                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
     140                        synchronized (this) {
     141                                this.notifyAll();
     142                        }
    128143                } catch (IOException e) {
    129                         log.error("buffer creation failure");
    130                         return;
     144                        throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this);
    131145                }
    132 
    133                 setUpThread(senderThread, "sender");
    134                 setUpThread(receiverThread, "receiver");
    135                 Dispatching.use(threads, this);
    136 
    137                 receiverThread.dispatch(new RunAt<Connection>() {
    138                         @Override
    139                         public void run() {
    140                                 receiverThreadRoutine();
    141                         }
    142                 });
    143 
    144146        }
    145147
     
    153155        }
    154156
    155         @Override
    156         protected void joinableInterrupt() {
    157                 protocolVersion = -1;
    158 
    159                 connected = false;
    160                 Dispatching.drop(threads, this);
    161 
    162                 // finish();
    163         }
    164157
    165158        @Override
     
    193186
    194187        @Override
     188        public String getName() {
     189                return description + " " + address;
     190        }
     191
     192        @Override
     193        protected void joinableStart() {
     194                Dispatching.use(threads, this);
     195
     196                senderThread.dispatch(new RunAt<Connection>() {
     197                        @Override
     198                        public void run() {
     199                                synchronized (Connection.this) {
     200                                        while (state.equals(JoinableState.RUNNING) && output == null) {
     201                                                Dispatching.wait(Connection.this, 500);
     202                                        }
     203                                }
     204                        }
     205                });
     206
     207                receiverThread.dispatch(new RunAt<Connection>() {
     208                        @Override
     209                        public void run() {
     210                                receiverThreadRoutine();
     211                        }
     212                });
     213        }
     214
     215        @Override
     216        protected void joinableInterrupt() {
     217                protocolVersion = -1;
     218                Dispatching.drop(threads, this);
     219                finish();
     220        }
     221
     222
     223        @Override
    195224        protected void joinableJoin() throws InterruptedException {
    196225                Dispatching.join(threads);
    197 
    198226        }
    199227
Note: See TracChangeset for help on using the changeset viewer.