package com.framsticks.util.dispatching;

import org.apache.log4j.Logger;

import java.util.LinkedList;
import java.util.ListIterator;

import com.framsticks.params.annotations.ParamAnnotation;
import com.framsticks.util.dispatching.RunAt;

/**
 * A {@link Dispatcher} backed by a single {@link java.lang.Thread}.
 *
 * <p>Dispatched tasks are kept in a queue ordered by ascending
 * {@code getMoment()}; the worker loop ({@link #routine()}) runs the head of
 * the queue once its moment is not later than
 * {@link System#currentTimeMillis()}. All access to the queue is guarded by
 * the queue's own monitor.
 *
 * <p>NOTE(review): generic type arguments appear to have been lost from this
 * copy of the file (the queue and iterator declarations were syntactically
 * broken). They are restored here as {@code Task<?>}, which assumes
 * {@code Task} takes a single type parameter; the class header and method
 * signatures are left as found. Confirm against the project's
 * {@code Task}/{@code RunAt}/{@code Dispatcher} declarations.
 *
 * @author Piotr Sniegowski
 */
public class Thread extends AbstractJoinable implements Dispatcher {

    private static final Logger log = Logger.getLogger(Thread.class.getName());

    /** The underlying Java thread this dispatcher is bound to. */
    protected final java.lang.Thread thread;

    /** Pending tasks, sorted by ascending moment; also used as the lock and wait/notify monitor. */
    private final LinkedList<Task<?>> queue = new LinkedList<>();

    /**
     * Creates a dispatcher with a fresh Java thread whose body is
     * {@link #routine()}. The thread is not started here; see
     * {@link #joinableStart()}.
     */
    public Thread() {
        thread = new java.lang.Thread(new java.lang.Runnable() {
            @Override
            public void run() {
                Thread.this.routine();
            }
        });
    }

    /**
     * Wraps an already existing Java thread. This constructor does not
     * arrange for {@link #routine()} to be executed by that thread.
     *
     * @param thread the Java thread to wrap
     */
    public Thread(java.lang.Thread thread) {
        this.thread = thread;
    }

    /** Starts the underlying Java thread. */
    @Override
    protected void joinableStart() {
        thread.start();
    }

    /**
     * @return {@code true} if the calling thread is the thread wrapped by
     *         this dispatcher
     */
    @Override
    public final boolean isActive() {
        return thread.equals(java.lang.Thread.currentThread());
    }

    /**
     * Worker loop: repeatedly takes the earliest task from the queue and runs
     * it, sleeping on the queue monitor while the queue is empty or the
     * earliest task is not yet due. The loop ends when the executing thread
     * is interrupted, after which {@code finish()} is called.
     *
     * <p>Exceptions thrown by a task are logged and do not stop the loop.
     */
    protected void routine() {
        log.debug("starting thread " + this);
        while (!java.lang.Thread.interrupted()) {
            Task<?> task;
            synchronized (queue) {
                try {
                    if (queue.isEmpty()) {
                        queue.wait();
                        continue;
                    }
                    task = queue.peekFirst();
                    assert task != null;
                    // Read the clock once: a second read could yield a zero
                    // timeout (wait(0) blocks until notified) or a negative
                    // one (IllegalArgumentException).
                    long delay = task.getMoment() - System.currentTimeMillis();
                    if (delay > 0) {
                        queue.wait(delay);
                        // Re-evaluate the head: an earlier task may have been
                        // enqueued, or this was a spurious wakeup.
                        continue;
                    }
                } catch (InterruptedException ignored) {
                    // The exception has already cleared the interrupt flag,
                    // so the loop condition would not notice it; leave the
                    // loop explicitly.
                    break;
                }
                queue.pollFirst();
            }
            // Run outside the lock so that tasks may dispatch further tasks.
            try {
                task.run();
            } catch (Exception e) {
                log.error("error in thread: ", e);
            }
        }
        log.debug("finishing thread " + this);
        finish();
    }

    /**
     * Inserts the task into the queue in front of the first queued task with
     * a strictly later moment (so tasks with equal moments keep their
     * insertion order), then wakes the worker.
     *
     * @param task the task to schedule
     */
    protected void enqueueTask(Task task) {
        synchronized (queue) {
            boolean inserted = false;
            ListIterator<Task<?>> i = queue.listIterator();
            while (i.hasNext()) {
                if (i.next().getMoment() > task.getMoment()) {
                    // Step back so the new task lands before the later one.
                    i.previous();
                    i.add(task);
                    inserted = true;
                    break;
                }
            }
            // A null task is never stored in the queue.
            if (!inserted && task != null) {
                queue.add(task);
            }
            queue.notify();
        }
    }

    /**
     * Schedules the given runnable on this dispatcher. A runnable that is
     * already a {@code Task} is enqueued as is; anything else is wrapped in a
     * new {@code Task} that delegates to it.
     *
     * @param runnable the work to execute on this dispatcher's thread
     */
    @Override
    public void dispatch(final RunAt runnable) {
        if (runnable instanceof Task) {
            enqueueTask((Task) runnable);
            return;
        }
        enqueueTask(new Task() {
            @Override
            public void run() {
                runnable.run();
            }
        });
    }

    /** Interrupts the underlying Java thread. */
    @Override
    protected void joinableInterrupt() {
        thread.interrupt();
    }

    /**
     * Waits up to 500 ms for the underlying thread to terminate and logs
     * whether it actually did.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    protected void joinableJoin() throws InterruptedException {
        thread.join(500);
        if (thread.isAlive()) {
            log.debug("timed out while joining " + this);
            return;
        }
        log.debug("joined " + this);
    }

    /**
     * Sets the name of the underlying Java thread.
     *
     * @param name the new thread name
     */
    @ParamAnnotation
    public void setName(String name) {
        thread.setName(name);
    }

    /**
     * @return the name of the underlying Java thread
     */
    @ParamAnnotation
    public String getName() {
        return thread.getName();
    }

    /**
     * Delegates to {@link java.lang.Thread#interrupted()} for the calling thread.
     *
     * @return the value returned by {@link java.lang.Thread#interrupted()}
     */
    public static boolean interrupted() {
        return java.lang.Thread.interrupted();
    }

    /**
     * @return the thread name, as given by {@link #getName()}
     */
    @Override
    public String toString() {
        return getName();
    }

    /** No additional work is done when this joinable finishes. */
    @Override
    protected void joinableFinish() {
    }
}