  • Reading the Source of the JDK Thread Pool Framework: Executor

    The Executor framework

    Executor

    ExecutorService

    AbstractExecutorService

    ThreadPoolExecutor

    ThreadPoolExecutor extends AbstractExecutorService and is a concrete thread pool implementation.

    Inner class Worker

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable 

    Worker represents one worker thread of the pool. It extends AbstractQueuedSynchronizer so that the class itself acts as a simple non-reentrant mutex.

    Important members:

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

    Worker wraps the thread that runs tasks, the first task to run, and the number of completed tasks.

            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }

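    The Worker constructor sets state to -1 (1 means the lock is held, 0 means it is free). Because interruptIfStarted checks getState() >= 0, a thread that has just been created and has not yet entered runWorker cannot be interrupted.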

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
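
    The role of the -1 state can be reproduced outside the pool. The following is a minimal sketch, not the JDK's Worker class: StartGateLock is a hypothetical name, and it only mirrors the tryAcquire/tryRelease pattern described above to show that the lock cannot be taken until unlock() has moved the state from -1 to 0.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical stand-in that mimics the locking behavior described for Worker.
class StartGateLock extends AbstractQueuedSynchronizer {
    StartGateLock() {
        setState(-1); // "not started yet": neither held (1) nor free (0)
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only from state 0, so it fails while the lock is held (1)
        // and also while the owner has not started (-1).
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // unconditional, so it also moves -1 to 0
        return true;
    }

    boolean tryLock()   { return tryAcquire(1); }
    void unlock()       { release(1); }
    boolean isStarted() { return getState() >= 0; }

    public static void main(String[] args) {
        StartGateLock lock = new StartGateLock();
        System.out.println(lock.isStarted()); // false
        System.out.println(lock.tryLock());   // false: state is still -1
        lock.unlock();                        // what runWorker does first
        System.out.println(lock.isStarted()); // true
        System.out.println(lock.tryLock());   // true
        System.out.println(lock.tryLock());   // false: non-reentrant
    }
}
```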

    When Worker.thread starts, it executes Worker's run(), which calls ThreadPoolExecutor's runWorker(this):

     1 final void runWorker(Worker w) {
     2         Thread wt = Thread.currentThread();
     3         Runnable task = w.firstTask;
     4         w.firstTask = null;
     5         w.unlock(); // allow interrupts
     6         boolean completedAbruptly = true;
     7         try {
     8             while (task != null || (task = getTask()) != null) {
     9                 w.lock();
    10                 // If pool is stopping, ensure thread is interrupted;
    11                 // if not, ensure thread is not interrupted.  This
    12                 // requires a recheck in second case to deal with
    13                 // shutdownNow race while clearing interrupt
    14                 if ((runStateAtLeast(ctl.get(), STOP) ||
    15                      (Thread.interrupted() &&
    16                       runStateAtLeast(ctl.get(), STOP))) &&
    17                     !wt.isInterrupted())
    18                     wt.interrupt();
    19                 try {
    20                     beforeExecute(wt, task);
    21                     Throwable thrown = null;
    22                     try {
    23                         task.run();
    24                     } catch (RuntimeException x) {
    25                         thrown = x; throw x;
    26                     } catch (Error x) {
    27                         thrown = x; throw x;
    28                     } catch (Throwable x) {
    29                         thrown = x; throw new Error(x);
    30                     } finally {
    31                         afterExecute(task, thrown);
    32                     }
    33                 } finally {
    34                     task = null;
    35                     w.completedTasks++;
    36                     w.unlock();
    37                 }
    38             }
    39             completedAbruptly = false;
    40         } finally {
    41             processWorkerExit(w, completedAbruptly);
    42         }
    43     }

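    This is the code that actually executes tasks in the pool. A few points are worth noting:

    1. Apart from firstTask, the thread keeps calling getTask() to pull tasks from workQueue and run them.

    2. beforeExecute(wt, task) and afterExecute(task, thrown) run before and after task.run(). These two hooks let a subclass extend the pool's behavior.

    3. An exception thrown by task.run() is rethrown (a checked Throwable is wrapped in an Error), so the worker exits abruptly.

    4. The Worker's lock must be acquired before task.run() and is released when the task finishes.

    5. w.unlock() on line 5 sets Worker.state to 0, marking the lock as free so the worker can respond to interrupts. The initial state is -1, which inhibits interrupts.

    6. If the pool is stopping, the code makes sure the thread is interrupted; otherwise it makes sure the thread is not. Lines 14-18 handle the race with shutdownNow and are worth reading carefully.

    7. processWorkerExit(w, completedAbruptly) handles the Worker's exit.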
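
    Points 2 and 3 can be observed from a subclass. The sketch below is only an illustration: HookedPool, its counters, and the no-op uncaught exception handler are assumptions made for the example, not part of the JDK. It counts hook invocations and records when afterExecute receives a non-null Throwable from a task submitted with execute().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that instruments the two hooks called by runWorker.
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>(),
              r -> {
                  Thread t = new Thread(r);
                  // Keep the demo quiet: the rethrown task exception would
                  // otherwise be printed by the default handler.
                  t.setUncaughtExceptionHandler((thread, ex) -> { });
                  return t;
              });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet();
        if (thrown != null)
            failed.incrementAndGet(); // the exception raised by task.run()
    }

    // Runs three normal tasks and one failing task; returns the counters.
    static int[] demo() {
        HookedPool pool = new HookedPool(2);
        for (int i = 0; i < 3; i++)
            pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("started=" + r[0] + " finished=" + r[1] + " failed=" + r[2]);
    }
}
```

    Tasks submitted through submit() are wrapped in a FutureTask, which captures the exception itself, so in that case afterExecute sees thrown == null.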

    The implementation of getTask():

        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?

            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }

                boolean timed;      // Are workers subject to culling?

                for (;;) {
                    int wc = workerCountOf(c);
                    timed = allowCoreThreadTimeOut || wc > corePoolSize;

                    if (wc <= maximumPoolSize && ! (timedOut && timed))
                        break;
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }

                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

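    getTask() pulls a task from workQueue, either blocking or with a timed wait. It returns null in the following cases:

    1. The pool has more than maximumPoolSize threads (because setMaximumPoolSize() was called).

    2. The pool is in the STOP state.

    3. The pool is in the SHUTDOWN state and workQueue is empty.

    4. The Worker timed out waiting for a task, and the Worker is subject to termination both before and after the wait, that is, (allowCoreThreadTimeOut || wc > corePoolSize).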
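
    Case 4 can be observed from outside the pool. The following sketch is an illustration only: IdleTimeoutDemo is a hypothetical class, and the 50 ms keep-alive and 5 s polling deadline are arbitrary values chosen for the demo. With allowCoreThreadTimeOut(true), idle workers time out in getTask() and the pool size falls back to zero.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleTimeoutDemo {
    // Returns the pool size observed once the idle workers have timed out
    // (or the size at the deadline if they have not).
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // core threads now use the timed poll()
        pool.execute(() -> { });
        pool.execute(() -> { });

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```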

    When getTask() returns null, runWorker() leaves the while loop with completedAbruptly = false and calls processWorkerExit(), implemented as follows:

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }

            tryTerminate();

            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }

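    This method is called mainly from Worker.thread. It cleans up a dying Worker and does the bookkeeping: it removes the Worker from workers, attempts termination, and starts a replacement Worker if the exit was caused by an exception in a user task or if the pool now has fewer threads than corePoolSize.

    Note that unless completedAbruptly is true, workerCount has already been adjusted (decremented by 1) in getTask().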
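
    The replacement behavior is visible from user code. This is a small sketch under assumptions: WorkerReplacementDemo is a hypothetical class and the thread factory with a no-op exception handler exists only to keep the output quiet. A single-thread pool runs a failing task and then a normal one, and the two tasks end up on different Thread objects.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WorkerReplacementDemo {
    // Returns true if the task submitted after a failing task ran on a
    // different thread than the failing task did.
    static boolean replacedAfterFailure() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r);
                    t.setUncaughtExceptionHandler((thread, ex) -> { }); // silence the demo
                    return t;
                });
        final Thread[] seen = new Thread[2];
        CountDownLatch done = new CountDownLatch(1);

        pool.execute(() -> {
            seen[0] = Thread.currentThread();
            throw new IllegalStateException("boom"); // worker exits abruptly
        });
        pool.execute(() -> {
            seen[1] = Thread.currentThread();        // runs on a fresh worker
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen[0] != null && seen[1] != null && seen[0] != seen[1];
    }

    public static void main(String[] args) {
        System.out.println("worker replaced: " + replacedAfterFailure());
    }
}
```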

    Inner classes CallerRunsPolicy, AbortPolicy, DiscardPolicy, DiscardOldestPolicy

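    These are the built-in rejection policies. Respectively, they run the rejected task in the caller's thread, throw RejectedExecutionException, silently drop the task, and drop the oldest queued task before retrying.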
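
    As an illustration of one of these policies, the sketch below saturates a tiny pool and lets CallerRunsPolicy take over. CallerRunsDemo and the pool sizing (one thread, queue capacity one) are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Returns the name of the thread that ran the rejected task.
    static String rejectedTaskThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];

        pool.execute(blocker); // occupies the only worker
        pool.execute(blocker); // fills the queue
        // Pool and queue are both full, so this task is rejected and the
        // policy runs it synchronously in the submitting thread.
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());

        release.countDown();
        pool.shutdown();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + rejectedTaskThread());
    }
}
```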

    Main members

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;

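    The private AtomicInteger ctl packs two fields: the high 3 bits hold the pool state runState, and the low 29 bits hold the number of worker threads. The thread count is therefore bounded at (2^29)-1 (about 500 million).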

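    The high 3 bits encode the 5 lifecycle states: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED. The source comments list every possible state transition. The numeric values increase as the lifecycle advances.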

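    • RUNNING: the state of a ThreadPoolExecutor as soon as it is instantiated; it accepts new tasks and processes queued ones
    • SHUTDOWN: usually reached after shutdown() has been called; no new tasks are accepted, but running and queued tasks are allowed to finish
    • STOP: usually reached after shutdownNow() has been called; no new tasks are accepted, queued tasks are not run, and the pool tries to interrupt its running threads
    • TIDYING: reached when all tasks have terminated and the pool is empty; the terminated() hook is then run
    • TERMINATED: reached when terminated() has completed; the ThreadPoolExecutor is finished
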
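
    The packing can be tried out in isolation. The sketch below copies the constants shown above into a hypothetical CtlLayoutDemo class and adds small pack/unpack helpers (runStateOf, workerCountOf, ctlOf, isRunning) written for this example; they are meant to mirror the bit arithmetic described here, not to replace reading the JDK source.

```java
class CtlLayoutDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29 - 1

    // runState lives in the high-order 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking helpers
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(CAPACITY);                     // 536870911
        System.out.println(workerCountOf(c));             // 5
        System.out.println(runStateOf(c) == RUNNING);     // true
        System.out.println(isRunning(c));                 // true
        System.out.println(isRunning(ctlOf(STOP, 5)));    // false
        // RUNNING is negative, so the states are ordered numerically:
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```

    Because the states are ordered, comparisons such as runStateAtLeast(c, STOP) reduce to a single integer comparison on ctl.
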
        private final BlockingQueue<Runnable> workQueue;

    The queue that holds tasks waiting to be executed.

        private final HashSet<Worker> workers = new HashSet<Worker>();

    The set of worker threads.

        private final ReentrantLock mainLock = new ReentrantLock();

    The lock that must be held when accessing workers.

        private final Condition termination = mainLock.newCondition();

    A Condition on mainLock that supports awaitTermination.

        private int largestPoolSize;

    Tracks the largest pool size attained. Accessed only while holding mainLock.

        private long completedTaskCount;

    Counts completed tasks. Accessed only while holding mainLock.

        private volatile ThreadFactory threadFactory;

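    A volatile ThreadFactory field that creates the pool's threads (used in addWorker). Thread creation can fail, for example when the system or the user limits the total number of threads, so addWorker may fail and callers must be prepared to handle that.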

        private volatile RejectedExecutionHandler handler;

    The handler invoked when the pool refuses to accept a task.

        private volatile boolean allowCoreThreadTimeOut;

    Defaults to false, in which case core threads stay alive even when idle. When true, an idle core thread terminates after waiting keepAliveTime.

        private volatile long keepAliveTime;

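    The maximum time an idle thread waits for work before it terminates. It applies to threads beyond corePoolSize, or to all threads when allowCoreThreadTimeOut is true.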

        private volatile int corePoolSize;

    The minimum number of threads kept alive in the pool. When allowCoreThreadTimeOut is true, the minimum is 0.

        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();

    The default rejected execution handler.

        private static final RuntimePermission shutdownPerm =
            new RuntimePermission("modifyThread");

    Main methods

    Several pool methods were covered above while describing Worker. The remaining main methods follow, starting with the most important one, execute():

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

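    When a task is submitted: if fewer than corePoolSize threads are running, a new thread is started to run it. Once corePoolSize is reached, the task is queued if the queue is not full; otherwise a new thread is started, up to maximumPoolSize. Beyond that, the task is rejected.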

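    The code checks isRunning() both before and after adding the task to workQueue. In other words, the enqueue counts as successful only if the pool is still RUNNING afterwards. Rejected tasks are handed to the RejectedExecutionHandler; see reject(command).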
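
    The order "core thread, then queue, then extra thread, then reject" can be checked with a deliberately small pool. In this sketch ExecuteOrderDemo and the sizing (corePoolSize 1, maximumPoolSize 2, queue capacity 1) are assumptions chosen so that each branch of execute() is hit exactly once; the default AbortPolicy handler is used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep workers busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] r = new int[4];

        pool.execute(blocker);
        r[0] = pool.getPoolSize();       // 1: below corePoolSize, new core thread
        pool.execute(blocker);
        r[1] = pool.getQueue().size();   // 1: core is full, task is queued
        pool.execute(blocker);
        r[2] = pool.getPoolSize();       // 2: queue is full, non-core thread added
        try {
            pool.execute(blocker);       // pool and queue are both full
        } catch (RejectedExecutionException e) {
            r[3] = 1;                    // AbortPolicy throws
        }

        release.countDown();
        pool.shutdown();
        return r;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observe())); // [1, 1, 2, 1]
    }
}
```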

    The shutdown() method

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(SHUTDOWN);
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }

    After shutdown() is called, the pool is at least in the SHUTDOWN state. It accepts no new tasks, but previously submitted tasks still run.

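    shutdown() calls interruptIdleWorkers(), which interrupts idle Workers. A Worker counts as idle when w.tryLock() succeeds, because a Worker holds its own lock while it runs a task: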

        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }

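    shutdownNow() is implemented along the same lines as shutdown(). The differences are that it advances the state to STOP, interrupts all started workers rather than only idle ones, and drains the queue, returning the tasks that never ran.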
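
    The visible effects of shutdownNow() can be checked with a one-thread pool. ShutdownNowDemo below is a hypothetical class written for illustration: one task blocks on a latch that is never released, two more tasks wait in the queue, and shutdownNow() interrupts the first and hands back the other two.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    // Returns {number of never-started tasks, 1 if the running task was
    //          interrupted, 1 if the pool terminated in time}.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        AtomicBoolean interrupted = new AtomicBoolean();
        CountDownLatch never = new CountDownLatch(1);

        pool.execute(() -> {
            try {
                never.await();             // blocks until interrupted
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { });           // stays in the queue
        pool.execute(() -> { });           // stays in the queue

        List<Runnable> pending = pool.shutdownNow();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pending.size(), interrupted.get() ? 1 : 0, terminated ? 1 : 0 };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [2, 1, 1]
    }
}
```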

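    tryTerminate() is an important method. Many internal methods must call it after any action that could make termination possible, namely reducing the worker count or removing tasks from workQueue during shutdown.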

        final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }

                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }

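    What this method does: if the pool is (SHUTDOWN with pool and queue empty) or (STOP with pool empty), it moves the pool to TIDYING, runs terminated(), and then moves it to TERMINATED. If the pool is (SHUTDOWN with pool not empty and queue empty) or (STOP with pool not empty), it calls interruptIdleWorkers(ONLY_ONE) to interrupt one idle worker so that the shutdown signal propagates (no new tasks are accepted, no new Workers are created, and workerCount and the number of Workers both drop to 0). In all other cases it returns immediately.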

    awaitTermination(long timeout, TimeUnit unit) waits for the pool to terminate:

        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                    if (runStateAtLeast(ctl.get(), TERMINATED))
                        return true;
                    if (nanos <= 0)
                        return false;
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
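
    The interplay of tryTerminate(), the terminated() hook, and awaitTermination() can be seen from a subclass. This is a sketch only: TerminationDemo and its flag are hypothetical names introduced for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminationDemo {
    // Returns {awaitTermination result, terminated() hook ran, isTerminated()}.
    static boolean[] demo() {
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                super.terminated();
                hookRan.set(true); // runs while the pool is in the TIDYING state
            }
        };
        pool.execute(() -> { });
        pool.shutdown();           // SHUTDOWN: the queued work still completes

        boolean done = false;
        try {
            // Waits on the termination condition signalled by tryTerminate().
            done = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { done, hookRan.get(), pool.isTerminated() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [true, true, true]
    }
}
```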

    The purge() method removes cancelled Future tasks from the work queue:

        public void purge() {
            final BlockingQueue<Runnable> q = workQueue;
            try {
                Iterator<Runnable> it = q.iterator();
                while (it.hasNext()) {
                    Runnable r = it.next();
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        it.remove();
                }
            } catch (ConcurrentModificationException fallThrough) {
                // Take slow path if we encounter interference during traversal.
                // Make copy for traversal and call remove for cancelled entries.
                // The slow path is more likely to be O(N*N).
                for (Object r : q.toArray())
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        q.remove(r);
            }

            tryTerminate(); // In case SHUTDOWN and now empty
        }
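
    A short usage sketch of purge(), with PurgeDemo as a hypothetical class: cancelling a Future returned by submit() does not by itself remove the task from the work queue of a plain ThreadPoolExecutor, whereas purge() does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PurgeDemo {
    // Returns {queue size after cancelling, queue size after purge()}.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {               // keeps the only worker busy
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Future<?> a = pool.submit(() -> { });
        Future<?> b = pool.submit(() -> { });
        pool.submit(() -> { });            // left uncancelled
        a.cancel(false);
        b.cancel(false);

        int before = pool.getQueue().size(); // cancelled tasks are still queued
        pool.purge();
        int after = pool.getQueue().size();  // only the live task remains

        release.countDown();
        pool.shutdown();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [3, 1]
    }
}
```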

    Summary

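    ThreadPoolExecutor is itself a concurrent system. It is worth studying how its author writes concurrent code, in particular the use of locks and the handling of race conditions.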

    ScheduledExecutorService

    public interface ScheduledExecutorService extends ExecutorService

    An ExecutorService that can run tasks after a fixed delay or periodically. On top of ExecutorService it provides 4 additional methods:

        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
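
    A brief usage sketch of two of these methods follows. ScheduleDemo, the delays, and the latch-based wait are assumptions made for the example; Executors.newScheduledThreadPool is used to obtain an implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    // Returns {value of the one-shot task, 1 if the periodic task ran at
    //          least three times, 1 if the periodic task is cancelled}.
    static int[] demo() {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        try {
            // One-shot Callable, run once after a 50 ms delay.
            ScheduledFuture<Integer> oneShot =
                    ses.schedule(() -> 42, 50, TimeUnit.MILLISECONDS);

            // Fixed-rate Runnable: first run immediately, then every 20 ms.
            CountDownLatch threeRuns = new CountDownLatch(3);
            ScheduledFuture<?> periodic = ses.scheduleAtFixedRate(
                    threeRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);

            int value = oneShot.get(5, TimeUnit.SECONDS);
            boolean ranThrice = threeRuns.await(5, TimeUnit.SECONDS);
            periodic.cancel(false); // a periodic task only stops when cancelled or failing
            return new int[] { value, ranThrice ? 1 : 0, periodic.isCancelled() ? 1 : 0 };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```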

    ScheduledThreadPoolExecutor

    Inner class ScheduledFutureTask

        private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V>

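    Wraps a task executed by ScheduledThreadPoolExecutor. It extends FutureTask and, through RunnableScheduledFuture, also implements the Comparable and Delayed interfaces. It can report whether the task is a repeating task, return its current delay, and be compared with other ScheduledFutureTasks.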

    Main members:

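            // sequence number used to break ties in FIFO order
            private final long sequenceNumber;
            // time at which the task becomes eligible to run, in nanoTime units
            private long time;
            // the task to re-enqueue when this is a repeating task
            RunnableScheduledFuture<V> outerTask = this;
            // period identifies repeating tasks: positive means fixed-rate, negative means fixed-delay, 0 means non-repeating
            private final long period;
            // index into the delay queue's heap array, to support faster cancellation
            int heapIndex;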

    run() overrides FutureTask's run() and is the task's main entry point. It is overridden so that the task runs periodically when periodic is true. Non-periodic tasks use FutureTask's run(); periodic tasks use runAndReset().

            public void run() {
                boolean periodic = isPeriodic();
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }
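
    One consequence of this run() method is that a periodic task is re-enqueued only when runAndReset() returns true, so a run that throws ends the series. The sketch below illustrates this; PeriodicFailureDemo and its counter are hypothetical names for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicFailureDemo {
    // Returns {number of runs, 1 if get() reported the task's exception}.
    static int[] demo() {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = ses.scheduleWithFixedDelay(() -> {
            if (runs.incrementAndGet() == 2)
                throw new IllegalStateException("boom"); // second run fails
        }, 0, 10, TimeUnit.MILLISECONDS);

        int failed = 0;
        try {
            // A periodic task's future never completes normally; get()
            // returns only through cancellation or a failure of the task.
            future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            failed = 1;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
        return new int[] { runs.get(), failed };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [2, 1]
    }
}
```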

    canRunInCurrentRunState(periodic) checks whether the task may run in the current pool state. If it may not, cancel() is called:

            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                if (cancelled && removeOnCancel && heapIndex >= 0)
                    remove(this);
                return cancelled;
            }
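
    The removeOnCancel flag checked in cancel() is controlled through setRemoveOnCancelPolicy. The following sketch, with RemoveOnCancelDemo as a hypothetical class and a one-hour delay chosen so that the task never becomes due, compares the queue size after cancellation under both settings.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RemoveOnCancelDemo {
    // Schedules a far-future task, cancels it, and reports how many
    // tasks are still sitting in the delay queue.
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1);
        stpe.setRemoveOnCancelPolicy(removeOnCancel);
        ScheduledFuture<?> future = stpe.schedule(() -> { }, 1, TimeUnit.HOURS);
        future.cancel(false);
        int size = stpe.getQueue().size();
        stpe.shutdownNow();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(queuedAfterCancel(false)); // 1: stays until its delay elapses
        System.out.println(queuedAfterCancel(true));  // 0: removed by cancel()
    }
}
```

    Leaving the flag at its default of false avoids the removal cost on each cancel, at the price of retaining cancelled tasks until they reach the head of the queue.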

    Inner class DelayedWorkQueue

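    DelayedWorkQueue has been a dedicated internal implementation since JDK 1.7. It combines the functionality of PriorityQueue and DelayQueue, whereas earlier versions relied on those existing queue classes directly. Its implementation is worth a look, and it doubles as a refresher on how collections are implemented: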

        static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {
    
            /*
             * A DelayedWorkQueue is based on a heap-based data structure
             * like those in DelayQueue and PriorityQueue, except that
             * every ScheduledFutureTask also records its index into the
             * heap array. This eliminates the need to find a task upon
             * cancellation, greatly speeding up removal (down from O(n)
             * to O(log n)), and reducing garbage retention that would
             * otherwise occur by waiting for the element to rise to top
             * before clearing. But because the queue may also hold
             * RunnableScheduledFutures that are not ScheduledFutureTasks,
             * we are not guaranteed to have such indices available, in
             * which case we fall back to linear search. (We expect that
             * most tasks will not be decorated, and that the faster cases
             * will be much more common.)
             *
             * All heap operations must record index changes -- mainly
             * within siftUp and siftDown. Upon removal, a task's
             * heapIndex is set to -1. Note that ScheduledFutureTasks can
             * appear at most once in the queue (this need not be true for
             * other kinds of tasks or work queues), so are uniquely
             * identified by heapIndex.
             */
    
            private static final int INITIAL_CAPACITY = 16;
            private RunnableScheduledFuture[] queue =
                new RunnableScheduledFuture[INITIAL_CAPACITY];
            private final ReentrantLock lock = new ReentrantLock();
            private int size = 0;
    
            /**
             * Thread designated to wait for the task at the head of the
             * queue.  This variant of the Leader-Follower pattern
             * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
             * minimize unnecessary timed waiting.  When a thread becomes
             * the leader, it waits only for the next delay to elapse, but
             * other threads await indefinitely.  The leader thread must
             * signal some other thread before returning from take() or
             * poll(...), unless some other thread becomes leader in the
             * interim.  Whenever the head of the queue is replaced with a
             * task with an earlier expiration time, the leader field is
             * invalidated by being reset to null, and some waiting
             * thread, but not necessarily the current leader, is
             * signalled.  So waiting threads must be prepared to acquire
             * and lose leadership while waiting.
             */
            private Thread leader = null;
    
            /**
             * Condition signalled when a newer task becomes available at the
             * head of the queue or a new thread may need to become leader.
             */
            private final Condition available = lock.newCondition();
    
            /**
             * Set f's heapIndex if it is a ScheduledFutureTask.
             */
            private void setIndex(RunnableScheduledFuture f, int idx) {
                if (f instanceof ScheduledFutureTask)
                    ((ScheduledFutureTask)f).heapIndex = idx;
            }
    
            /**
             * Sift element added at bottom up to its heap-ordered spot.
             * Call only when holding lock.
             */
            private void siftUp(int k, RunnableScheduledFuture key) {
                while (k > 0) {
                    int parent = (k - 1) >>> 1;
                    RunnableScheduledFuture e = queue[parent];
                    if (key.compareTo(e) >= 0)
                        break;
                    queue[k] = e;
                    setIndex(e, k);
                    k = parent;
                }
                queue[k] = key;
                setIndex(key, k);
            }
    
            /**
             * Sift element added at top down to its heap-ordered spot.
             * Call only when holding lock.
             */
            private void siftDown(int k, RunnableScheduledFuture key) {
                int half = size >>> 1;
                while (k < half) {
                    int child = (k << 1) + 1;
                    RunnableScheduledFuture c = queue[child];
                    int right = child + 1;
                    if (right < size && c.compareTo(queue[right]) > 0)
                        c = queue[child = right];
                    if (key.compareTo(c) <= 0)
                        break;
                    queue[k] = c;
                    setIndex(c, k);
                    k = child;
                }
                queue[k] = key;
                setIndex(key, k);
            }
    
            /**
             * Resize the heap array.  Call only when holding lock.
             */
            private void grow() {
                int oldCapacity = queue.length;
                int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
                if (newCapacity < 0) // overflow
                    newCapacity = Integer.MAX_VALUE;
                queue = Arrays.copyOf(queue, newCapacity);
            }
    
            /**
             * Find index of given object, or -1 if absent
             */
            private int indexOf(Object x) {
                if (x != null) {
                    if (x instanceof ScheduledFutureTask) {
                        int i = ((ScheduledFutureTask) x).heapIndex;
                        // Sanity check; x could conceivably be a
                        // ScheduledFutureTask from some other pool.
                        if (i >= 0 && i < size && queue[i] == x)
                            return i;
                    } else {
                        for (int i = 0; i < size; i++)
                            if (x.equals(queue[i]))
                                return i;
                    }
                }
                return -1;
            }
    
            public boolean contains(Object x) {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    return indexOf(x) != -1;
                } finally {
                    lock.unlock();
                }
            }
    
            public boolean remove(Object x) {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    int i = indexOf(x);
                    if (i < 0)
                        return false;
    
                    setIndex(queue[i], -1);
                    int s = --size;
                    RunnableScheduledFuture replacement = queue[s];
                    queue[s] = null;
                    if (s != i) {
                        siftDown(i, replacement);
                        if (queue[i] == replacement)
                            siftUp(i, replacement);
                    }
                    return true;
                } finally {
                    lock.unlock();
                }
            }
    
            public int size() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    return size;
                } finally {
                    lock.unlock();
                }
            }
    
            public boolean isEmpty() {
                return size() == 0;
            }
    
            public int remainingCapacity() {
                return Integer.MAX_VALUE;
            }
    
            public RunnableScheduledFuture peek() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    return queue[0];
                } finally {
                    lock.unlock();
                }
            }
    
            public boolean offer(Runnable x) {
                if (x == null)
                    throw new NullPointerException();
                RunnableScheduledFuture e = (RunnableScheduledFuture)x;
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    int i = size;
                    if (i >= queue.length)
                        grow();
                    size = i + 1;
                    if (i == 0) {
                        queue[0] = e;
                        setIndex(e, 0);
                    } else {
                        siftUp(i, e);
                    }
                    if (queue[0] == e) {
                        leader = null;
                        available.signal();
                    }
                } finally {
                    lock.unlock();
                }
                return true;
            }
    
            public void put(Runnable e) {
                offer(e);
            }
    
            public boolean add(Runnable e) {
                return offer(e);
            }
    
            public boolean offer(Runnable e, long timeout, TimeUnit unit) {
                return offer(e);
            }
    
            /**
             * Performs common bookkeeping for poll and take: Replaces
             * first element with last and sifts it down.  Call only when
             * holding lock.
             * @param f the task to remove and return
             */
            private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
                int s = --size;
                RunnableScheduledFuture x = queue[s];
                queue[s] = null;
                if (s != 0)
                    siftDown(0, x);
                setIndex(f, -1);
                return f;
            }
    
            public RunnableScheduledFuture poll() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture first = queue[0];
                    if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                        return null;
                    else
                        return finishPoll(first);
                } finally {
                    lock.unlock();
                }
            }
    
            public RunnableScheduledFuture take() throws InterruptedException {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture first = queue[0];
                        if (first == null)
                            available.await();
                        else {
                            long delay = first.getDelay(TimeUnit.NANOSECONDS);
                            if (delay <= 0)
                                return finishPoll(first);
                            else if (leader != null)
                                available.await();
                            else {
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    available.awaitNanos(delay);
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        available.signal();
                    lock.unlock();
                }
            }
    
            public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
                throws InterruptedException {
                long nanos = unit.toNanos(timeout);
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture first = queue[0];
                        if (first == null) {
                            if (nanos <= 0)
                                return null;
                            else
                                nanos = available.awaitNanos(nanos);
                        } else {
                            long delay = first.getDelay(TimeUnit.NANOSECONDS);
                            if (delay <= 0)
                                return finishPoll(first);
                            if (nanos <= 0)
                                return null;
                            if (nanos < delay || leader != null)
                                nanos = available.awaitNanos(nanos);
                            else {
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    long timeLeft = available.awaitNanos(delay);
                                    nanos -= delay - timeLeft;
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        available.signal();
                    lock.unlock();
                }
            }
    
            public void clear() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    for (int i = 0; i < size; i++) {
                        RunnableScheduledFuture t = queue[i];
                        if (t != null) {
                            queue[i] = null;
                            setIndex(t, -1);
                        }
                    }
                    size = 0;
                } finally {
                    lock.unlock();
                }
            }
    
            /**
             * Return and remove first element only if it is expired.
             * Used only by drainTo.  Call only when holding lock.
             */
            private RunnableScheduledFuture pollExpired() {
                RunnableScheduledFuture first = queue[0];
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    return null;
                return finishPoll(first);
            }
    
            public int drainTo(Collection<? super Runnable> c) {
                if (c == null)
                    throw new NullPointerException();
                if (c == this)
                    throw new IllegalArgumentException();
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture first;
                    int n = 0;
                    while ((first = pollExpired()) != null) {
                        c.add(first);
                        ++n;
                    }
                    return n;
                } finally {
                    lock.unlock();
                }
            }
    
            public int drainTo(Collection<? super Runnable> c, int maxElements) {
                if (c == null)
                    throw new NullPointerException();
                if (c == this)
                    throw new IllegalArgumentException();
                if (maxElements <= 0)
                    return 0;
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture first;
                    int n = 0;
                    while (n < maxElements && (first = pollExpired()) != null) {
                        c.add(first);
                        ++n;
                    }
                    return n;
                } finally {
                    lock.unlock();
                }
            }
    
            public Object[] toArray() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    return Arrays.copyOf(queue, size, Object[].class);
                } finally {
                    lock.unlock();
                }
            }
    
            @SuppressWarnings("unchecked")
            public <T> T[] toArray(T[] a) {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    if (a.length < size)
                        return (T[]) Arrays.copyOf(queue, size, a.getClass());
                    System.arraycopy(queue, 0, a, 0, size);
                    if (a.length > size)
                        a[size] = null;
                    return a;
                } finally {
                    lock.unlock();
                }
            }
    
            public Iterator<Runnable> iterator() {
                return new Itr(Arrays.copyOf(queue, size));
            }
    
            /**
             * Snapshot iterator that works off copy of underlying q array.
             */
            private class Itr implements Iterator<Runnable> {
                final RunnableScheduledFuture[] array;
                int cursor = 0;     // index of next element to return
                int lastRet = -1;   // index of last element, or -1 if no such
    
                Itr(RunnableScheduledFuture[] array) {
                    this.array = array;
                }
    
                public boolean hasNext() {
                    return cursor < array.length;
                }
    
                public Runnable next() {
                    if (cursor >= array.length)
                        throw new NoSuchElementException();
                    lastRet = cursor;
                    return array[cursor++];
                }
    
                public void remove() {
                    if (lastRet < 0)
                        throw new IllegalStateException();
                    DelayedWorkQueue.this.remove(array[lastRet]);
                    lastRet = -1;
                }
            }
        }

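    As the listing shows, DelayedWorkQueue is essentially a heap-based priority queue whose take() and poll() methods retrieve elements after a delay, in the same way as DelayQueue. It adds two things on top of that. 1. Every RunnableScheduledFuture stored in the heap records its own index in the heap array, so removing a cancelled task does not need a linear search. 2. It applies the leader-follower pattern to minimize unnecessary timed waiting when several threads take elements concurrently. Only the leader waits with a timeout (awaitNanos(delay)), so it wakes up no later than the moment the head task expires; the other threads wait on the Condition without a timeout. After a successful take() or poll(), the leader gives up leadership, and the finally block signals another waiting thread if the queue is not empty, so the waiters compete to become the next leader. Whenever a new RunnableScheduledFuture becomes queue[0], offer() sets leader = null and calls available.signal(). A waiting thread then wakes up, re-reads queue[0], and becomes the leader that waits for the new head's delay; if the head has not changed, the existing leader keeps waiting as before.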
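
    To illustrate the first point, here is a minimal sketch of a heap whose entries remember their own slot. It is not the JDK code: the names IndexedHeapSketch, Entry and time are hypothetical, and only the field name heapIndex mirrors the JDK. The idea it shows is that remove() can jump straight to the element's position and repair the heap in O(log n).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not JDK code): a min-heap whose entries track their own index. */
class IndexedHeapSketch {
    /** An entry ordered by time; heapIndex is maintained by the heap itself. */
    static final class Entry {
        final long time;
        int heapIndex = -1; // -1 means "not currently in the heap"
        Entry(long time) { this.time = time; }
    }

    private final List<Entry> heap = new ArrayList<>();

    /** Stores e at slot i and records the slot inside the entry. */
    private void set(int i, Entry e) { heap.set(i, e); e.heapIndex = i; }

    void add(Entry e) {
        heap.add(e);                  // grow by one slot, then find the right place
        siftUp(heap.size() - 1, e);
    }

    Entry peek() { return heap.isEmpty() ? null : heap.get(0); }

    /** Removes e without searching: its stored index says where it is. */
    boolean remove(Entry e) {
        int i = e.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != e) return false;
        e.heapIndex = -1;
        Entry last = heap.remove(heap.size() - 1);
        if (last != e) {              // fill the hole with the last element
            siftDown(i, last);
            if (heap.get(i) == last) siftUp(i, last);
        }
        return true;
    }

    private void siftUp(int k, Entry e) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Entry p = heap.get(parent);
            if (e.time >= p.time) break;
            set(k, p);                // move the parent down
            k = parent;
        }
        set(k, e);
    }

    private void siftDown(int k, Entry e) {
        int half = heap.size() >>> 1;
        while (k < half) {
            int child = 2 * k + 1;
            int right = child + 1;
            if (right < heap.size() && heap.get(right).time < heap.get(child).time)
                child = right;
            Entry c = heap.get(child);
            if (e.time <= c.time) break;
            set(k, c);                // move the smaller child up
            k = child;
        }
        set(k, e);
    }

    public static void main(String[] args) {
        IndexedHeapSketch h = new IndexedHeapSketch();
        Entry late = new Entry(50), early = new Entry(10);
        h.add(late);
        h.add(early);
        h.remove(early);              // no scan needed
        System.out.println(h.peek().time);
    }
}
```

    In the JDK, ScheduledFutureTask plays the role of Entry here, and the queue updates the index through setIndex() every time it moves an element.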

    Key members

    //False if should cancel/suppress periodic tasks on shutdown
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
    //False if should cancel non-periodic tasks on shutdown.
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
    //True if ScheduledFutureTask.cancel should remove from queue
    private volatile boolean removeOnCancel = false;
    //Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied entries.
    private static final AtomicLong sequencer = new AtomicLong(0);
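
    These flags are exposed through public setters on ScheduledThreadPoolExecutor, such as setRemoveOnCancelPolicy(), setExecuteExistingDelayedTasksAfterShutdownPolicy() and setContinueExistingPeriodicTasksAfterShutdownPolicy(). As a small sketch of what removeOnCancel changes, the following schedules a task far in the future, cancels it, and checks whether it is still sitting in the work queue. The class and method names (RemoveOnCancelDemo, queuedAfterCancel) are made up for illustration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class: observes the effect of the removeOnCancel flag. */
class RemoveOnCancelDemo {
    /** Schedules one far-future task, cancels it, and returns the queue size afterwards. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            pool.setRemoveOnCancelPolicy(removeOnCancel);
            Runnable noop = () -> { };
            // The delay is long enough that the task cannot start during the demo.
            ScheduledFuture<?> future = pool.schedule(noop, 1, TimeUnit.HOURS);
            future.cancel(false);
            return pool.getQueue().size();
        } finally {
            pool.shutdownNow(); // do not keep the JVM alive for an hour
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedAfterCancel(false)); // cancelled task stays queued: 1
        System.out.println(queuedAfterCancel(true));  // cancelled task is removed: 0
    }
}
```

    With the default (false), a cancelled task stays in the heap until its delay elapses, which can matter if many long-delay tasks are cancelled.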

    Key methods

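    ScheduledThreadPoolExecutor changes how submitted tasks are executed: every submission goes through schedule(), and submit() simply delegates to it with a delay of zero: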

        public <T> Future<T> submit(Callable<T> task) {
            return schedule(task, 0, TimeUnit.NANOSECONDS);
        }
    
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<V> t = decorateTask(callable,
                new ScheduledFutureTask<V>(callable,
                                           triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }

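    schedule() first wraps the callable in a ScheduledFutureTask and passes it through decorateTask() to obtain a RunnableScheduledFuture (decorateTask() is a hook for subclasses; the default implementation returns the task unchanged). It then calls delayedExecute() to hand the task over for execution.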

        protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> callable, RunnableScheduledFuture<V> task) {
            return task;
        }
    
        private void delayedExecute(RunnableScheduledFuture<?> task) {
            if (isShutdown())
                reject(task);
            else {
                super.getQueue().add(task);
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }

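    delayedExecute(task) rejects the task if the pool has already been shut down. Otherwise it adds the task to the work queue and rechecks the state: if the pool was shut down in the meantime and the task may not run in that state, the task is removed from the queue and cancelled; if not, ensurePrestart() makes sure a worker thread exists to pick the task up.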
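
    The submit() -> schedule() -> decorateTask() -> delayedExecute() path can be observed from the outside with a small subclass. The sketch below is only an illustration: CountingScheduledPool, decorated and demo() are hypothetical names, and the override merely counts calls before returning the task unchanged.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical subclass: counts how many Callable tasks pass through decorateTask(). */
class CountingScheduledPool extends ScheduledThreadPoolExecutor {
    final AtomicInteger decorated = new AtomicInteger();

    CountingScheduledPool(int corePoolSize) { super(corePoolSize); }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> callable, RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet(); // called once per schedule(Callable, ...)
        return task;                 // same behavior as the default hook
    }

    /** Returns { sum of both results, number of decorateTask calls }. */
    static int[] demo() {
        CountingScheduledPool pool = new CountingScheduledPool(1);
        try {
            Callable<Integer> one = () -> 1;
            Callable<Integer> two = () -> 2;
            Future<Integer> viaSubmit = pool.submit(one); // zero-delay schedule()
            ScheduledFuture<Integer> viaSchedule =
                pool.schedule(two, 10, TimeUnit.MILLISECONDS);
            int sum = viaSubmit.get() + viaSchedule.get();
            return new int[] { sum, pool.decorated.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

    Both the submit() call and the explicit schedule() call reach decorateTask(), which is why the counter ends at 2.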

    Executors

    Summary

  • Original article: https://www.cnblogs.com/qquan/p/5616373.html