Ignore:
Timestamp:
06/30/13 12:48:20 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • loading f0 schema with XmlLoader?
  • use XmlLoader? to load configuration
  • introduce unified fork-join model of various entities

(Instances, Connections, GUI Frames, etc.),
all those entities clean up gracefully on
shutdown, which may be initialized by user
or by some entity

  • basing on above, simplify several organizing classes

(Observer, main class)

(to host native frams server process from Java level)

CHANGELOG:
Remove redundant Observer class.

Clean up in AbstractJoinable?.

Update ExternalProcess? class to changes in joining model.

Another sweep through code with FindBugs?.

Find bug with not joining RemoteInstance?.

Joining almost works.

Much improved joining model.

More improvement to joining model.

Add logging messages around joinable operations.

Rename methods in AbstractJoinable?.

Improve Joinable.

Rewrite of entity structure.

More simplifications with entities.

Further improve joinables.

Let Frame compose from JFrame instead of inheriting.

Add join classes.

Improvements of closing.

Add Builder interface.

Add FramsServerTest?.xml

FramsServer? may be configured through xml.

Make Framsticks main class an Observer of Entities.

Make Observer a generic type.

Remove variables regarding to removed endpoint.

Simplify observer (remove endpoints).

More changes to Observer and Endpoint.

Minor improvements.

Add OutputListener? to ExternalProcess?.

Improve testing of ExternalProcess?.

Add ExternalProcess? runner.

Rename the Program class to Framsticks.

Migrate Program to use XmlLoader? configuration.

First steps with configuration using XmlLoader?.

Fix several bugs.

Move all f0 classes to apriopriate package.

XmlLoader? is able to load Schema.

XmlLoader? is loading classes and props.

Add GroupBuilder?.

