zoukankan      html  css  js  c++  java
  • 【详解】ThreadPoolExecutor源码阅读(二)

    系列目录

    AQS在Worker中的应用——标识空闲or非空闲工作线程

    我对这个上锁一直搞不懂,虽然有注释说是允许中断啥的,但是还是一头雾水,就打算直接看代码分析。第一眼看到这个lock的时候,我就吓到了。

    产生了第一个问题:"啥,一上锁,多个线程不是就要同步排队了嘛? 而且也没这必要啊! "

    看清楚了才知道,锁来自于方法参数Worker,也就是说,每个线程请求的同步锁都是各自的Worker的锁,故不存在这些个线程竞争一个锁的情况。

    问题又来了我自己的锁,又没人跟我抢,犯得着每做一个任务都上锁吗?

    实际上是有的,只是在这个方法里,不会发生竞争。

    注:此段代码同【详解】ThreadPoolExecutor源码阅读(一) 中的runWorker。

    final void runWorker(Worker w) {
        //获得当前执行这段代码的线程
        Thread wt = Thread.currentThread();
        //先尝试从worker取得初始任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        //允许中断,unlock后state=1,中断方法获取到锁,则判断为空闲线程,可中断
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            //不断地取任务执行、 其中getTask提供阻塞。如果getTask返回null则退出循环
            while (task != null || (task = getTask()) != null) {
                //获取锁,标识此线程正在工作,非空闲线程
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //钩子函数,空实现,子类可根据需要进行实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //运行获取到的任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //如果因为异常退出,这段语句不会被执行,也就是说completedAbruptly==true
            completedAbruptly = false;
        } finally {
            //工作线程退出的处理操作,如获取当前worker完成的任务量
            //如果异常退出,还需弥补,补充工作线程等等
            processWorkerExit(w, completedAbruptly);
        }
    }

    interruptIdleWorkers 中断空闲线程

    注释不是说了吗,允许中断,那肯定跟中断有关,朝这个方向去找啊。当然,我当时并没有这样去找,而是机缘巧合,看到ThreadPoolExecutor其他代码的时候突然意识到的。

    我先看到了shutdown方法,发现有中断空闲Worker的方法。但是在此之前,我并不知道线程池是如何区别Worker线程是空闲还是忙碌的,只知道线程池有workers集合用来存储创建的Worker。

    于是,我就顺着方法查看下去。找到关闭空闲Worker方法的实现。

    注:shutdown的语义是,关闭线程池,停止接收新的任务,继续执行任务队列中的任务。中断多余的空闲线程。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //获取线程池锁
        mainLock.lock();
        try {
            //检查执行线程是否有权关闭线程池,暂未深入了解
            checkShutdownAccess();
            //更改线程池运行状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断空闲线程
            interruptIdleWorkers();
            //钩子函数
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    interruptIdleWorkers方法的注释:

    关闭等待任务的线程(也就是没有被上锁的线程),由此可得Worker有没有获得锁,是区分其是否空闲的标志。结合源码:

    private void interruptIdleWorkers(boolean onlyOne) {
        //获取线程池的锁,保持独占访问
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍历workers集合中的所有工作线程
            for (Worker w : workers) {
                //获得worker对象中的线程引用
                Thread t = w.thread;
                //如果获得锁成功,则中断对应线程
                //如果工作线程正在执行任务,因为开始执行前,任务会获取worker的锁,故其无法被中断
                //如果工作线程正在等待任务,因其没获得锁,则当前线程可以获得其worker的锁,此工作线程被中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                //如果只需要关闭一个工作线程,则到此为止
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

     那问题又来了,如何终止已经开始的任务呢?

    这里终止已经开始的任务,就是shutdownNow方法要做的。(shutdownNow就是停止所有任务,已经开始也要停止。其对应的线程池状态是STOP

    前面由于无法获取到Worker的锁,故无法通过interruptIdleWorkers方法将其中断。但是ThreadPoolExecutor还提供了interruptWorkers方法,该方法不用获取锁,直接调用Worker的interruptIfstarted方法中断线程。

     AQS(AbstractQueuedSynchronizer)在Worker中的锁管理方式

    AQS是基于状态和等待队列的同步器,这个实例中,Worker继承于AQS。AQS的acquire依赖于tryAcquire,release依赖于tryAcquire。而这两个方法它自己都没有实现,而是由子类提供。(模板方法设计模式的一种体现)。

    我们先来看看获取锁的操作,以下代码来自AQS

    public final void acquire(int arg) {
        //注意,java表达式会短路,如果前面的结果使得表达式结果固定,那么后面的代码就不会被执行
        //这里如果tryAcquire方法返回true, 那么!tryAcquire就是false,false '与' 任何东西都是false,故后面的表达式不会被执行
        //也就是说如果,一次请求获取成功,则此方法直接结束,返回。如果请求失败则加入到等待队列中,代码在这里停顿
        //如果等待过程被中断,则中断当前线程
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    AQS没有提供tryAcquire方法的实现

    protected boolean tryAcquire(int arg) {
         throw new UnsupportedOperationException();
    }

    acquire依赖于tryAcquire方法,如果该方法成功,则acquire方法直接返回,如果失败,则将当前线程加入等待队列(此操作将park当前线程,使其进入waiting状态)。

    我们来看看Worker中是如何实现tryAcquire方法的:

    protected boolean tryAcquire(int unused) { //这里指明了参数无用,方法体内都是写死的
        //利用CAS, 如果当前state值为0,则更改为1
        //如果其他线程已获得锁,那么state就是1, 而不是预期的0,则此方法失败
        if (compareAndSetState(0, 1)) {
            //设置当前独占拥有者线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    我们再来看看release操作,以下代码来自AQS:

    public final boolean release(int arg) {
        //尝试释放,如果释放成功,则唤醒等待队列中的第一个线程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    同样的,AQS没有实现tryRelease方法

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    Worker中的实现:

    protected boolean tryRelease(int unused) {
        //清空当前独占拥有者
        setExclusiveOwnerThread(null);
        //设置状态为0
        setState(0);
        return true;
    }

    将当前获取独占锁的线程置为null,然后将state为0,这里与前面tryAcquire一一对应。这个时候其他线程就可以获取锁了。

  • 相关阅读:
    eyoucms遍历子栏目,有二级栏目的点击展开或者收缩
    eyoucms 遍历栏目下的子栏目
    帝国cms 内容页根据关键词来调用相关内容
    帝国cms 上传的图片前台不显示
    帝国cms 通过字段内容来获取数据
    eyoucms 去掉 index.php后缀
    通过jquery插件复制文字
    帝国cms 表单弹窗提交,判断后才能提交到后台
    动态库和静态库
    J-520-2018年第二届河北省大学生程序设计竞赛(快速幂取模)
  • 原文地址:https://www.cnblogs.com/longfurcat/p/9892395.html
Copyright © 2011-2022 走看看