source: java/main/src/main/java/com/framsticks/util/dispatching/Thread.java @ 96

Last change on this file since 96 was 96, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • cleanup Instance management
    • extract Instance interface
    • extract Instance common algorithms to InstanceUtils?
  • fix closing issues: Ctrl+C or window close button

properly shutdown whole program

by Java Framsticks framework

  • fix parsing and printing of all request types
  • hide exception passing in special handle method of closures
    • substantially improve readability of closures
    • basically enable use of exception in asynchronous closures

(thrown exception is transported back to the caller)

  • implement call request on both sides

CHANGELOG:
Further improve calling.

Improve instance calling.

Calling is working on both sides.

Improve exception handling in testing.

Waiters do not supercede other apllication exception being thrown.

Finished parsing and printing of all request types (with tests).

Move implementation and tests of request parsing to Request.

Add tests for Requests.

Improve waits in asynchronours tests.

Extract more algorithms to InstanceUtils?.

Extract Instance.resolve to InstanceUtils?.

Improve naming.

Improve passing exception in InstanceClient?.

Hide calling of passed functor in StateCallback?.

Hide Exception passing in asynchronous closures.

Hide exception passing in Future.

Make ResponseCallback? an abstract class.

Make Future an abstract class.

Minor change.

Move getPath to Path.to()

Move bindAccess to InstanceUtils?.

Extract common things to InstanceUtils?.

Fix synchronization bug in Connection.

Move resolve to InstanceUtils?.

Allow names of Joinable to be dynamic.

Add support for set request server side.

More fixes in communication.

Fix issues with parsing in connection.

Cut new line characters when reading.

More improvements.

Migrate closures to FramsticksException?.

Several changes.

Extract resolveAndFetch to InstanceUtils? algorithms.

Test resolving and fetching.

More fixes with function signature deduction.

Do not print default values in SimpleAbstractAccess?.

Add test of FramsClass? printing.

Improve FramsticksException? messages.

Add explicit dispatcher synchronization feature.

Rework assertions in tests.

Previous solution was not generic enough.

Allow addition of joinables to collection after start.

Extract SimulatorInstance? from RemoteInstance?.

Remove PrivateJoinableCollection?.

Improve connections.

Move shutdown hook to inside the Monitor.

It should work in TestNG tests, but it seems that
hooks are not called.

In ServerTest? client connects to testing server.

Move socket initialization to receiver thread.

Add proper closing on Ctrl+C (don't use signals).

Fix bugs with server accepting connections.

Merge Entity into Joinable.

Reworking ServerInstance?.

Extract more algorithm to InstanceUtils?.

Extract some common functionality from AbstractInstance?.

Functions were placed in InstanceUtils?.

Hide registry of Instance.

Use ValueParam? in Instance interface.

Minor change.

Extract Instance interface.

Old Instance is now AbstractInstance?.

File size: 3.3 KB
Line 
1package com.framsticks.util.dispatching;
2
3import org.apache.log4j.Logger;
4
5import java.util.LinkedList;
6import java.util.ListIterator;
7
8
9import com.framsticks.params.annotations.ParamAnnotation;
10import com.framsticks.util.dispatching.RunAt;
11
12/**
13 * @author Piotr Sniegowski
14 */
15public class Thread<C> extends AbstractJoinable implements Dispatcher<C> {
16
17        private static final Logger log = Logger.getLogger(Thread.class.getName());
18
19        protected final java.lang.Thread thread;
20
21        private final LinkedList<Task<? extends C>> queue = new LinkedList<>();
22
23        public Thread() {
24                thread = new java.lang.Thread(new java.lang.Runnable() {
25                        @Override
26                        public void run() {
27                                Thread.this.routine();
28                        }
29                });
30        }
31
32
33        public Thread(java.lang.Thread thread) {
34                this.thread = thread;
35        }
36
37        @Override
38        protected void joinableStart() {
39                thread.start();
40        }
41
42        @Override
43        public final boolean isActive() {
44                return thread.equals(java.lang.Thread.currentThread());
45        }
46
47        protected void routine() {
48                log.debug("starting thread " + this);
49                assert getMonitor() != null;
50                ExceptionHandler exceptionHandler = getMonitor().getTaskExceptionHandler();
51                while (!java.lang.Thread.interrupted()) {
52                        Task<? extends C> task;
53                        synchronized (queue) {
54                                if (queue.isEmpty()) {
55                                        try {
56                                                queue.wait();
57                                        } catch (InterruptedException ignored) {
58                                                break;
59                                        }
60                                        continue;
61                                }
62                                task = queue.peekFirst();
63                                assert task != null;
64                                if (task.moment > System.currentTimeMillis()) {
65                                        try {
66                                                queue.wait(task.moment - System.currentTimeMillis());
67                                        } catch (InterruptedException ignored) {
68                                                continue;
69                                        }
70                                        continue;
71                                }
72                                queue.pollFirst();
73                        }
74                        try {
75                                task.run();
76                        } catch (Exception e) {
77                                if (exceptionHandler != null) {
78                                        if (exceptionHandler.handle(this, e)) {
79                                                continue;
80                                        }
81                                }
82                                log.error("error in thread: ", e);
83                        }
84                }
85                log.debug("finishing thread " + this);
86                finish();
87        }
88
89        protected void enqueueTask(Task<? extends C> task) {
90                synchronized (queue) {
91                        ListIterator<Task<? extends C>> i = queue.listIterator();
92                        while (i.hasNext()) {
93                                Task<? extends C> t = i.next();
94                                if (t.getMoment() > task.getMoment()) {
95                                        i.previous();
96                                        i.add(task);
97                                        task = null;
98                                        break;
99                                }
100                        }
101                        if (task != null) {
102                                queue.add(task);
103                        }
104
105                        /*
106                        Iterator<Task> j = queue.iterator();
107                        Task prev = null;
108                        while (j.hasNext()) {
109                                Task next = j.next();
110                                assert (prev == null) || prev.getMoment() <= next.getMoment();
111                                prev = next;
112                        }
113                        */
114                        queue.notify();
115                }
116        }
117
118        @Override
119        public void dispatch(final RunAt<? extends C> runnable) {
120                if (!(runnable instanceof Task)) {
121                        enqueueTask(new Task<C>() {
122                                @Override
123                                public void run() {
124                                        runnable.run();
125                                }
126                        });
127                        return;
128                }
129                enqueueTask((Task<? extends C>) runnable);
130        }
131
132        @Override
133        protected void joinableInterrupt() {
134                thread.interrupt();
135        }
136
137        @Override
138        protected void joinableJoin() throws InterruptedException {
139                thread.join(500);
140                log.debug("joined " + this);
141        }
142
143        @ParamAnnotation
144        public void setName(String name) {
145                thread.setName(name);
146        }
147
148        @ParamAnnotation
149        public String getName() {
150                return thread.getName();
151        }
152
153        public static boolean interrupted() {
154                return java.lang.Thread.interrupted();
155        }
156
157        @Override
158        public String toString() {
159                return getName();
160        }
161
162        @Override
163        protected void joinableFinish() {
164        }
165
166}
Note: See TracBrowser for help on using the repository browser.