Location:
java/main/src/main/java/com/framsticks/communication
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/communication/ClientConnection.java

    r85 r88  
    3232        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
    3333
    34         public String getAddress() {
    35                 return address;
    36         }
    37 
    38         public void connect(StateFunctor connectedFunctor) {
     34        /**
     35         * @param connectedFunctor the connectedFunctor to set
     36         */
     37        public void setConnectedFunctor(StateFunctor connectedFunctor) {
     38                this.connectedFunctor = connectedFunctor;
     39        }
     40
     41
     42        protected StateFunctor connectedFunctor;
     43
     44        @Override
     45        protected void joinableStart() {
    3946                try {
    4047                        log.info("connecting to " + address);
     
    4653                        log.info("connected to " + hostName + ":" + port);
    4754                        connected = true;
    48 
    4955                        runThreads();
    5056
     
    5662                        log.error("buffer creation failure");
    5763                        connectedFunctor.call(e);
    58                         close();
    59                 }
    60         }
     64                        // close();
     65                }
     66        }
     67
    6168
    6269        private static abstract class InboundMessage {
     
    149156
    150157
    151         protected final String address;
    152158        protected final String hostName;
    153159        protected final int port;
     
    156162
    157163        public ClientConnection(String address) {
    158                 assert address != null;
    159                 this.address = address;
     164                super(address);
    160165                Matcher matcher = addressPattern.matcher(address);
    161166                if (!matcher.matches()) {
     
    235240        @Override
    236241        public String toString() {
    237                 return address;
     242                return "client connection " + address;
    238243        }
    239244
     
    328333        private int nextQueryId = 0;
    329334
    330         protected void processMessage(InboundMessage inboundMessage) throws Exception {
     335        protected void processMessage(InboundMessage inboundMessage) {
    331336                if (inboundMessage == null) {
    332337                        log.error("failed to use any inbound message");
     
    342347        }
    343348
    344         protected void processEvent(String rest) throws Exception {
     349        protected void processEvent(String rest) {
    345350                Matcher matcher = eventPattern.matcher(rest);
    346351                if (!matcher.matches()) {
     
    359364
    360365
    361         protected void processMessageStartingWith(String line) throws Exception {
     366        protected void processMessageStartingWith(String line) {
    362367                Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
    363368                if (command.first.equals("event")) {
     
    384389
    385390        @Override
    386         protected void receiverThreadRoutine() throws Exception {
     391        protected void receiverThreadRoutine() {
    387392                while (connected) {
    388                         processMessageStartingWith(getLine());
    389                 }
    390         }
     393                        try {
     394                                processMessageStartingWith(getLine());
     395                        } catch (Exception e) {
     396                                break;
     397                        }
     398                }
     399        }
     400
    391401
    392402}
  • java/main/src/main/java/com/framsticks/communication/Connection.java

    r85 r88  
    11package com.framsticks.communication;
    22
     3import com.framsticks.util.FramsticksException;
    34import com.framsticks.util.io.Encoding;
    45import com.framsticks.util.lang.Pair;
     
    1415import java.util.regex.Pattern;
    1516
     17import com.framsticks.util.dispatching.AbstractJoinable;
     18import com.framsticks.util.dispatching.Dispatching;
     19import com.framsticks.util.dispatching.Joinable;
     20import com.framsticks.util.dispatching.JoinableCollection;
     21import com.framsticks.util.dispatching.JoinableParent;
     22import com.framsticks.util.dispatching.JoinableState;
    1623import com.framsticks.util.dispatching.RunAt;
    1724import com.framsticks.util.dispatching.Thread;
    1825
    19 public abstract class Connection {
     26public abstract class Connection extends AbstractJoinable implements JoinableParent {
    2027
    2128        protected final static Logger log = Logger.getLogger(Connection.class);
     
    3239        protected int protocolVersion = -1;
    3340
     41        public String getAddress() {
     42                return address;
     43        }
     44        protected final String address;
     45
    3446        protected final Thread<Connection> senderThread = new Thread<>();
    3547        protected final Thread<Connection> receiverThread = new Thread<>();
    36 
     48        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true);
     49
     50        /**
     51         *
     52         */
     53        public Connection(String address) {
     54                this.address = address;
     55                threads.add(senderThread);
     56                threads.add(receiverThread);
     57        }
    3758        public boolean isConnected() {
    3859                return connected;
    3960        }
    4061
    41         public void close() {
    42                 protocolVersion = -1;
    43                 try {
    44                         connected = false;
    45 
    46                         senderThread.interrupt();
    47                         senderThread.join();
    48 
    49                         receiverThread.interrupt();
    50                         receiverThread.join();
    51 
    52                         if (output != null) {
    53                                 output.close();
    54                                 output = null;
    55                         }
    56 
    57                         if (input != null) {
    58                                 input.close();
    59                                 input = null;
    60                         }
    61 
    62 
    63                         if (socket != null) {
    64                                 socket.close();
    65                                 socket = null;
    66                         }
    67 
    68                         log.info("connection closed");
    69                 } catch (Exception e) {
    70                         log.error(e);
    71                 }
    72 
    73         }
    7462
    7563        protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
     
    9583        char[] readBuffer = new char[BUFFER_LENGTH];
    9684
    97         protected String getLine() throws Exception {
     85        protected String getLine() {
    9886                StringBuilder lineBuffer = new StringBuilder();
    99                 while (!Thread.interrupted()) {
    100                         while (iterator < readChars) {
    101                                 if (readBuffer[iterator] != '\n') {
     87                try {
     88                        while (!Thread.interrupted()) {
     89                                while (iterator < readChars) {
     90                                        if (readBuffer[iterator] != '\n') {
     91                                                ++iterator;
     92                                                continue;
     93                                        }
     94                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
    10295                                        ++iterator;
    103                                         continue;
     96                                        bufferStart = iterator;
     97                                        return lineBuffer.toString();
    10498                                }
    105                                 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
    106                                 ++iterator;
    107                                 bufferStart = iterator;
    108                                 return lineBuffer.toString();
    109                         }
    110                         lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
    111 
    112                         readChars = 0;
    113                         while (readChars == 0) {
    114                                 try {
    115                                         readChars = input.read(readBuffer);
    116                                 } catch (SocketTimeoutException ignored) {
    117                                         //timeout - continue
     99                                lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
     100
     101                                readChars = 0;
     102                                while (readChars == 0) {
     103                                        try {
     104                                                readChars = input.read(readBuffer);
     105                                        } catch (SocketTimeoutException ignored) {
     106                                                //timeout - continue
     107                                        }
    118108                                }
    119                         }
    120                         iterator = 0;
    121                         bufferStart = 0;
    122                 }
    123                 throw new InterruptedException();
    124         }
    125 
    126         protected abstract void receiverThreadRoutine() throws Exception;
     109                                iterator = 0;
     110                                bufferStart = 0;
     111                        }
     112                        throw new InterruptedException();
     113                } catch (Exception e) {
     114                        throw new FramsticksException().msg("failed to read line").cause(e);
     115                }
     116        }
     117
     118        protected abstract void receiverThreadRoutine();
     119
     120        protected void setUpThread(Thread<Connection> thread, String name) {
     121                thread.setName("connection thread " + address + " " + name);
     122        }
    127123
    128124        protected void runThreads() {
     
    132128                } catch (IOException e) {
    133129                        log.error("buffer creation failure");
    134                         close();
    135130                        return;
    136131                }
    137132
    138                 senderThread.setName(this + "-sender");
    139                 receiverThread.setName(this + "-receiver");
    140 
    141                 senderThread.start();
    142                 receiverThread.start();
     133                setUpThread(senderThread, "sender");
     134                setUpThread(receiverThread, "receiver");
     135                Dispatching.use(threads, this);
    143136
    144137                receiverThread.invokeLater(new RunAt<Connection>() {
    145138                        @Override
    146139                        public void run() {
    147                                 try {
    148                                         receiverThreadRoutine();
    149                                 } catch (InterruptedException ignored) {
    150                                         log.debug("receiver thread interrupted");
    151                                 } catch (Exception e) {
    152                                         log.error("error: " + e);
    153                                         close();
    154                                 }
     140                                receiverThreadRoutine();
    155141                        }
    156142                });
    157143
    158144        }
    159 
    160145
    161146        /**
     
    164149         * @return Query associated with query getId.
    165150         */
    166 
    167151        public int getProtocolVersion() {
    168152                return protocolVersion;
    169153        }
    170154
    171 
     155        @Override
     156        protected void joinableInterrupt() {
     157                protocolVersion = -1;
     158
     159                connected = false;
     160                Dispatching.drop(threads, this);
     161
     162                // finish();
     163        }
     164
     165        @Override
     166        protected void joinableFinish() {
     167                try {
     168                        if (output != null) {
     169                                output.close();
     170                                output = null;
     171                        }
     172
     173                        if (input != null) {
     174                                input.close();
     175                                input = null;
     176                        }
     177
     178
     179                        if (socket != null) {
     180                                socket.close();
     181                                socket = null;
     182                        }
     183                } catch (Exception e) {
     184                        log.error("failed to stop connection: ", e);
     185                }
     186                log.debug("connection closed");
     187        }
     188
     189        @Override
     190        public void childChangedState(Joinable joinable, JoinableState state) {
     191                proceedToState(state);
     192        }
     193
     194        @Override
     195        protected void joinableJoin() throws InterruptedException {
     196                Dispatching.join(threads);
     197
     198        }
    172199
    173200}
  • java/main/src/main/java/com/framsticks/communication/ServerConnection.java

    r85 r88  
    2020
    2121        public ServerConnection(Socket socket, RequestHandler requestHandler) {
     22                super("todo");
    2223                this.socket = socket;
    2324                this.requestHandler = requestHandler;
    2425                connected = true;
    2526
    26         }
    27 
    28         public void start() {
    29                 runThreads();
    3027        }
    3128
     
    3633
    3734        @Override
    38         protected void receiverThreadRoutine() throws Exception {
     35        protected void receiverThreadRoutine() {
    3936                while (connected) {
    4037                        processNextRequest();
     
    9895        }
    9996
    100         protected void processNextRequest() throws Exception {
     97        protected void processNextRequest() {
    10198                String line = getLine();
    10299                Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
     
    124121                });
    125122        }
     123
     124        @Override
     125        protected void joinableStart() {
     126                // TODO Auto-generated method stub
     127
     128        }
    126129}
Note: See TracChangeset for help on using the changeset viewer.