Ignore:
Timestamp:
07/04/13 20:29:50 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supercede other apllication exception being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronours tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithm to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/communication/ClientConnection.java

    r90 r96  
    1111import com.framsticks.util.dispatching.Dispatcher;
    1212import com.framsticks.util.dispatching.Dispatching;
     13import com.framsticks.util.dispatching.JoinableState;
    1314import com.framsticks.util.lang.Pair;
    1415import com.framsticks.util.lang.Strings;
     16
     17import org.apache.log4j.Level;
    1518import org.apache.log4j.Logger;
    1619
    1720import java.io.IOException;
    1821import java.net.Socket;
    19 import java.net.SocketException;
    2022import java.util.*;
    2123import java.util.regex.Matcher;
     
    3941        }
    4042
     43        public ClientConnection(String address) {
     44                super(address, "client connection");
     45                Matcher matcher = addressPattern.matcher(address);
     46                if (!matcher.matches()) {
     47                        log.fatal("invalid address: " + address);
     48                        hostName = null;
     49                        port = 0;
     50                        return;
     51                }
     52                hostName = matcher.group(1);
     53                port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
     54        }
    4155
    4256        protected StateFunctor connectedFunctor;
    43 
    44         @Override
    45         protected void joinableStart() {
    46                 try {
    47                         log.debug("connecting to " + address);
    48 
    49                         socket = new Socket(hostName, port);
    50 
    51                         socket.setSoTimeout(500);
    52 
    53                         log.debug("connected to " + hostName + ":" + port);
    54                         connected = true;
    55                         runThreads();
    56 
    57                         connectedFunctor.call(null);
    58                 } catch (SocketException e) {
    59                         log.error("failed to connect: " + e);
    60                         connectedFunctor.call(e);
    61                 } catch (IOException e) {
    62                         log.error("buffer creation failure");
    63                         connectedFunctor.call(e);
    64                         // close();
    65                 }
    66         }
    67 
    6857
    6958        private static abstract class InboundMessage {
     
    7867                        currentFilePath = path;
    7968                }
     69
    8070                protected void finishCurrentFile() {
    8171                        if (currentFileContent == null) {
     
    8474                        files.add(new File(currentFilePath, new ListSource(currentFileContent)));
    8575                        currentFilePath = null;
    86                         currentFileContent= null;
     76                        currentFileContent = null;
    8777                }
    8878
    8979                public abstract void startFile(String path);
    9080
    91                 public void addLine(String line) {
     81                public final void addLine(String line) {
    9282                        assert line != null;
    9383                        assert currentFileContent != null;
    94                         currentFileContent.add(line.substring(0, line.length() - 1));
     84                        currentFileContent.add(line);
    9585                }
    9686
     
    127117                public void startFile(String path) {
    128118                        finishCurrentFile();
    129                         if (path == null) {
     119                        if (!Strings.notEmpty(path)) {
    130120                                assert request instanceof ApplicationRequest;
    131                                 path = ((ApplicationRequest)request).getPath();
    132                         }
     121                                path = ((ApplicationRequest) request).getPath();
     122                        }
     123                        Strings.assureNotEmpty(path);
    133124                        initCurrentFile(path);
    134125                }
    135126
    136127                public void eof() {
     128                        assert Strings.notEmpty(currentFilePath);
    137129                        finishCurrentFile();
    138130                        //no-operation
     
    153145                }
    154146        }
     147
    155148        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
    156 
    157149
    158150        protected final String hostName;
     
    161153        private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$");
    162154
    163         public ClientConnection(String address) {
    164                 super(address);
    165                 Matcher matcher = addressPattern.matcher(address);
    166                 if (!matcher.matches()) {
    167                         log.fatal("invalid address: " + address);
    168                         hostName = null;
    169                         port = 0;
    170                         return;
    171                 }
    172                 hostName = matcher.group(1);
    173                 port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
    174         }
    175155
    176156        private SentQuery<?> currentlySentQuery;
    177 
    178157
    179158        public <C extends Connection> void send(Request request, ResponseCallback<C> callback) {
     
    184163        public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) {
    185164
    186                 if (!isConnected()) {
     165                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
    187166                        log.fatal("not connected");
    188167                        return;
    189168                }
     169
    190170                final SentQuery<C> sentQuery = new SentQuery<C>();
    191171                sentQuery.request = request;
     
    193173                sentQuery.dispatcher = dispatcher;
    194174
    195                 senderThread.dispatch(new RunAt<Connection>(){
     175                senderThread.dispatch(new RunAt<Connection>() {
    196176                        @Override
    197177                        public void run() {
     
    220200                                        message.append(" ").append(id);
    221201                                }
     202                                message.append(" ");
    222203                                sentQuery.request.construct(message);
    223204                                String out = message.toString();
    224205
    225206                                output.println(out);
     207                                output.flush();
    226208                                log.debug("sending query: " + out);
    227209
     
    234216                        notifyAll();
    235217                }
    236                 */
    237         }
    238 
     218                 */
     219        }
    239220
    240221        @Override
     
    272253        }
    273254
    274         public void sendQueryVersion(final int version, final StateFunctor stateFunctor) {
    275                 send(new VersionRequest().version(version), new StateCallback<Connection>() {
     255        public void sendQueryVersion(final int version, StateFunctor stateFunctor) {
     256                send(new VersionRequest().version(version), new StateCallback<Connection>(stateFunctor) {
    276257                        @Override
    277                         public void call(Exception e) {
    278                                 if (e != null) {
    279                                         log.fatal("failed to upgrade protocol to version: " + version);
    280                                         return;
    281                                 }
     258                        public void callImpl() {
    282259                                protocolVersion = version;
    283260                                if (version < 4) {
    284261                                        /** it is an implicit loop here*/
    285                                         sendQueryVersion(version + 1, stateFunctor);
     262                                        sendQueryVersion(version + 1, move());
    286263                                        return;
    287264                                }
    288                                 send(new UseRequest().feature("request_id"), new StateCallback<Connection>() {
     265                                send(new UseRequest().feature("request_id"), new StateCallback<Connection>(move()) {
    289266                                        @Override
    290                                         public void call(Exception e) {
    291                                                 requestIdEnabled = e == null;
    292                                                 /*
    293                                                 synchronized (ClientConnection.this) {
    294                                                         ClientConnection.this.notifyAll();
    295                                                 }
    296                                                 */
    297                                                 if (!requestIdEnabled) {
    298                                                         log.fatal("protocol negotiation failed");
    299                                                         stateFunctor.call(new Exception("protocol negotiation failed", e));
    300                                                         return;
    301                                                 }
    302                                                 stateFunctor.call(null);
     267                                        public void handle(FramsticksException exception) {
     268                                                requestIdEnabled = false;
     269                                                log.fatal("protocol negotiation failed");
     270                                                super.handle(new FramsticksException().msg("protocol negotiation failed").cause(exception));
     271                                        }
     272
     273                                        @Override
     274                                        public void callImpl() {
     275                                                requestIdEnabled = true;
    303276                                        }
    304277                                });
     
    307280                });
    308281        }
    309 
    310282
    311283        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
     
    348320
    349321        protected void processEvent(String rest) {
    350                 Matcher matcher = eventPattern.matcher(rest);
     322                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
    351323                if (!matcher.matches()) {
    352324                        log.error("invalid event line: " + rest);
     
    363335        }
    364336
    365 
    366337        protected void processMessageStartingWith(String line) {
    367                 Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
    368                 if (command.first.equals("event")) {
    369                         processEvent(command.second);
    370                         return;
    371                 }
    372                 Pair<Integer, String> rest = parseRest(command.second);
    373 
    374                 if (command.first.equals("file")) {
    375                         SentQuery<?> sentQuery = fetchQuery(rest.first, false);
    376                         sentQuery.startFile(rest.second);
    377                         processMessage(sentQuery);
    378                         return;
    379                 }
    380 
    381                 SentQuery<?> sentQuery = fetchQuery(rest.first, true);
    382                 if (sentQuery == null) {
    383                         return;
    384                 }
    385                 log.debug("parsing response for request " + sentQuery);
    386 
    387                 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()));
     338                try {
     339                        Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line);
     340                        if (command.first.equals("event")) {
     341                                processEvent(command.second.toString());
     342                                return;
     343                        }
     344                        Pair<Integer, CharSequence> rest = takeRequestId(command.second);
     345
     346                        if (command.first.equals("file")) {
     347                                SentQuery<?> sentQuery = fetchQuery(rest.first, false);
     348                                sentQuery.startFile(rest.second.toString());
     349                                processMessage(sentQuery);
     350                                return;
     351                        }
     352
     353                        SentQuery<?> sentQuery = fetchQuery(rest.first, true);
     354                        if (sentQuery == null) {
     355                                return;
     356                        }
     357                        log.debug("parsing response for request " + sentQuery);
     358
     359                        sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
     360                } catch (FramsticksException e) {
     361                        throw new FramsticksException().msg("failed to process message").arg("starting with line", line).cause(e);
     362                }
    388363        }
    389364
    390365        @Override
    391366        protected void receiverThreadRoutine() {
    392                 while (connected) {
     367                while (isRunning() && !isConnected()) {
     368                        log.debug("connecting to " + address);
     369                        try {
     370                                socket = new Socket(hostName, port);
     371                        } catch (IOException e) {
     372                                log.info(this + " failed to connect (retrying): " + e);
     373                                Dispatching.sleep(0.5);
     374                        }
     375                }
     376
     377                log.debug(this + " connected");
     378                try {
     379                        socket.setSoTimeout(500);
     380                        setupStreams();
     381                } catch (Exception e) {
     382                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", this);
     383                }
     384
     385                connectedFunctor.call();
     386
     387                while (isRunning() && isConnected()) {
    393388                        try {
    394389                                processMessageStartingWith(getLine());
    395390                        } catch (Exception e) {
     391                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
    396392                                break;
    397393                        }
    398394                }
    399         }
    400 
    401 
     395                interrupt();
     396                finish();
     397        }
    402398}
Note: See TracChangeset for help on using the changeset viewer.