Ignore:
Timestamp:
07/04/13 20:29:50 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supercede other apllication exception being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronours tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithm to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/remote/RemoteInstance.java

    r90 r96  
    22
    33import com.framsticks.communication.*;
     4import com.framsticks.communication.queries.CallRequest;
    45import com.framsticks.communication.queries.GetRequest;
    56import com.framsticks.communication.queries.InfoRequest;
    67import com.framsticks.communication.queries.SetRequest;
    7 import com.framsticks.communication.util.LoggingSubscriptionCallback;
     8import com.framsticks.core.AbstractInstance;
     9import com.framsticks.core.InstanceUtils;
    810import com.framsticks.core.ListChange;
     11import com.framsticks.core.Node;
    912import com.framsticks.core.Path;
    1013import com.framsticks.params.*;
     
    1720import com.framsticks.util.*;
    1821import com.framsticks.util.dispatching.Dispatching;
     22import com.framsticks.util.dispatching.ExceptionResultHandler;
    1923import com.framsticks.util.dispatching.Future;
    2024import com.framsticks.util.dispatching.Joinable;
     
    2428import com.framsticks.util.lang.Pair;
    2529import com.framsticks.util.dispatching.RunAt;
     30import static com.framsticks.core.InstanceUtils.*;
    2631
    2732import java.util.*;
     33
     34import javax.annotation.Nonnull;
    2835
    2936import org.apache.log4j.Logger;
     
    3340 */
    3441@FramsClassAnnotation
    35 public class RemoteInstance extends Instance implements JoinableParent {
     42public class RemoteInstance extends AbstractInstance implements JoinableParent {
    3643
    3744        private final static Logger log = Logger.getLogger(RemoteInstance.class.getName());
    3845
    39         protected Path simulator;
    4046        protected ClientConnection connection;
    4147
     
    6470        }
    6571
     72        protected void onProtocolVersionNegotiated() {
     73        }
     74
     75
    6676        public void setConnection(final ClientConnection connection) {
    6777                this.connection = connection;
    68                 this.connection.setConnectedFunctor(new StateFunctor() {
    69                         @Override
    70                         public void call(Exception e) {
    71                                 if (e != null) {
    72                                         fireRun(e);
    73                                         return;
    74                                 }
    75                                 connection.negotiateProtocolVersion(new StateFunctor() {
     78                final ExceptionResultHandler failure = new ExceptionResultHandler() {
     79                        @Override
     80                        public void handle(FramsticksException exception) {
     81                                log.fatal("failed to establish connection: ", exception);
     82                                // log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion());
     83                                Dispatching.drop(connection, RemoteInstance.this);
     84                                fireRun(exception);
     85                        }
     86                };
     87
     88                this.connection.setConnectedFunctor(new AbstractStateFunctor(failure) {
     89                        @Override
     90                        public void call() {
     91                                connection.negotiateProtocolVersion(new AbstractStateFunctor(failure) {
    7692                                        @Override
    77                                         public void call(Exception e) {
    78                                                 if (e != null) {
    79                                                         log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion());
    80                                                         Dispatching.drop(connection, RemoteInstance.this);
    81                                                         fireRun(e);
    82                                                         return;
    83                                                 }
    84 
    85                                                 dispatch(new RunAt<Instance>() {
    86                                                         @Override
    87                                                         public void run() {
    88                                                                 resolveAndFetch("/simulator", new Future<Path>() {
    89                                                                         @Override
    90                                                                         public void result(Path path, Exception e) {
    91                                                                                 if (e != null) {
    92                                                                                         log.fatal("failed to resolve simulator node");
    93                                                                                         fireRun(e);
    94                                                                                         return;
    95                                                                                 }
    96                                                                                 assert isActive();
    97                                                                                 simulator = path;
    98                                                                                 fireRun(null);
    99                                                                                 log.info("resolved simulator node");
    100 
    101                                                                                 EventParam param = getParam(simulator, "running_changed", EventParam.class);
    102                                                                                 assert param != null;
    103                                                                                 connection.subscribe(simulator.getTextual() + "/" + param.getId(), RemoteInstance.this, new LoggingSubscriptionCallback<Instance>(log, "server running state change", new EventCallback() {
    104                                                                                         @Override
    105                                                                                         public void call(List<File> files) {
    106                                                                                                 dispatch(new RunAt<Instance>() {
    107                                                                                                         @Override
    108                                                                                                         public void run() {
    109                                                                                                                 updateSimulationRunning();
    110                                                                                                         }
    111                                                                                                 });
    112                                                                                         }
    113                                                                                 }));
    114                                                                                 new PeriodicTask<Instance>(RemoteInstance.this, 1000) {
    115                                                                                         @Override
    116                                                                                         public void run() {
    117                                                                                                 updateSimulationRunning();
    118                                                                                                 again();
    119                                                                                         }
    120                                                                                 };
    121                                                                         }
    122                                                                 });
    123                                                         }
    124                                                 });
     93                                        public void call() {
     94                                                onProtocolVersionNegotiated();
    12595                                        }
    12696                                });
    12797                        }
    12898                });
    129 
    13099        }
    131100
     
    136105        }
    137106
    138         public void setRunning(final boolean running) {
    139                 assert isActive();
    140                 //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(log, (running ? "starting" : "stopping") + " server"));
    141         }
    142 
    143         protected final UnaryListenersSet<Boolean> simulationRunningListeners = new UnaryListenersSet<Boolean>();
    144 
    145         protected void updateSimulationRunning() {
    146                 assert isActive();
    147                 /*
    148                 fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() {
    149                         @Override
    150                         public void call(Exception e) {
    151                                 if (e != null) {
    152                                         log.fatal("failed to query simulator running status: " + e);
    153                                         return;
    154                                 }
    155 
    156                                 invokeLater(new Runnable() {
    157                                         @Override
    158                                         public void run() {
    159                                                 boolean value = bindAccess(simulator).get("running", Boolean.class);
    160                                                 log.trace("server running: " + value);
    161                                                 simulationRunningListeners.call(value);
    162                                         }
    163                                 });
    164 
    165                         }
    166                 });
    167                  */
    168         }
    169 
    170         public void addRunningStateListener(UnaryFunctor<Boolean, Boolean> listener) {
    171                 assert isActive();
    172                 simulationRunningListeners.add(listener);
    173         }
    174 
    175         // public void disconnect() {
    176         //      assert isActive();
    177         //      if (connection.isConnected()) {
    178         //              Dispatching.stop(connection, this);
    179         //      }
    180         // }
    181 
    182107        public final ClientConnection getConnection() {
    183108                return connection;
     
    185110
    186111        @Override
    187         public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) {
     112        public void fetchValue(final Path path, final ValueParam param, final StateFunctor stateFunctor) {
    188113                assert isActive();
    189114                assert param != null;
     
    194119                                assert isActive();
    195120                                if (!response.getOk()) {
    196                                         stateFunctor.call(new Exception(response.getComment()));
     121                                        stateFunctor.handle(new FramsticksException().msg("failed to fetch value").arg("comment", response.getComment()));
    197122                                        return;
    198123                                }
    199124                                try {
    200                                         processFetchedValues(path, response.getFiles());
    201                                         stateFunctor.call(null);
    202                                 } catch (Exception ex) {
    203                                         stateFunctor.call(ex);
     125                                        InstanceUtils.processFetchedValues(path, response.getFiles());
     126                                        stateFunctor.call();
     127                                } catch (FramsticksException ex) {
     128                                        stateFunctor.handle(ex);
    204129                                }
    205130                        }
     
    209134        protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>();
    210135
    211         protected void finishInfoRequest(String id, FramsClass result, Exception e) {
     136        protected void finishInfoRequest(String id, FramsClass result, FramsticksException e) {
    212137                assert isActive();
    213138                Set<Future<FramsClass>> futures = infoRequests.get(id);
    214139                infoRequests.remove(id);
    215140                for (Future<FramsClass> f : futures) {
    216                         f.result(result, e);
    217                 }
    218         }
    219 
    220         @Override
    221         protected void fetchInfo(final Path path, final Future<FramsClass> future) {
     141                        Future.passOrHandle(f, result, e);
     142                }
     143        }
     144
     145        @Override
     146        public void fetchInfo(final Path path, final Future<FramsClass> future) {
    222147
    223148                final String name = path.getTop().getParam().getContainedTypeName();
     
    238163                        public void process(Response response) {
    239164                                assert isActive();
    240                                 if (!response.getOk()) {
    241                                         finishInfoRequest(name, null, new Exception(response.getComment()));
    242                                         return;
    243                                 }
    244 
    245                                 assert response.getFiles().size() == 1;
    246                                 assert path.isTheSame(response.getFiles().get(0).getPath());
    247                                 FramsClass framsClass;
    248165                                try {
    249                                         framsClass = processFetchedInfo(response.getFiles().get(0));
    250                                 } catch (ConstructionException e) {
    251                                         log.fatal("could not read class info");
    252                                         finishInfoRequest(name, null, new Exception("could not read class info"));
    253                                         return;
    254                                 }
    255 
    256                                 CompositeParam thisParam = path.getTop().getParam();
    257                                 if (!thisParam.isMatchingContainedName(framsClass.getId())) {
    258                                         String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId();
    259                                         log.error(mismatch);
    260                                         finishInfoRequest(name, null, new Exception(mismatch));
    261                                         return;
    262                                 }
    263                                 finishInfoRequest(name, framsClass, null);
     166                                        if (!response.getOk()) {
     167                                                throw new FramsticksException().msg("invalid response").arg("comment", response.getComment());
     168                                        }
     169                                        if (response.getFiles().size() != 1) {
     170                                                throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size());
     171                                        }
     172                                        if (!path.isTheSame(response.getFiles().get(0).getPath())) {
     173                                                throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath());
     174                                        }
     175                                        FramsClass framsClass = InstanceUtils.processFetchedInfo(RemoteInstance.this, response.getFiles().get(0));
     176
     177                                        CompositeParam thisParam = path.getTop().getParam();
     178                                        if (!thisParam.isMatchingContainedName(framsClass.getId())) {
     179                                                throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId());
     180                                        }
     181
     182                                        finishInfoRequest(name, framsClass, null);
     183                                } catch (FramsticksException e) {
     184                                        finishInfoRequest(name, null, e.arg("path", path));
     185                                }
    264186                        }
    265187                });
     
    276198                        public void process(Response response) {
    277199                                assert isActive();
    278                                 if (!response.getOk()) {
    279                                         stateFunctor.call(new Exception(response.getComment()));
    280                                         return;
    281                                 }
    282200                                try {
    283                                         processFetchedValues(path, response.getFiles());
    284                                         stateFunctor.call(null);
    285                                 } catch (Exception ex) {
    286                                         log.error("an exception occurred while loading: " + ex);
    287                                         ex.printStackTrace();
    288                                         stateFunctor.call(ex);
     201                                        if (!response.getOk()) {
     202                                                throw new FramsticksException().msg("failed to fetch values").arg("comment", response.getComment());
     203                                        }
     204                                        InstanceUtils.processFetchedValues(path, response.getFiles());
     205                                        stateFunctor.call();
     206                                } catch (FramsticksException e) {
     207                                        stateFunctor.handle(e);
    289208                                }
    290209                        }
     
    294213        @Override
    295214        public void resolve(final Path path, final Future<Path> future) {
    296                 assert isActive();
    297                 if (path.getTop().getObject() != null) {
    298                         if (getInfoFromCache(path) != null) {
    299                                 future.result(path, null);
    300                                 return;
    301                         }
    302                         findInfo(path, new Future<FramsClass>() {
    303                                 @Override
    304                                 public void result(FramsClass result, Exception e) {
    305                                         if (e != null) {
    306                                                 future.result(null, e);
    307                                                 return;
    308                                         }
    309                                         future.result(path, null);
    310                                 }
    311                         });
    312                         return;
    313                 }
    314                 findInfo(path, new Future<FramsClass>() {
    315                         @Override
    316                         public void result(FramsClass result, Exception e) {
    317                                 assert isActive();
    318                                 if (e != null) {
    319                                         future.result(null, e);
    320                                         return;
    321                                 }
    322                                 assert path.getTop().getParam().isMatchingContainedName(result.getId());
    323                                 Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : path.tryFindResolution());
    324                                 future.result(createIfNeeded(p), null);
    325                         }
    326                 });
     215                InstanceUtils.resolve(path, future);
    327216        }
    328217
     
    330219        protected void tryRegisterOnChangeEvents(final Path path) {
    331220                assert isActive();
    332                 AccessInterface access = bindAccess(path);
     221                AccessInterface access = InstanceUtils.bindAccess(path);
    333222                if (!(access instanceof ListAccess)) {
    334223                        return;
     
    396285        }
    397286
     287        protected Future<Path> futureListChanger(final ListChange listChange, final String path) {
     288                return new Future<Path>(Logging.logger(log, "failed to " + listChange, path)) {
     289                        @Override
     290                        protected void result(Path result) {
     291                                log.debug(listChange + ": " + result);
     292                                fireListChange(result, listChange);
     293                        }
     294                };
     295        }
     296
    398297        protected void reactToChange(final Path path, final ListChange listChange) {
    399298                assert isActive();
    400299                log.debug("reacting to change " + listChange + " in " + path);
    401                 AccessInterface access = bindAccess(path);
     300                AccessInterface access = InstanceUtils.bindAccess(path);
    402301                assert access != null;
    403302
    404303                if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) {
    405304                        final String p = path.getTextual();
    406                         resolveAndFetch(p, new Future<Path>() {
    407                                 @Override
    408                                 public void result(Path result, Exception e) {
    409                                         if (e != null) {
    410                                                 log.error("failed to modify " + p + ": " + e);
    411                                                 return;
    412                                         }
    413                                         fireListChange(path, listChange);
    414                                 }
    415                         });
     305                        InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p));
    416306                        return;
    417307                }
     
    420310                assert childParam != null;
    421311                switch (listChange.getAction()) {
    422                 case Add: {
    423                         final String p = path.getTextual() + "/" + childParam.getId();
    424                         resolveAndFetch(p, new Future<Path>() {
    425                                 @Override
    426                                 public void result(Path result, Exception e) {
    427                                         if (e != null) {
    428                                                 log.error("failed to add " + p + ": " + e);
    429                                                 return;
    430                                         }
    431                                         log.debug("added: " + result);
    432                                         fireListChange(path, listChange);
    433                                 }
    434                         });
    435                         break;
    436                 }
    437                 case Remove: {
    438                         access.set(childParam, null);
    439                         fireListChange(path, listChange);
    440                         break;
    441                 }
    442                 case Modify: {
    443                         final String p = path.getTextual() + "/" + childParam.getId();
    444                         resolveAndFetch(p, new Future<Path>() {
    445                                 @Override
    446                                 public void result(Path result, Exception e) {
    447                                         if (e != null) {
    448                                                 log.error("failed to modify " + p + ": " + e);
    449                                                 return;
    450                                         }
    451                                         fireListChange(path, listChange);
    452                                 }
    453                         });
    454                         break;
    455                 }
    456                 }
    457         }
    458 
    459         //TODO ValueParam
    460         @Override
    461         public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) {
     312                        case Add: {
     313                                final String p = path.getTextual() + "/" + childParam.getId();
     314                                InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p));
     315                                break;
     316                        }
     317                        case Remove: {
     318                                access.set(childParam, null);
     319                                fireListChange(path, listChange);
     320                                break;
     321                        }
     322                        case Modify: {
     323                                final String p = path.getTextual() + "/" + childParam.getId();
     324                                InstanceUtils.resolveAndFetch(this, p, futureListChanger(listChange, p));
     325                                break;
     326                        }
     327                }
     328        }
     329
     330        @Override
     331        public void storeValue(final Path path, final ValueParam param, final Object value, StateFunctor stateFunctor) {
    462332                assert isActive();
    463333
    464334                log.trace("storing value " + param + " for " + path);
    465                 connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>() {
    466                         @Override
    467                         public void call(Exception e) {
    468                                 if (e == null) {
    469                                         bindAccess(path).set((ValueParam) param, value);
    470                                 }
    471                                 stateFunctor.call(e);
     335                connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new StateCallback<Instance>(stateFunctor) {
     336                        @Override
     337                        protected void callImpl() {
     338                                InstanceUtils.bindAccess(path).set((ValueParam) param, value);
    472339                        }
    473340                });
     
    504371
    505372        @Override
    506         public void call(Path path, ProcedureParam param, Object[] arguments, StateFunctor stateFunctor) {
    507                 throw new UnimplementedException();
     373        public void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Future<Object> future) {
     374                assert isActive();
     375                assert path.isResolved();
     376
     377                //TODO validate arguments type using params
     378                connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ResponseCallback<Instance>() {
     379                        @Override
     380                        public void process(Response response) {
     381                                assert isActive();
     382                                try {
     383                                        if (!response.getOk()) {
     384                                                throw new FramsticksException().msg("failed to call procedure").arg("procedure", procedure).arg("comment", response.getComment());
     385                                        }
     386                                        // InstanceUtils.processFetchedValues(path, response.getFiles());
     387                                        future.pass(null);
     388                                } catch (FramsticksException ex) {
     389                                        future.handle(ex);
     390                                }
     391                        }
     392                });
     393
     394        }
     395
     396        @Override
     397        public Path create(Path path) {
     398                assert isActive();
     399                assert !path.isResolved();
     400                Path resolved = path.tryFindResolution();
     401                if (!resolved.isResolved()) {
     402                        log.debug("creating: " + path);
     403                        AccessInterface access = registry.prepareAccess(path.getTop().getParam());
     404                        assert access != null;
     405                        Object child = access.createAccessee();
     406                        assert child != null;
     407                        if (path.size() == 1) {
     408                                setRoot(new Node(getRoot().getParam(), child));
     409                        } else {
     410                                bindAccess(this, path.getUnder()).set(path.getTop().getParam(), child);
     411                        }
     412                        resolved = path.appendResolution(child);
     413                }
     414                tryRegisterOnChangeEvents(resolved);
     415                return resolved;
    508416        }
    509417
Note: See TracChangeset for help on using the changeset viewer.