zoukankan      html  css  js  c++  java
  • 【详解】ThreadPoolExecutor源码阅读(一)

    系列目录

    工作原理简介

      ThreadPoolExecutor会创建一组工作线程,每当一个工作线程完成其任务的时候,会向任务队列获取新的任务执行。如果任务队列为空,获取任务的线程将被阻塞。不出意外的话,工作线程会一直工作,直到线程池主动释放空闲线程,或者随着线程池的终结而结束。

        

    工作者线程

    在ThreadPoolExecutor中有一个内部类Worker,但这个Woker类并没有像想象中的那样继承于Thread,而是通过组合的方式绑定一个线程。在一定程度上,也可以把这个Worker看作是一个工作者线程

    可能是由于想要使用AbstractQueuedSynchronizer的功能吧,Java的类不支持多继承,就只好采取组合的方式来处理了

    这个Worker如何与一个线程绑定?

    这个工作者任务是在创建的时候与一个线程绑定的,其通过外部类ThreadPoolExecutor提供的线程工厂,创建一个线程,把自己传递给它,并保留线程的引用。

    Worker(Runnable firstTask) {
        //防止在runWorker之前被中断,因为worker一旦建立就会加入workers集合中
        //其他线程可能会中断空闲线程
        //而空闲线程的依据就是能否获得worker的锁
        setState(-1); 
        //设置初始任务,注意这里没有null检查,故初始任务可以为空
        this.firstTask = firstTask;
        //通过ThreadPoolExecutor的提供线程工厂来创建线程,并把自身赋值给它,作为其线程任务
        //保留线程引用,用于中断线程
        this.thread = getThreadFactory().newThread(this);
    }

    Worker绑定的线程何时启动?

    至此,线程的创建和绑定完成了(这里的线程指的只是Java的Thread对象),但是还没见到线程的启动(启动后才创建OS线程)。因为启动线程,必须通过Thread的start方法启动。那就来找找start方法在何处调用。

    在ThreadPoolExecutor的addWorker中,我们找到,当创建的Worker对象成功加入workers集合后,将启动对应线程。

    private boolean addWorker(Runnable firstTask, boolean core) { //core表示是否是核心线程
        //先试图改变控制信息内 工作线程数 的值
        retry:
        for (;;) {
            //获得控制信息
            int c = ctl.get();
            //从控制信息内 获取线程池运行状态
            int rs = runStateOf(c);
    
            //如果已经SHUTDOWN或者STOP则不再添加新工作线程
            //除非,在SHUTDOWN状态下,有任务尚未完成,不接受新任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                //从控制信息内获取 工作线程数
                int wc = workerCountOf(c);
                //工作线程以超过容量 或 
                //核心线程,超过核心线程数
                //非核心线程超过最大线程数
                //不得添加新线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS改变控制信息内  工作线程数的值 +1 ,并结束自旋
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
    
        boolean workerStarted = false; //worker线程是否已经启动
        boolean workerAdded = false; //worker线程是否已加入workers集合
        Worker w = null;
        try {
            w = new Worker(firstTask); //创建新线程,把初始任务赋值给它
            final Thread t = w.thread; //获取Worker的线程引用
            if (t != null) {
                //因为要修改集合HashSet,故需获取线程池的锁,以保证线程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    
                    //获取锁后再次检查状态,有可能在获得锁之前,线程池已经被shutdown了
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) //提前检查线程能否start
                            throw new IllegalThreadStateException();
                        //把worker对象加入workers集合
                        workers.add(w);
                        int s = workers.size();
                        //更新largetstPoolSize,此字段表示线程池运行时,最多开启过多少个线程
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //线程已加入集合,如果前面出现异常,这里不会被执行
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果添加成功,则启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果启动失败了,则表示添加Worker失败,回滚
            if (! workerStarted)
                //这个方法,会把前面添加到workers集合中的对应worker删除
                //并且把前面更新的 控制信息内的工作线程数再减回来
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    那线程启动后,将执行什么方法呢?

      那当然是执行Thread对象的run方法了,由于这里采用的是传递Runnable对象的方式创建线程任务,故Thread的run方法执行的是其target的run方法。而这个target正是前面传递给它的Worker。故执行的是Worker的run方法,如下:

    这里的runWorker是其外部类ThreadPoolExecutor的方法。

    final void runWorker(Worker w) {
        //获得当前执行这段代码的线程
        Thread wt = Thread.currentThread();
        //先尝试从worker取得初始任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        //允许中断,unlock后state=1,中断方法获取到锁,则判断为空闲线程,可中断
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            //不断地取任务执行、 其中getTask提供阻塞。如果getTask返回null则退出循环
            while (task != null || (task = getTask()) != null) {
                //获取锁,标识此线程正在工作,非空闲线程
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //钩子函数,空实现,子类可根据需要进行实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //运行获取到的任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //如果因为异常退出,这段语句不会被执行,也就是说completedAbruptly==true
            completedAbruptly = false;
        } finally {
            //工作线程退出的处理操作,如获取当前worker完成的任务量
            //如果异常退出,还需弥补,补充工作线程等等
            processWorkerExit(w, completedAbruptly);
        }
    }

           注:这里还提供了beforeExecute和afterExecute两个钩子函数,如果子类有需要,可以覆盖它们。在这两个时刻做一些操作。

      也就是说,每个工作者任务绑定的线程,执行的就是上述代码。那么就会有多个线程访问上述代码。问题来了,上述代码会不会出现线程安全问题?

      线程安全问题多出于多个线程对同一资源的访问,但是上述代码中,每个线程操作的是各自绑定的Worker。这些线程唯一有交集的,就是取任务操作了。但是任务已经交由BlockingQueue处理了,BlockingQueue的同步特性使得多个线程能够安全地获取任务。也就是说,不会有线程安全问题。

    ThreadPoolExecutor与ThreadPool在线程池的实现上有何差别

    注:在之前的博文【胡思乱想】JNI与线程池的维护 中有引用一个线程池的实现案例,后文就叫他ThreadPool,该案例基本实现了线程池的功能。但是在实际生产中,由于有更细致的需求,线程池的实现也复杂的多。JDK就有线程池的实现,ThreadPoolExecutor。

    至此,我们来对比一下ThreadPoolExecutor与ThreadPool两个线程池实现的差别

    ThreadPool中,工作者线程完成手头任务后,是回归到线程池,等待ThreadPool给它分配任务。(ThreadPool是一个线程类),也就是说在ThreadPool的实现中线程池还有一个线程用来分发任务

    ThreadPoolExecutor中,工作者线程一旦完成手头的任务,就自行从队列中获取新的任务接着做。如果没有任务,将被阻塞,其线程池把任务分发(可能需要的同步,阻塞)的责任剥离了出来,交由BlockingQueue进行处理。

  • 相关阅读:
    发送邮件程序
    T-SQL存储过程、游标
    GPS经纬度换算成XY坐标
    开博了
    你应该知道的 50 个 Python 单行代码
    想提升java知识的同学请进
    adb工具包使用方法
    红米note3刷安卓原生
    hadoop 使用和javaAPI
    django学习——url的name
  • 原文地址:https://www.cnblogs.com/longfurcat/p/9892075.html
Copyright © 2011-2022 走看看