zoukankan      html  css  js  c++  java
  • Java 线程 — ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor

    该类继承自ThreadPoolExecutor,增加了定时执行线程和延迟启动的功能,这两个功能是通过延时队列DelayedWorkQueue辅助实现的。
    线程池里面的线程需要从队列里面获取任务,任务根据延时时长是有顺序的,线程池的线一直获取延时最短的任务,也就是最小二叉堆中的堆顶元素,这个时候堆顶元素成为各个线程争夺的资源,

    1. 在获取堆顶元素的时候加锁(ReentrabtLock,可重入,独占锁),这样获取到锁的线程开始获取堆顶元素,其他线程在不能获取锁被阻塞
    2. 如果堆顶元素的延时还没有到,当前线程成为leader线程,进入超时等待
      1.1. 这个时候其他被阻塞的线程有机会获取锁
      1.2. 获取锁的线程发现leader线程已经另有其人(leader != null)
      1.3. 线程进入等待,available.await();
    3. 如果线程等待正常结束(时间已到),让出leander地位,再次进入循环,发现delayed <= 0,获取对顶元素,并重新堆化筛选出堆顶元素,调用available.signal()唤醒等待的线程(比如1.1的情况),释放锁
      3.1 假设是1.1里面的线程被唤醒(实际不一定,唤醒的也可能是其他线程)
      3.2 重复1、2、3、4的流程
    4. 线程获取到任务开始运行,运行ScheduledFutureTask.run方法,如果是定时任务的话,会重新计算延时时间,将任务加入队列,等待下次运行

    DelayedWorkQueue

    这个队列是一个阻塞的队列,队列基于二叉堆实现的,根据线程距离下次运行的时间比较大小,所以添加和删除元素都是二叉堆的重新堆化

    offer

    put、add都是调用下面的offer方法

    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture e = (RunnableScheduledFuture)x;
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
            	// 如果队列已满进行扩容
                grow();
            size = i + 1;
            if (i == 0) {
            	// 第一个元素直接入队
                queue[0] = e;
                setIndex(e, 0);
            } else {
            	// 加入新元素重新堆化
                siftUp(i, e);
            }
            if (queue[0] == e) {
            	// 如果原来队列为空,说明可能有线程在等待,所以唤醒一个线程
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
    
    // 队列扩容,每次增加50%,直到Integer的最大值
    private void grow() {
        int oldCapacity = queue.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;
        queue = Arrays.copyOf(queue, newCapacity);
    }
    
    // 新加入元素之后重新堆化,最小堆
    private void siftUp(int k, RunnableScheduledFuture key) {
        while (k > 0) {
        	// 二叉堆的特性父节点的序号 = (当前节点序号 - 1) / 2
            int parent = (k - 1) >>> 1;
            RunnableScheduledFuture e = queue[parent];
            // 找到新加入元素合适的位置
            if (key.compareTo(e) >= 0)
                break;
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        // 新元素入队列
        queue[k] = key;
        setIndex(key, k);
    }
    
    

    take

    take的时候使用的是leader-follow模式,只有一个leader,其他都是follow,在每次finishPoll的时候都会选举出新的

    public RunnableScheduledFuture take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture first = queue[0];
                if (first == null)
                	// 队列为空则进入等待
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                    	// 延迟或者定时时间(其实定时间也是一种延迟)到,从队列中取出任务执行
                        return finishPoll(first);
                    // 如果leader != null 说明leader是另外的线程(有可能是leader线程在available.awaitNanos(delay))是leader,那么当前线程进入等待
                    else if (leader != null)
                        available.await();
                    else {
                    	// 没有leader线程的时候,当前线程成为新的leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                        	// 这里进行超时等待,超过delay之后就会恢复运行,或者是被其他线程唤醒
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                            	// 重置leader以便进入下一次循环
                                leader = null;
                        }
                    }
                }
            }
        } finally {
        	// 队列不为空的时候发出signal,leader == null的条件是防止leader线程在available.awaitNanos(delay)的时候被唤醒
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
    
    // 返回第一个等待的线程(延时已到),并将剩余元素再次堆化
    private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
        int s = --size;
        RunnableScheduledFuture x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
        return f;
    }
    
    // 因为key是原来堆中的元素位于堆得最底层,key本来就是较大的元素,
    private void siftDown(int k, RunnableScheduledFuture key) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            RunnableScheduledFuture c = queue[child];
            int right = child + 1;
            if (right < size && c.compareTo(queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo(c) <= 0)
            	// 找到key的位置,大于父节点,小于子节点
                break;
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }
    

    问题

    period线程怎么实现定时调用

    setNextRunTime会重新计算下次运行需要等待的时间,因为period线程运行完后已经从队列中删除,在reExecutePeriodic方法中会重新进入队列,调用ensurePrestart重新开始执行任务

    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
        	// 非定时线程调用FutureTask的run方法
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {	// 定时线程调用FutureTask的runAndReset方法
        	// 设置下次运行时间
            setNextRunTime();
            // 重新准备运行
            reExecutePeriodic(outerTask);
        }
    }
    
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
        	// task进入队列
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
    
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        // 进入线程池等待运行,接下来就和ThreadPoolExecutor运行顺序一样了
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
    

    线程运行完成之后任务会不会从队列中删除,怎么删除的?

    会删除,在finnishPoll中,重新堆化选出堆顶元素,原来的堆顶元素被覆盖,也就是删除了

  • 相关阅读:
    数据结构与算法之PHP实现二叉树的遍历
    数据结构与算法之二叉树的基本概念和类型
    聚集索引,非聚集索引,覆盖索引 原理
    Vue学习笔记:methods、computed、watch的区别
    xsl 和xml transform方法的调用
    Chrome , Firfox 不支持fireEvent的方法
    分布式存储
    firefox并不支持selectSingleNode和selectNodes的解决方法
    503 Service Unavailable
    处理【由于 Web 服务器上的“ISAPI 和 CGI 限制”列表设置,无法提供您请求的页面】
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/6091438.html
Copyright © 2011-2022 走看看