public class TimerTest { public static void main(String[] args) { final Timer1 timer = new Timer1("定时器线程"); Ticket ticket = new Ticket(timer); Thread t0 = new Thread(ticket); t0.start(); Thread t1 = new Thread(ticket); t1.start(); Thread t2 = new Thread(ticket); t2.start(); } static class Ticket implements Runnable { private Timer1 timer = null; Ticket(Timer1 timer){ this.timer = timer; } public void run() { MyTimerTask myTask = new MyTimerTask("TimerTask1 1"); timer.schedule(myTask, 2000L, 1000L); } } }
public class MyTimerTask extends TimerTask1 { private String taskName; public MyTimerTask(String taskName) { this.taskName = taskName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } @Override public void run() { System.out.println("当前执行的任务是:" + taskName); } }
public class Timer1 { private final TaskQueue queue = new TaskQueue();//这是一个最小堆,它存放所有TimerTask。一个数组 //定时任务只会创建一个线程,所以如果存在多个任务,且任务时间过长,超过了两个任务的间隔时间 private final TimerThread thread = new TimerThread(queue);//queue中的任务,执行完从任务队列中移除。 /** * This object causes the timer's task execution thread to exit * gracefully when there are no live references to the Timer object and no * tasks in the timer queue. It is used in preference to a finalizer on * Timer as such a finalizer would be susceptible to a subclass's * finalizer forgetting to call it. */ private final Object threadReaper = new Object() { protected void finalize() throws Throwable { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.notify(); // In case queue is empty. } } }; private final static AtomicInteger nextSerialNumber = new AtomicInteger(0); private static int serialNumber() { return nextSerialNumber.getAndIncrement(); } public Timer1() { this("Timer-" + serialNumber()); } public Timer1(boolean isDaemon) { this("Timer-" + serialNumber(), isDaemon);//是否守护线程 } public Timer1(String name) { thread.setName(name); thread.start(); } public Timer1(String name, boolean isDaemon) { thread.setName(name); thread.setDaemon(isDaemon); thread.start(); } public void schedule(TimerTask1 task, long delay) {//在时间等于或超过time的时候执行且只执行一次task, if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask1 task, Date time) { sched(task, time.getTime(), 0); } public void schedule(TimerTask1 task, long delay, long period) {//在时间等于或超过time的时候首次执行task,之后每隔period毫秒重复执行一次task 。 if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); } public void schedule(TimerTask1 task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), -period); } public void scheduleAtFixedRate(TimerTask1 task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, period); } public void scheduleAtFixedRate(TimerTask1 task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), period); } private void sched(TimerTask1 task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask1.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask1.SCHEDULED; } queue.add(task); //当timer对象调用schedule方法时,都会向队列添加元素,并唤醒TaskQueue队列上的线程, //这时候TimerThread会被唤醒,继续执行mainLoop方法。 if (queue.getMin() == task) queue.notify();//多线程对同一队列出队入队,使用synchronized,queue.notify() } } public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } } public int purge() { int result = 0; synchronized(queue) { for (int i = queue.size(); i > 0; i--) { if (queue.get(i).state == TimerTask1.CANCELLED) { queue.quickRemove(i); result++; } } if (result != 0) queue.heapify(); } return result; } } class TimerThread extends Thread { /** * This flag is set to false by the reaper to inform us that there * are no more live references to our Timer object. Once this flag * is true and there are no more tasks in our queue, there is no * work left for us to do, so we terminate gracefully. Note that * this field is protected by queue's monitor! */ boolean newTasksMayBeScheduled = true; private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // Eliminate obsolete references } } } private void mainLoop() {//拿出任务队列中的第一个任务,如果执行时间还没有到,则继续等待,否则立即执行。 while (true) {//函数执行的是一个死循环 try { TimerTask1 task; boolean taskFired; synchronized(queue) {//并且加了queue锁,从而保证是线程安全的。 // 队列为空等待 while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); /* queue.getMin()找到任务队列中执行时间最早的元素, 然后判断元素的state,period,nextExecutionTime,SCHEDULED等属性,从而确定任务是否可执行。 主要是判断这几个属性:1,state 属性,如果为取消(即我们调用了timer的cancel方法取消了某一任务), 则会从队列中删除这个元素,然后继续循环;2,period 属性,如果为单次执行,这个值为0,周期执行的话, 为我们传入的intervalTime值,如果为0,则会移出队列,并设置任务状态为已执行,然后下面的 task.run()会执行任务, 如果这个值不为0,则会修正队列,设置这个任务的再一次执行时间,queue.rescheduleMin这个函数来完成的这个操作; 3,taskFired 属性, 如果 executionTime<=currentTime 则设置为true,可以执行, 否则线程就会进行休眠,休眠时间为两者之差。 */ synchronized(task.lock) { if (task.state == TimerTask1.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask1.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } //会对TaskQueue队列的首元素进行判断,看是否达到执行时间, //如果没有,则进行休眠,休眠时间为队首任务的开始执行时间到当前时间的时间差。 if (!taskFired) queue.wait(executionTime - currentTime); } if (taskFired) task.run(); } catch(InterruptedException e) { } } } } class TaskQueue { /*TaskQueue是一个平衡二叉堆,具有最小 nextExecutionTime 的 TimerTask 在队列中为 queue[1] , 也就是堆中的根节点。第 n 个位置 queue[n] 的子节点分别在 queue[2n] 和 queue[2n+1] 也就是说TimerTask 在堆中的位置其实是通过nextExecutionTime 来决定的。 nextExecutionTime 越小,那么在堆中的位置越靠近根,越有可能先被执行。而nextExecutionTime意思就是下一次执行开始的时间。 */ private TimerTask1[] queue = new TimerTask1[128];//默认128 private int size = 0; int size() { return size; } void add(TimerTask1 task) {//根据执行时间的先后对数组元素进行排序,从而确定最先开始执行的任务, if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); } TimerTask1 getMin() { return queue[1]; } TimerTask1 get(int i) { return queue[i]; } void removeMin() { queue[1] = queue[size]; queue[size--] = null; // Drop extra reference to prevent memory leak fixDown(1); } /** * Removes the ith element from queue without regard for maintaining * the heap invariant. Recall that queue is one-based, so * 1 <= i <= size. */ void quickRemove(int i) { assert i <= size; queue[i] = queue[size]; queue[size--] = null; // Drop extra ref to prevent memory leak } /** * Sets the nextExecutionTime associated with the head task to the * specified value, and adjusts priority queue accordingly. */ void rescheduleMin(long newTime) { queue[1].nextExecutionTime = newTime; fixDown(1); } boolean isEmpty() { return size==0; } void clear() { // Null out task references to prevent memory leak for (int i=1; i<=size; i++) queue[i] = null; size = 0; } private void fixUp(int k) {//维护最小堆 while (k > 1) { int j = k >> 1; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; TimerTask1 tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } private void fixDown(int k) { int j; while ((j = k << 1) <= size && j > 0) { if (j < size && queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) j++; // j indexes smallest kid if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) break; TimerTask1 tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } void heapify() { for (int i = size/2; i >= 1; i--) fixDown(i); } }
public abstract class TimerTask1 implements Runnable { /** * This object is used to control access to the TimerTask1 internals. */ final Object lock = new Object(); /** * The state of this task, chosen from the constants below. */ int state = VIRGIN; /** * This task has not yet been scheduled. */ static final int VIRGIN = 0; /** * This task is scheduled for execution. If it is a non-repeating task, * it has not yet been executed. */ static final int SCHEDULED = 1; /** * This non-repeating task has already executed (or is currently * executing) and has not been cancelled. */ static final int EXECUTED = 2; /** * This task has been cancelled (with a call to TimerTask1.cancel). */ static final int CANCELLED = 3; /** * Next execution time for this task in the format returned by * System.currentTimeMillis, assuming this task is scheduled for execution. * For repeating tasks, this field is updated prior to each task execution. */ long nextExecutionTime; /** * Period in milliseconds for repeating tasks. A positive value indicates * fixed-rate execution. A negative value indicates fixed-delay execution. * A value of 0 indicates a non-repeating task. */ long period = 0; /** * Creates a new timer task. */ protected TimerTask1() { } /** * The action to be performed by this timer task. */ public abstract void run(); public boolean cancel() { synchronized(lock) { boolean result = (state == SCHEDULED); state = CANCELLED; return result; } } public long scheduledExecutionTime() { synchronized(lock) { return (period < 0 ? nextExecutionTime + period : nextExecutionTime - period); } } }