zoukankan      html  css  js  c++  java
  • 【JUC源码解析】ScheduledThreadPoolExecutor

    简介

    它是一个线程池执行器(ThreadPoolExecutor),在给定的延迟(delay)后执行。在多线程或者对灵活性有要求的环境下,要优于java.util.Timer。

    提交的任务在执行之前支持取消,默认情况下,在延迟到来之前,不会自动从队列中删除,但可以设置,使其立刻从队列中移除。

    有两种模式,固定频率(scheduleAtFixedRate)和固定延迟(scheduleWithFixedDelay),不管哪种模式,同一个任务不会被叠加执行,即便是不同的线程执行同一个任务。

    继承ThreadPoolExecutor,维护一个固定大小的线程池和一个无界延迟队列(delay queue)。

    ScheduledFutureTask,用来描述要执行的任务,DelayedWorkQueue,则是装在这些任务的delay queue.

    固定频率

    一个任务,从第一次开始执行的时间点开始,每隔一定的时间执行一次,如果执行的时间大于间隔时间,则要等这次执行结束,再执行下一次。

    如上图所示,蓝色表示任务执行,白色表示间隔时间。

    固定延迟

    一个任务,每一次执行结束之后,延迟一定的时间,执行下一次。

     

     如上图所示,蓝色表示任务执行,白色表示间隔时间。

    源码分析

    属性

    1     private volatile boolean continueExistingPeriodicTasksAfterShutdown; // shut down之后,是否取消period任务
    2 
    3     private volatile boolean executeExistingDelayedTasksAfterShutdown = true; // shut down之后,是否取消non-period任务
    4 
    5     private volatile boolean removeOnCancel = false; // cancel后,是否从队列里移除此任务
    6 
    7     private static final AtomicLong sequencer = new AtomicLong(); // 给任务编号

    ScheduledFutureTask

    属性

    1         private final long sequenceNumber; // 序列编号
    2 
    3         private long time; // 执行时间
    4 
    5         private final long period; // 周期,正值:固定频率;负值:固定延迟;0:不重复执行
    6 
    7         RunnableScheduledFuture<V> outerTask = this; // 实际任务
    8 
    9         int heapIndex; // 堆索引

    构造方法

     1         ScheduledFutureTask(Runnable r, V result, long ns) {
     2             super(r, result);
     3             this.time = ns;
     4             this.period = 0; // 不重复执行
     5             this.sequenceNumber = sequencer.getAndIncrement();
     6         }
     7 
     8         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
     9             super(r, result);
    10             this.time = ns;
    11             this.period = period;
    12             this.sequenceNumber = sequencer.getAndIncrement();
    13         }
    14 
    15         ScheduledFutureTask(Callable<V> callable, long ns) {
    16             super(callable);
    17             this.time = ns;
    18             this.period = 0;
    19             this.sequenceNumber = sequencer.getAndIncrement();
    20         }

    关键方法

    getDelay(TimeUnit)

    1         public long getDelay(TimeUnit unit) {
    2             return unit.convert(time - now(), NANOSECONDS);
    3         }

    compareTo(Delayed other)

     1         public int compareTo(Delayed other) { // 根据延迟比较元素,在延迟队列中,延迟越小越靠前,延迟最小的在队首,最先出队被执行
     2             if (other == this)
     3                 return 0;
     4             if (other instanceof ScheduledFutureTask) {
     5                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
     6                 long diff = time - x.time;
     7                 if (diff < 0)
     8                     return -1;
     9                 else if (diff > 0)
    10                     return 1;
    11                 else if (sequenceNumber < x.sequenceNumber)
    12                     return -1;
    13                 else
    14                     return 1;
    15             }
    16             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    17             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    18         }

    setNextRunTime()

    1         private void setNextRunTime() { // 设置下一次执行时间
    2             long p = period;
    3             if (p > 0) // 固定频率,从第一次时间点,每次加period
    4                 time += p;
    5             else
    6                 time = triggerTime(-p); // 固定延迟,每次执行结束后,加period作为下一次执行时间
    7         }

    cancel(boolean mayInterruptIfRunning)

    1         public boolean cancel(boolean mayInterruptIfRunning) { // 取消任务
    2             boolean cancelled = super.cancel(mayInterruptIfRunning);
    3             if (cancelled && removeOnCancel && heapIndex >= 0)
    4                 remove(this);
    5             return cancelled;
    6         }

    run()

     1         public void run() { // 执行任务
     2             boolean periodic = isPeriodic();
     3             if (!canRunInCurrentRunState(periodic))
     4                 cancel(false);
     5             else if (!periodic)
     6                 ScheduledFutureTask.super.run(); // 单次执行
     7             else if (ScheduledFutureTask.super.runAndReset()) { // 周期执行,runAndReset
     8                 setNextRunTime(); // 设置下次执行时间
     9                 reExecutePeriodic(outerTask); // 重新加入队列
    10             }
    11         }

    构造方法

     1     public ScheduledThreadPoolExecutor(int corePoolSize) {
     2         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
     3     }
     4 
     5     public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
     6         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
     7     }
     8 
     9     public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    10         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
    11     }
    12 
    13     public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
    14             RejectedExecutionHandler handler) {
    15         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
    16     }

    关键方法

    scheduleAtFixedRate

     1     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // 固定频率
     2         if (command == null || unit == null)
     3             throw new NullPointerException();
     4         if (period <= 0)
     5             throw new IllegalArgumentException();
     6         ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
     7                 unit.toNanos(period)); // period 大于 0
     8         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
     9         sft.outerTask = t;
    10         delayedExecute(t);
    11         return t;
    12     }

    scheduleWithFixedDelay

     1     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 固定延迟
     2         if (command == null || unit == null)
     3             throw new NullPointerException();
     4         if (delay <= 0)
     5             throw new IllegalArgumentException();
     6         ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
     7                 unit.toNanos(-delay)); // -delay 小于 0
     8         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
     9         sft.outerTask = t;
    10         delayedExecute(t);
    11         return t;
    12     }

    delayedExecute(RunnableScheduledFuture<?> task)

     1     private void delayedExecute(RunnableScheduledFuture<?> task) {
     2         if (isShutdown()) // 如果线程池已经shut down,则拒绝任务
     3             reject(task);
     4         else {
     5             super.getQueue().add(task); // 否则,添加任务到延迟队列
     6             if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
     7                 task.cancel(false); // 根据run-after-shutdown参数,决定是否取任务
     8             else
     9                 ensurePrestart(); // 保证线程启动
    10         }
    11     }

    reExecutePeriodic(RunnableScheduledFuture<?> task)

    1     void reExecutePeriodic(RunnableScheduledFuture<?> task) { // 周期性任务重新入队,策略同delayedExecute
    2         if (canRunInCurrentRunState(true)) {
    3             super.getQueue().add(task);
    4             if (!canRunInCurrentRunState(true) && remove(task))
    5                 task.cancel(false);
    6             else
    7                 ensurePrestart();
    8         }
    9     }

    ensurePrestart()

    1     void ensurePrestart() {
    2         int wc = workerCountOf(ctl.get());
    3         if (wc < corePoolSize)
    4             addWorker(null, true);
    5         else if (wc == 0)
    6             addWorker(null, false);
    7     }

    该方法在ThreadPoolExecutor类,保证线程池中至少有一个活动线程。

    triggerTime()

     1     long triggerTime(long delay) { // 返回延迟动作的触发时间
     2         return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
     3     }
     4 
     5     private long overflowFree(long delay) { // 处理溢出情况
     6         Delayed head = (Delayed) super.getQueue().peek();
     7         if (head != null) {
     8             long headDelay = head.getDelay(NANOSECONDS);
     9             if (headDelay < 0 && (delay - headDelay < 0))
    10                 delay = Long.MAX_VALUE + headDelay;
    11         }
    12         return delay;
    13     }

    DelayedWorkQueue

    属性

    1         private static final int INITIAL_CAPACITY = 16; // 初始容量
    2         private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 堆,充当优先级队列
    3         private final ReentrantLock lock = new ReentrantLock(); // 可重入锁
    4         private int size = 0;
    5         private Thread leader = null; // 领导者线程
    6         private final Condition available = lock.newCondition(); // 条件队列

    关键方法

    以下这些方法的解释可参考前两篇文章,【JUC源码解析】DelayQueue【JUC源码解析】PriorityBlockingQueue

    siftUp

     1         private void siftUp(int k, RunnableScheduledFuture<?> key) { // 向上调整,同
     2             while (k > 0) {
     3                 int parent = (k - 1) >>> 1;
     4                 RunnableScheduledFuture<?> e = queue[parent];
     5                 if (key.compareTo(e) >= 0)
     6                     break;
     7                 queue[k] = e;
     8                 setIndex(e, k);
     9                 k = parent;
    10             }
    11             queue[k] = key;
    12             setIndex(key, k);
    13         }

    siftDown

     1         private void siftDown(int k, RunnableScheduledFuture<?> key) { // 向下调整
     2             int half = size >>> 1;
     3             while (k < half) {
     4                 int child = (k << 1) + 1;
     5                 RunnableScheduledFuture<?> c = queue[child];
     6                 int right = child + 1;
     7                 if (right < size && c.compareTo(queue[right]) > 0)
     8                     c = queue[child = right];
     9                 if (key.compareTo(c) <= 0)
    10                     break;
    11                 queue[k] = c;
    12                 setIndex(c, k);
    13                 k = child;
    14             }
    15             queue[k] = key;
    16             setIndex(key, k);
    17         }

    grow

    1         private void grow() { // 扩容
    2             int oldCapacity = queue.length;
    3             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
    4             if (newCapacity < 0) // overflow
    5                 newCapacity = Integer.MAX_VALUE;
    6             queue = Arrays.copyOf(queue, newCapacity);
    7         }

    offer

     1         public boolean offer(Runnable x) {
     2             if (x == null)
     3                 throw new NullPointerException();
     4             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
     5             final ReentrantLock lock = this.lock;
     6             lock.lock();
     7             try {
     8                 int i = size;
     9                 if (i >= queue.length)
    10                     grow();
    11                 size = i + 1;
    12                 if (i == 0) {
    13                     queue[0] = e;
    14                     setIndex(e, 0);
    15                 } else {
    16                     siftUp(i, e);
    17                 }
    18                 if (queue[0] == e) {
    19                     leader = null;
    20                     available.signal();
    21                 }
    22             } finally {
    23                 lock.unlock();
    24             }
    25             return true;
    26         }

    take

     1         public RunnableScheduledFuture<?> take() throws InterruptedException {
     2             final ReentrantLock lock = this.lock;
     3             lock.lockInterruptibly();
     4             try {
     5                 for (;;) {
     6                     RunnableScheduledFuture<?> first = queue[0];
     7                     if (first == null)
     8                         available.await();
     9                     else {
    10                         long delay = first.getDelay(NANOSECONDS);
    11                         if (delay <= 0)
    12                             return finishPoll(first);
    13                         first = null; // don't retain ref while waiting
    14                         if (leader != null)
    15                             available.await();
    16                         else {
    17                             Thread thisThread = Thread.currentThread();
    18                             leader = thisThread;
    19                             try {
    20                                 available.awaitNanos(delay);
    21                             } finally {
    22                                 if (leader == thisThread)
    23                                     leader = null;
    24                             }
    25                         }
    26                     }
    27                 }
    28             } finally {
    29                 if (leader == null && queue[0] != null)
    30                     available.signal();
    31                 lock.unlock();
    32             }
    33         }

    行文至此结束。

    尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_stpe.html

  • 相关阅读:
    Vue 4.0及以上修改默认8080端口号
    EasyRTC进入会议室视频父组件传递数据给子组件显示为null的问题修复
    EasyRTC通过Golang缓存库fastcache实现在线用户存储在内存中加快速度
    视频远程通话系统EasyRTC 日志显示调用位置不正确如何优化?
    通过浏览器使用WebRTC时会话终止或断开都是什么原因?
    为什么我们要用BS架构来开发流媒体平台?
    TSINGSEE青犀视频流媒体平台为什么会存在跨域问题?
    EasyDSS/EasyNVR传输高清视频如何优化及节省带宽消耗?
    【解决方案】TSINGSEE青犀视频AI+智慧工地,助力工地安全生产
    【解决方案】TSINGSEE青犀视频助力危化企业安全生产,实现AI全流程监管
  • 原文地址:https://www.cnblogs.com/aniao/p/aniao_stpe.html
Copyright © 2011-2022 走看看