zoukankan      html  css  js  c++  java
  • 线程池ThreadPoolExecutor源码解读研究(JDK1.8)

    一、什么是线程池

           为什么要使用线程池?在多线程并发开发中,线程的数量较多,且每个线程执行一定的时间后就结束了,下一个线程任务到来还需要重新创建线程,这样线程数量特别庞大的时候,频繁的创建线程和销毁线程需要一定时间而且增加系统的额外开销。基于这样的场景,线程池就出现了,线程池可以做到一个线程的任务处理完可以接受下一个任务,并不需要频繁的创建销毁,这样大大节省了时间和系统的开销。

             线程池,顾名思义,就是一个池子,任务提交的到线程池后,线程池会在池子里边找有没有空闲的线程,如果没有,就会进入等待状态,有就会分配一个空闲的线程来接受这个任务,当服务执行完,从新放回到线程池,不需要销毁。在这种模式下,系统大大减少了创建线程个销毁线程的资源开销,而且一个线程可以用来执行多个任务,我们可以根据系统的配置灵活调整线程池的大小,从而更高效的执行任务。

    二、线程池类之间关系

           线程池主要包含:Executors,Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor这些类。Executors用来创建线程池,返回ExecutorService的对象,该对象就可以调用execute方法或者submit方法执行线程。当然,我们也可以自己new一个。

    Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor的继承关系的继承关系为:Executor是一个接口,里面只有execute方法声明,接口ExecutorService继承Executor接口,里面包含shutdown(),shutdownNow(),isTerminated(),submit等方法; AbstractExecutorService是ExecutorService的实现类,实现了该类中的方法,ThreadPoolExecutor继承AbstractExecutorService。

    三、线程池状态说明

    RUNNING可以接受新任务,也可以处理阻塞队列里面的任务

    SHUTDOWN不接受新任务,但是可以处理阻塞队列里的任务

    STOP不在接收新任务,也不再处理阻塞队列里的任务,并中断正在处理的任务

    TIDYING中间状态:线程池中没有有效的线程,调用terminate进入TERMINATE状态

    TERMINATE:终止状态

    四、线程池源码分析

    ExecutorService  executor = Executors.newFixedThreadPool(100);

    通过API我们可以看到创建线程池的过程。

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    Executors这个类中基本都是静态方法,代理了线程池的创建,大大简化了我么创建线程池工作量,通过方法名我们就可以创建我们想要的线程池,他的内部其实都是统一的方法实现的,通过构造方法重载实现不同的功能,但是不看源码,是很难知道他们的具体作用的。我们可以看到,这里面有好几种创建线程池的方法,他们有什么区别呢?

    1. newFixedThreadPool(int)方法,内部实现如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    创建指定大小的线程池,如果超出大小,放入block队列,即LinkedBlockingQueue队列,默认的线程工厂为defaultThreadFactory。

    2. newWorkStealingPool(int),内部实现如下:

    public static ExecutorService newWorkStealingPool(int parallelism) {
            return new ForkJoinPool
                (parallelism,
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
    }

    JDK1.8新增,返回ForkJoin,个人感觉有一点mapReduce的思想。

    3.newSingleThreadPool,源码如下:

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));

    创建单个线程的线程池。

    4. newCachedThreadPool,源码如下:

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    线程池长度超过处理需要,灵活回收空闲线程,若无可回收,则创建新线程。

            Executors里面还有好多方法,我们仔细查看API就可以了解的个大概,它是一个工具类,提供了一些静态方法。从源码中我们可以看到创建线程池返回的是return new ThreadPoolExecutor方法,它的源码如下:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    View Code

            刚看源码的时候的确很痛苦,我们得熟悉作者的思想,他为什么要这么写,知道了作者的思想以后就好办多了,我是结合英文说明来揣摩的,下面我们看每个参数的意思。

    corePoolSize核心线程大小,线程数一旦超过这个值,多余的就会被放入等待队列

    maximumPoolSize线程池中的最大线程数量,这个一般用不到,源码中可以看到corePoolSize和maximumPoolSize是一样的,不同的是大于这个值会由丢弃处理机制来处理,不会被放入等待队列。

    keepAliveTime保持时间,当线程没有任务处理后,保持多久结束,默认是0

    workQueue等待队列,默认为LinkedBlockingQueue,这就是前面提到的等待队列,里面是一个HashSet,内部包装了一层。

    threadFactory构造Thread方法,我们可以自己包装和传递,实现newThread方法

    handler这就是前面提到的丢弃处理机制方法,实现接口RejectExecutionHandler中的方法即可。

    在做项目的时候发现线程池有两个执行方法可供调用,分别是execute和submit,那么这两个方法有什么区别呢?在看submit源码的时候可以看到submit最终还是会调用execute方法。

    不同的是submit方法提供了一个Future来托管返回值的处理,当调用这个方法需要有返回值的时候,可以用这个方法,execute只能接受Runnable作为参数,而submit除了Runnable还可以接收Callable。

    下面来分析最重要的execute方法源码:

     1 public void execute(Runnable command) {
     2         if (command == null)
     3             throw new NullPointerException();
     4         /*
     5          * Proceed in 3 steps:
     6          *
     7          * 1. If fewer than corePoolSize threads are running, try to
     8          * start a new thread with the given command as its first
     9          * task.  The call to addWorker atomically checks runState and
    10          * workerCount, and so prevents false alarms that would add
    11          * threads when it shouldn't, by returning false.
    12          *
    13          * 2. If a task can be successfully queued, then we still need
    14          * to double-check whether we should have added a thread
    15          * (because existing ones died since last checking) or that
    16          * the pool shut down since entry into this method. So we
    17          * recheck state and if necessary roll back the enqueuing if
    18          * stopped, or start a new thread if there are none.
    19          *
    20          * 3. If we cannot queue task, then we try to add a new
    21          * thread.  If it fails, we know we are shut down or saturated
    22          * and so reject the task.
    23          */
    24         int c = ctl.get();
    25         if (workerCountOf(c) < corePoolSize) {
    26             if (addWorker(command, true))
    27                 return;
    28             c = ctl.get();
    29         }
    30         if (isRunning(c) && workQueue.offer(command)) {
    31             int recheck = ctl.get();
    32             if (! isRunning(recheck) && remove(command))
    33                 reject(command);
    34             else if (workerCountOf(recheck) == 0)
    35                 addWorker(null, false);
    36         }
    37         else if (!addWorker(command, false))
    38             reject(command);
    39 }
    View Code

    代码解释:如果任务为空,返回空异常;接下来int c = ctl.get();获取线程池的状态位,进入if中计算线程池的数量,如果小于线程池的核心线程数,就封装成一个工作(work),失败了继续获取线程池状态位;if (isRunning(c) && workQueue.offer(command))判断线程池是否正常运行,正常的话就把当前线程添加到工作队列并且再次获取线程池状态位,if (! isRunning(recheck) && remove(command))如果没有运行的线程了,就把刚才添加的线程移除,移除成功后,使用拒绝策略reject(command); else if (workerCountOf(recheck) == 0)

                    addWorker(null, false);如果线程池的线程数为0,那么就要添加一个空任务继续运行,以此来保证可以继续接收新任务而继续运行。

    else if (!addWorker(command, false))

                reject(command);

    如果核心线程满了,工作队列也饱和了,开启非核心线程也失败了就会拒绝,此时已经达到最大线程数了。

    从英文解释中,我们可以看到:基本分三步:

    a)       开启线程执行任务,直到达到最大核心线程数

    b)      达到核心线程数时,将接受的新任务放入工作队列

    c)       当工作队列也放满后,就会开启线程(非核心)执行任务,直到到达最大线程数

    d)      以上条件都不满足时,就执行默认的拒绝策略

    addWork源码:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry: //循环标志
            for (;;) { 死循环
                int c = ctl.get();//获取状态位
                int rs = runStateOf(c);//计算线程池的状态
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;//这一段说的是线程池不能正常运行的情况:线程池状态关闭、任务为空、队列为空返回错误
    
                for (;;) {//死循环
                    int wc = workerCountOf(c);//计算线程数
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;//如果线程数超出核心线程数,返回错误
                    if (compareAndIncrementWorkerCount(c))//增加worker的数量
                        break retry;//回到进入该方法的循环状态
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;//如果状态发生改变,就回退
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;//线程是否开始运行
            boolean workerAdded = false;//worker是否添加成功
            Worker w = null;
            try {
                w = new Worker(firstTask);//封装成worker
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;//加锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());计算线程池状态
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;//worker添加成功
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();//启动刚刚添加的任务
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);//失败后执行的操作
            }
            return workerStarted;
    }
    View Code

    从对源码的翻译中我们可以知道这个方法是有什么作用,简单说就是:创建任务,封装任务。

    五、线程测试

    进行一个简单的测试模拟线程池的工作原理:

    模拟多线程:

    public class TestThreadPool implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

    测试类:

    public static void main(String[] args) {
            //指定3个长度的工作队列
            LinkedBlockingDeque<Runnable> workQueue=new LinkedBlockingDeque<>(3);
            //指定线程池参数:核心线程数,线程池最大线程数量,活跃时间,工作队列
            ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(4, 7, 90, 
                    TimeUnit.SECONDS, workQueue);
            for (int i = 0; i < 15; i++) {
                threadPoolExecutor.execute(new Thread(new TestThreadPool(), 
                        "线程:".concat(i+"")));
                System.out.println("线程池中活跃线程数"+threadPoolExecutor.getActiveCount());
                if(workQueue.size()>0){
                    System.out.println("被阻塞的线程数为:"+workQueue.size());
                }
            }
        }
    View Code

    指定线程池核心数为4,最大线程数量7,工作队列最大放入3个线程,模拟15个线程并发。运行结果如下:

    线程池中活跃线程数1
    线程池中活跃线程数2
    线程池中活跃线程数3
    线程池中活跃线程数4
    线程池中活跃线程数4
    被阻塞的线程数为:1
    线程池中活跃线程数4
    被阻塞的线程数为:2
    线程池中活跃线程数4
    被阻塞的线程数为:3
    线程池中活跃线程数5
    被阻塞的线程数为:3
    线程池中活跃线程数6
    被阻塞的线程数为:3
    线程池中活跃线程数7
    被阻塞的线程数为:3
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[线程:10,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 7, active threads = 7, queued tasks = 3, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at main.Main.main(Main.java:19)
    View Code

            可以看到,创建了4个核心线程和3个非核心线程,当线程数超出了线程池可容纳的的最大数量,执行了拒绝策略Reject,说明队列和线程池都满了,线程池处于饱和状态,另外一个原因是完成的线程没有及时释放,而是进入了休眠。

            线程池工作原理:任务开始后,开始创建新的线程,当达到核心线程数后,新的任务进来不在创建新的线程,这时候把任务加入工作队列,当达到工作队列的长度后,新任务开始创建新的普通线程,直到数量达到线程池的最大核心数量,后面再有新任务则执行饱和策略或拒绝,抛出异常。

  • 相关阅读:
    C++设计模式——代理模式
    C++设计模式——享元模式
    C++设计模式——外观模式
    C++设计模式——装饰模式
    C++设计模式——组合模式
    C++设计模式——桥接模式
    C++设计模式——适配器模式
    C++设计模式——原型模式
    云服务器和虚拟主机的区别
    ES6的Module系统
  • 原文地址:https://www.cnblogs.com/10158wsj/p/8620200.html
Copyright © 2011-2022 走看看