Ignore:
Timestamp:
06/30/13 12:48:20 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • loading f0 schema with XmlLoader?
  • use XmlLoader? to load configuration
  • introduce unified fork-join model of various entities

(Instances, Connections, GUI Frames, etc.),
all those entities clean up gracefully on
shutdown, which may be initialized by user
or by some entity

  • basing on above, simplify several organizing classes

(Observer, main class)

(to host native frams server process from Java level)

CHANGELOG:
Remove redundant Observer class.

Clean up in AbstractJoinable?.

Update ExternalProcess? class to changes in joining model.

Another sweep through code with FindBugs?.

Find bug with not joining RemoteInstance?.

Joining almost works.

Much improved joining model.

More improvement to joining model.

Add logging messages around joinable operations.

Rename methods in AbstractJoinable?.

Improve Joinable.

Rewrite of entity structure.

More simplifications with entities.

Further improve joinables.

Let Frame compose from JFrame instead of inheriting.

Add join classes.

Improvements of closing.

Add Builder interface.

Add FramsServerTest?.xml

FramsServer? may be configured through xml.

Make Framsticks main class an Observer of Entities.

Make Observer a generic type.

Remove variables regarding to removed endpoint.

Simplify observer (remove endpoints).

More changes to Observer and Endpoint.

Minor improvements.

Add OutputListener? to ExternalProcess?.

Improve testing of ExternalProcess?.

Add ExternalProcess? runner.

Rename the Program class to Framsticks.

Migrate Program to use XmlLoader? configuration.

First steps with configuration using XmlLoader?.

Fix several bugs.

Move all f0 classes to apriopriate package.

XmlLoader? is able to load Schema.

XmlLoader? is loading classes and props.

