1 | package com.framsticks.util.dispatching; |
---|
2 | |
---|
3 | import org.apache.logging.log4j.Logger; |
---|
4 | import org.apache.logging.log4j.LogManager; |
---|
5 | |
---|
6 | |
---|
7 | import com.framsticks.params.annotations.ParamAnnotation; |
---|
8 | import com.framsticks.util.dispatching.RunAt; |
---|
9 | |
---|
10 | /** |
---|
11 | * @author Piotr Sniegowski |
---|
12 | */ |
---|
13 | public class Thread<C> extends AbstractJoinable implements Dispatcher<C> { |
---|
14 | |
---|
15 | private static final Logger log = LogManager.getLogger(Thread.class); |
---|
16 | |
---|
17 | protected final java.lang.Thread thread; |
---|
18 | |
---|
19 | protected final Object condition = new Object(); |
---|
20 | private RunnableQueue<C> queue = new RunnableQueue<>(); |
---|
21 | |
---|
22 | public Thread() { |
---|
23 | thread = new java.lang.Thread(new Runnable() { |
---|
24 | @Override |
---|
25 | public void run() { |
---|
26 | Thread.this.routine(); |
---|
27 | } |
---|
28 | }); |
---|
29 | } |
---|
30 | |
---|
31 | public Thread(java.lang.Thread thread) { |
---|
32 | this.thread = thread; |
---|
33 | } |
---|
34 | |
---|
35 | @Override |
---|
36 | protected void joinableStart() { |
---|
37 | thread.start(); |
---|
38 | } |
---|
39 | |
---|
40 | @Override |
---|
41 | public final boolean isActive() { |
---|
42 | return thread.equals(java.lang.Thread.currentThread()); |
---|
43 | } |
---|
44 | |
---|
45 | protected void routine() { |
---|
46 | log.debug("starting thread {}", this); |
---|
47 | assert getMonitor() != null; |
---|
48 | while (!java.lang.Thread.interrupted()) { |
---|
49 | RunAt<? extends C> runnable; |
---|
50 | synchronized (condition) { |
---|
51 | if (queue.isEmpty()) { |
---|
52 | try { |
---|
53 | condition.wait(); |
---|
54 | } catch (InterruptedException ignored) { |
---|
55 | break; |
---|
56 | } |
---|
57 | continue; |
---|
58 | } |
---|
59 | runnable = queue.pollFirst(); |
---|
60 | } |
---|
61 | if (runnable != null) { |
---|
62 | try { |
---|
63 | runnable.run(); |
---|
64 | } catch (Exception e) { |
---|
65 | final ExceptionDispatcherHandler exceptionHandler = getMonitor().getTaskExceptionHandler(); |
---|
66 | if (exceptionHandler != null) { |
---|
67 | if (exceptionHandler.handle(this, e)) { |
---|
68 | continue; |
---|
69 | } |
---|
70 | } |
---|
71 | log.error("error in thread: ", e); |
---|
72 | } |
---|
73 | } |
---|
74 | } |
---|
75 | log.debug("finishing thread {}", this); |
---|
76 | finishJoinable(); |
---|
77 | } |
---|
78 | |
---|
79 | |
---|
80 | @Override |
---|
81 | public void dispatch(RunAt<? extends C> runnable) { |
---|
82 | synchronized (condition) { |
---|
83 | queue.push(runnable); |
---|
84 | condition.notifyAll(); |
---|
85 | } |
---|
86 | } |
---|
87 | |
---|
88 | public RunnableQueue<C> switchQueue(RunnableQueue<C> queue) { |
---|
89 | synchronized (condition) { |
---|
90 | RunnableQueue<C> result = this.queue; |
---|
91 | this.queue = queue; |
---|
92 | return result; |
---|
93 | } |
---|
94 | } |
---|
95 | |
---|
96 | @Override |
---|
97 | protected void joinableInterrupt() { |
---|
98 | thread.interrupt(); |
---|
99 | } |
---|
100 | |
---|
101 | @Override |
---|
102 | protected void joinableJoin() throws InterruptedException { |
---|
103 | thread.join(500); |
---|
104 | log.debug("joined {}", this); |
---|
105 | } |
---|
106 | |
---|
107 | @ParamAnnotation |
---|
108 | public Thread<C> setName(String name) { |
---|
109 | thread.setName(name); |
---|
110 | return this; |
---|
111 | } |
---|
112 | |
---|
113 | @ParamAnnotation |
---|
114 | public String getName() { |
---|
115 | return thread.getName(); |
---|
116 | } |
---|
117 | |
---|
118 | public static boolean interrupted() { |
---|
119 | return java.lang.Thread.interrupted(); |
---|
120 | } |
---|
121 | |
---|
122 | // @Override |
---|
123 | // public String toString() { |
---|
124 | // return getName(); |
---|
125 | // } |
---|
126 | |
---|
127 | @Override |
---|
128 | protected void joinableFinish() { |
---|
129 | } |
---|
130 | |
---|
131 | } |
---|