- Timestamp:
- 07/14/13 23:20:04 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/util/dispatching/Thread.java
r100 r101 3 3 import org.apache.logging.log4j.Logger; 4 4 import org.apache.logging.log4j.LogManager; 5 6 import java.util.LinkedList;7 import java.util.ListIterator;8 5 9 6 … … 14 11 * @author Piotr Sniegowski 15 12 */ 16 public class Thread<C> extends AbstractJoinable implements JoinableDispatcher<C> {13 public class Thread<C> extends AbstractJoinable implements Dispatcher<C> { 17 14 18 15 private static final Logger log = LogManager.getLogger(Thread.class); … … 20 17 protected final java.lang.Thread thread; 21 18 22 private final LinkedList<Task<? extends C>> queue = new LinkedList<>(); 19 protected final Object condition = new Object(); 20 private RunnableQueue<C> queue = new RunnableQueue<>(); 23 21 24 22 public Thread() { 25 thread = new java.lang.Thread(new java.lang.Runnable() {23 thread = new java.lang.Thread(new Runnable() { 26 24 @Override 27 25 public void run() { … … 30 28 }); 31 29 } 32 33 30 34 31 public Thread(java.lang.Thread thread) { … … 51 48 ExceptionHandler exceptionHandler = getMonitor().getTaskExceptionHandler(); 52 49 while (!java.lang.Thread.interrupted()) { 53 Task<? extends C> task;54 synchronized ( queue) {50 RunAt<? extends C> runnable; 51 synchronized (condition) { 55 52 if (queue.isEmpty()) { 56 53 try { 57 queue.wait();54 condition.wait(); 58 55 } catch (InterruptedException ignored) { 59 56 break; … … 61 58 continue; 62 59 } 63 task = queue.peekFirst(); 64 assert task != null; 65 if (task.moment > System.currentTimeMillis()) { 66 try { 67 queue.wait(task.moment - System.currentTimeMillis()); 68 } catch (InterruptedException ignored) { 69 continue; 60 runnable = queue.pollFirst(); 61 } 62 if (runnable != null) { 63 try { 64 runnable.run(); 65 } catch (Exception e) { 66 if (exceptionHandler != null) { 67 if (exceptionHandler.handle(this, e)) { 68 continue; 69 } 70 70 } 71 continue;71 log.error("error in thread: ", e); 72 72 } 73 queue.pollFirst();74 }75 try {76 task.run();77 } catch (Exception e) {78 if (exceptionHandler != null) {79 if (exceptionHandler.handle(this, e)) {80 continue;81 }82 }83 log.error("error in thread: ", e);84 73 } 85 74 } 86 75 log.debug("finishing thread {}", this); 87 finish ();76 finishJoinable(); 88 77 } 89 78 90 protected void enqueueTask(Task<? extends C> task) {91 synchronized (queue) {92 ListIterator<Task<? extends C>> i = queue.listIterator();93 while (i.hasNext()) {94 Task<? extends C> t = i.next();95 if (t.getMoment() > task.getMoment()) {96 i.previous();97 i.add(task);98 task = null;99 break;100 }101 }102 if (task != null) {103 queue.add(task);104 }105 79 106 /* 107 Iterator<Task> j = queue.iterator(); 108 Task prev = null; 109 while (j.hasNext()) { 110 Task next = j.next(); 111 assert (prev == null) || prev.getMoment() <= next.getMoment(); 112 prev = next; 113 } 114 */ 115 queue.notify(); 80 @Override 81 public void dispatch(RunAt<? extends C> runnable) { 82 synchronized (condition) { 83 queue.push(runnable); 84 condition.notifyAll(); 116 85 } 117 86 } 118 87 119 @Override 120 public void dispatch(final RunAt<? extends C> runnable) { 121 if (!(runnable instanceof Task)) { 122 enqueueTask(new Task<C>(runnable) { 123 @Override 124 protected void runAt() { 125 runnable.run(); 126 } 127 }); 128 return; 88 public RunnableQueue<C> switchQueue(RunnableQueue<C> queue) { 89 synchronized (condition) { 90 RunnableQueue<C> result = this.queue; 91 this.queue = queue; 92 return result; 129 93 } 130 enqueueTask((Task<? extends C>) runnable);131 94 } 132 95
Note: See TracChangeset
for help on using the changeset viewer.