Add GroupBuilder?.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/communication/Connection.java

    r85 r88  
    11package com.framsticks.communication;
    22
     3import com.framsticks.util.FramsticksException;
    34import com.framsticks.util.io.Encoding;
    45import com.framsticks.util.lang.Pair;
     
    1415import java.util.regex.Pattern;
    1516
     17import com.framsticks.util.dispatching.AbstractJoinable;
     18import com.framsticks.util.dispatching.Dispatching;
     19import com.framsticks.util.dispatching.Joinable;
     20import com.framsticks.util.dispatching.JoinableCollection;
     21import com.framsticks.util.dispatching.JoinableParent;
     22import com.framsticks.util.dispatching.JoinableState;
    1623import com.framsticks.util.dispatching.RunAt;
    1724import com.framsticks.util.dispatching.Thread;
    1825
    19 public abstract class Connection {
     26public abstract class Connection extends AbstractJoinable implements JoinableParent {
    2027
    2128        protected final static Logger log = Logger.getLogger(Connection.class);
     
    3239        protected int protocolVersion = -1;
    3340
     41        public String getAddress() {
     42                return address;
     43        }
     44        protected final String address;
     45
    3446        protected final Thread<Connection> senderThread = new Thread<>();
    3547        protected final Thread<Connection> receiverThread = new Thread<>();
    36 
     48        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>(true);
     49
     50        /**
     51         *
     52         */
     53        public Connection(String address) {
     54                this.address = address;
     55                threads.add(senderThread);
     56                threads.add(receiverThread);
     57        }
    3758        public boolean isConnected() {
    3859                return connected;
    3960        }
    4061
    41         public void close() {
    42                 protocolVersion = -1;
    43                 try {
    44                         connected = false;
    45 
    46                         senderThread.interrupt();
    47                         senderThread.join();
    48 
    49                         receiverThread.interrupt();
    50                         receiverThread.join();
    51 
    52                         if (output != null) {
    53                                 output.close();
    54                                 output = null;
    55                         }
    56 
    57                         if (input != null) {
    58                                 input.close();
    59                                 input = null;
    60                         }
    61 
    62 
    63                         if (socket != null) {
    64                                 socket.close();
    65                                 socket = null;
    66                         }
    67 
    68                         log.info("connection closed");
    69                 } catch (Exception e) {
    70                         log.error(e);
    71                 }
    72 
    73         }
    7462
    7563        protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
     
    9583        char[] readBuffer = new char[BUFFER_LENGTH];
    9684
    97         protected String getLine() throws Exception {
     85        protected String getLine() {
    9886                StringBuilder lineBuffer = new StringBuilder();
    99                 while (!Thread.interrupted()) {
    100                         while (iterator < readChars) {
    101                                 if (readBuffer[iterator] != '\n') {
     87                try {
     88                        while (!Thread.interrupted()) {
     89                                while (iterator < readChars) {
     90                                        if (readBuffer[iterator] != '\n') {
     91                                                ++iterator;
     92                                                continue;
     93                                        }
     94                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
    10295                                        ++iterator;
    103                                         continue;
     96                                        bufferStart = iterator;
     97                                        return lineBuffer.toString();
    10498                                }
    105                                 lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart + 1);
    106                                 ++iterator;
    107                                 bufferStart = iterator;
    108                                 return lineBuffer.toString();
    109                         }
    110                         lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
    111 
    112                         readChars = 0;
    113                         while (readChars == 0) {
    114                                 try {
    115                                         readChars = input.read(readBuffer);
    116                                 } catch (SocketTimeoutException ignored) {
    117                                         //timeout - continue
     99                                lineBuffer.append(readBuffer, bufferStart, readChars - bufferStart);
     100
     101                                readChars = 0;
     102                                while (readChars == 0) {
     103                                        try {
     104                                                readChars = input.read(readBuffer);
     105                                        } catch (SocketTimeoutException ignored) {
     106                                                //timeout - continue
     107                                        }
    118108                                }
    119                         }
    120                         iterator = 0;
    121                         bufferStart = 0;
    122                 }
    123                 throw new InterruptedException();
    124         }
    125 
    126         protected abstract void receiverThreadRoutine() throws Exception;
     109                                iterator = 0;
     110                                bufferStart = 0;
     111                        }
     112                        throw new InterruptedException();
     113                } catch (Exception e) {
     114                        throw new FramsticksException().msg("failed to read line").cause(e);
     115                }
     116        }
     117
     118        protected abstract void receiverThreadRoutine();
     119
     120        protected void setUpThread(Thread<Connection> thread, String name) {
     121                thread.setName("connection thread " + address + " " + name);
     122        }
    127123
    128124        protected void runThreads() {
     
    132128                } catch (IOException e) {
    133129                        log.error("buffer creation failure");
    134                         close();
    135130                        return;
    136131                }
    137132
    138                 senderThread.setName(this + "-sender");
    139                 receiverThread.setName(this + "-receiver");
    140 
    141                 senderThread.start();
    142                 receiverThread.start();
     133                setUpThread(senderThread, "sender");
     134                setUpThread(receiverThread, "receiver");
     135                Dispatching.use(threads, this);
    143136
    144137                receiverThread.invokeLater(new RunAt<Connection>() {
    145138                        @Override
    146139                        public void run() {
    147                                 try {
    148                                         receiverThreadRoutine();
    149                                 } catch (InterruptedException ignored) {
    150                                         log.debug("receiver thread interrupted");
    151                                 } catch (Exception e) {
    152                                         log.error("error: " + e);
    153                                         close();
    154                                 }
     140                                receiverThreadRoutine();
    155141                        }
    156142                });
    157143
    158144        }
    159 
    160145
    161146        /**
     
    164149         * @return Query associated with query getId.
    165150         */
    166 
    167151        public int getProtocolVersion() {
    168152                return protocolVersion;
    169153        }
    170154
    171 
     155        @Override
     156        protected void joinableInterrupt() {
     157                protocolVersion = -1;
     158
     159                connected = false;
     160                Dispatching.drop(threads, this);
     161
     162                // finish();
     163        }
     164
     165        @Override
     166        protected void joinableFinish() {
     167                try {
     168                        if (output != null) {
     169                                output.close();
     170                                output = null;
     171                        }
     172
     173                        if (input != null) {
     174                                input.close();
     175                                input = null;
     176                        }
     177
     178
     179                        if (socket != null) {
     180                                socket.close();
     181                                socket = null;
     182                        }
     183                } catch (Exception e) {
     184                        log.error("failed to stop connection: ", e);
     185                }
     186                log.debug("connection closed");
     187        }
     188
     189        @Override
     190        public void childChangedState(Joinable joinable, JoinableState state) {
     191                proceedToState(state);
     192        }
     193
     194        @Override
     195        protected void joinableJoin() throws InterruptedException {
     196                Dispatching.join(threads);
     197
     198        }
    172199
    173200}
Note: See TracChangeset for help on using the changeset viewer.