zoukankan      html  css  js  c++  java
  • Java线程池执行器ThreadPoolExecutor工作原理(转载)

    一、构造方法及其参数
    ThreadPoolExecutor位于java.util.concurrent包,有4个带参数的构造方法。最终被调用的构造方法如下。其他构造方法只是提供了默认的ThreadFactory或者RejectedExecutionHandler作为参数。

    1 public ThreadPoolExecutor(int corePoolSize,
    2                 int maximumPoolSize,
    3                 long keepAliveTime,
    4                 TimeUnit unit,
    5                 BlockingQueue<Runnable> workQueue,
    6                 ThreadFactory threadFactory,
    7                 RejectedExecutionHandler handler)


    构造方法参数解析:

    1. int corePoolSize
    此参数字面上的意思是核心线程的数量。在public void execute(Runnable command)方法中,有以下几行代码:

    1 if (workerCountOf(c) < corePoolSize) {
    2 if (addWorker(command, true))
    3 return;
    4 }

    其中workerCountOf(c)方法返回的是当前正在运行的线程数,addWorker(command,true)方法的功能则是启动一个新的线程以执行command。
    可见,当有新任务来到,当前运行的线程数少于corePoolSize的时候,ThreadPoolExecutor二话不说就启动一个新的线程来执行这个任务。

    2.int maximumPoolSize
    最大可运行的线程数量。 在addWorker方法中:

     1 private boolean addWorker(Runnable firstTask, boolean core) {
     2 ...
     3 int wc = workerCountOf(c);
     4 if (wc >= CAPACITY ||
     5 wc >= (core ? corePoolSize : maximumPoolSize))
     6 return false;
     7 ...
     8 ...
     9 }
    10 addWorker的功能是创建新的线程并执行。
    11 从上面的代码可见,当当前运行的线程数量大于等于maximumPoolSize时,ThreadPoolExecutor将不会再创建新的线程。
    12 
    13 3.long keepAliveTime
    14 private Runnable getTask() {
    15 boolean timedOut = false; // Did the last poll() time out?
    16 for (;;) {
    17 ...
    18 int wc = workerCountOf(c);
    19 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    20 ...
    21 
    22 Runnable r = timed ?
    23 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    24 workQueue.take();
    25 ...
    26 }
    27 }

    线程池中的线程运行完上一个任务之后,将会通过getTask方法获取下一个任务来执行。即从构造方法中的BlockingQueue<Runnable> workQueue参数--阻塞队列中获取。
    从上面的代码中可知,当timed为true的时候,keepAliveTime才起作用。而timed在两种情况下才为true:1.当前运行的线程数量大于核心线程的数量。2.allowCoreThreadTimeOut为true(此变量可通过public void allowCoreThreadTimeOut(boolean value)方法设置),即设置核心线程也受等待时长限制。
    所以keepAliveTime的作用是,当前线程执行完这个任务之后,等待下一个任务到来的最长等待时间。如果在这个时间内没有新的任务来到,那当前线程就会退出。前提是满足两个上面说的两个条件。
    4.其他参数
    TimeUnit unit : keepAliveTime的时间单位。详见TimeUnit类说明。

     BlockingQueue<Runnable> workQueue : 阻塞队列,用于存放待执行的任务。

    ThreadFactory threadFactory : 为Runnable任务创建线程的接口,实现可参照ThreadPoolExecutor的内部类DefaultThreadFactory。
    RejectedExecutionHandler handler : java.util.concurrent包下的回调接口,当用户传入的任务无法被接收时,此接口的rejectedExecution方法会被调用。

    二、主要方法及逻辑
    ThreadPoolExecutor的主要逻辑写在public void execute(Runnable command)方法中:

     1 public void execute(Runnable command) {
     2 if (command == null)
     3 throw new NullPointerException();
     4 int c = ctl.get();
     5 if (workerCountOf(c) < corePoolSize) {
     6 if (addWorker(command, true))
     7 return;
     8 c = ctl.get();
     9 }
    10 if (isRunning(c) && workQueue.offer(command)) {
    11 int recheck = ctl.get();
    12 if (! isRunning(recheck) && remove(command))
    13 reject(command);
    14 else if (workerCountOf(recheck) == 0)
    15 addWorker(null, false);
    16 }
    17 else if (!addWorker(command, false))
    18 reject(command);
    19 }

    前面说了,workerCountOf(c)方法可以获取当前正在运行的线程数,那我们进到该方法看看:

    1 private static int workerCountOf(int c)  { return c & CAPACITY; }
    2 private static final int COUNT_BITS = Integer.SIZE - 3;
    3 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    所以整数c的后28位存放着当前运行线程的数量?要验证这个问题,我们先看看c是从哪里来的:int c = ctl.get(); ,那ctl又是什么?
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    原来ctl是一个提供原子级操作的整数,关于AtomicInteger大家可以百度下,此处我们就当它是一个整数。其初始值为ctlOf(RUNNINT,0),这个又是什么方法?

    1 private static int ctlOf(int rs, int wc) { return rs | wc; }
    2 private static final int RUNNING    = -1 << COUNT_BITS;
    3 private static final int SHUTDOWN   =  0 << COUNT_BITS;
    4 private static final int STOP       =  1 << COUNT_BITS;
    5 private static final int TIDYING    =  2 << COUNT_BITS;
    6 private static final int TERMINATED =  3 << COUNT_BITS;

    看到这里就明了了,原来整数c的前4位存放ThreadPoolExecutor的运行状态,后28位存放运行线程的数量。那为什么要把这两个数打包到一起呢?这个问题我们后面再看。

    所以当workerCountOf(c) < corePoolSize时,我们会进入到addWorker方法:

     1 private boolean addWorker(Runnable firstTask, boolean core) {
     2 retry:
     3 for (;;) {
     4 int c = ctl.get();//当前运行线程的数量
     5 int rs = runStateOf(c);//当前运行状态
     6 
     7 // Check if queue empty only if necessary.
     8 if (rs >= SHUTDOWN &&
     9 ! (rs == SHUTDOWN &&
    10 firstTask == null &&
    11 ! workQueue.isEmpty())) //当前的运行状态不为RUNNING,或者在SHUTDOWN状态下(不接受新任务但是会执行完队列中的任务),否则返回false
    12 return false;
    13 
    14 for (;;) {
    15 int wc = workerCountOf(c);//再检查一遍当前运行的线程数
    16 if (wc >= CAPACITY ||
    17 wc >= (core ? corePoolSize : maximumPoolSize))//线程的数量不能大于最大容量。如果该任务标记为核心任务,那线程的数量不能大于corePoolSize,不然的话不能大于maximunPoolSize,否则返回false
    18 return false;
    19 if (compareAndIncrementWorkerCount(c))//将线程数量加1,如果增加成功,则退出循环,执行后面的代码。失败一般都是因为线程数不同步。
    20 break retry;
    21 c = ctl.get(); // Re-read ctl
    22 if (runStateOf(c) != rs)//再检查一遍运行状态,如果运行状态改变了,将从外循环开头开始执行,再次确认能否添加任务。
    23 continue retry;
    24 // else CAS failed due to workerCount change; retry inner loop<pre name="code" class="java">//如果程序运行到这里,是因为线程数加1失败但是运行状态没有发生改变,此时会再执行内循环验证线程数。
    25 } } 
    26 //下面的代码功能是创建新的Worker实例 
    27 boolean workerStarted = false;//线程是否启动 
    28 boolean workerAdded = false;//Worker是否已添加到Set中 
    29 Worker w = null; 
    30 try { w = new Worker(firstTask); 
    31 final Thread t = w.thread;//Worker实例中存放了一个Thread 
    32 if (t != null) { 
    33 final ReentrantLock mainLock = this.mainLock; 
    34 mainLock.lock(); 
    35 try { // Recheck while holding lock. 
    36 // Back out on ThreadFactory failure or if 
    37 // shut down before lock acquired. 
    38 int rs = runStateOf(ctl.get()); 
    39 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {//再次检查运行状态,逻辑就不说了,和上面的相同 
    40 if (t.isAlive()) // precheck that t is startable 
    41 throw new IllegalThreadStateException(); 
    42 workers.add(w); 
    43 int s = workers.size(); 
    44 if (s > largestPoolSize)//这里应该是统计,没有实际作用 
    45 largestPoolSize = s; workerAdded = true; 
    46 } 
    47 } finally { 
    48 mainLock.unlock(); 
    49 }
    50 if (workerAdded) { 
    51 t.start();//启动新线程 
    52 workerStarted = true; 
    53 } 
    54 } 
    55 } finally {
    56 if (! workerStarted)//如果启动线程失败 
    57 addWorkerFailed(w); 
    58 } 
    59 return workerStarted; 
    60 }

    看来addWorker方法的大致功能就是创建一个新Worker实例,并执行Worker实例中的线程。
    所以如果addWorker执行成功,则execute方法将直接返回。
    如果addWorker执行失败,即不能为该任务创建一个新的线程。那么execute方法将尝试把这个方法放到阻塞队列中。即workQueue.offer(command)。如果offer成功,将再次检查运行状态和当前运行的线程数量,保证这两个值没有在offer期间发生改变。如果当前状态不是RUNNING,将拒绝这个任务。如果当前运行的线程数量为0,则执行addWorker(null, false);。这又是什么意思?为什么参数是null?
    我们回到addWorker方法看一下。在创建Worker方法的时候,并没有判断Runnable是否为null。那它是怎么执行的?我们再看看Woker类

    1 Worker(Runnable firstTask) {
    2 setState(-1); // inhibit interrupts until runWorker
    3 this.firstTask = firstTask;
    4 this.thread = getThreadFactory().newThread(this);
    5 }

    原来Worker类初始化线程变量的时候传入的Runnable不是我们传入的参数,而是它自己。我们在看看它自己的run方法。

     1 /** Delegates main run loop to outer runWorker. */
     2 public void run() {
     3 runWorker(this);
     4 }
     5 final void runWorker(Worker w) {
     6         Thread wt = Thread.currentThread();
     7         Runnable task = w.firstTask;//Worker构造方法中的参数在这里
     8         w.firstTask = null;
     9         w.unlock(); // allow interrupts
    10         boolean completedAbruptly = true;
    11         try {
    12             while (task != null || (task = getTask()) != null) {//当task为null时,尝试获取阻塞队列中的任务
    13                 w.lock();
    14                 // If pool is stopping, ensure thread is interrupted;
    15                 // if not, ensure thread is not interrupted.  This
    16                 // requires a recheck in second case to deal with
    17                 // shutdownNow race while clearing interrupt
    18                 if ((runStateAtLeast(ctl.get(), STOP) ||
    19                      (Thread.interrupted() &&
    20                       runStateAtLeast(ctl.get(), STOP))) &&
    21                     !wt.isInterrupted())
    22                     wt.interrupt();
    23                 try {
    24                     beforeExecute(wt, task);
    25                     Throwable thrown = null;
    26                     try {
    27                         task.run();//执行任务
    28                     } ... finally {
    29                         afterExecute(task, thrown);
    30                     }
    31                 } finally {
    32                     task = null;
    33                     w.completedTasks++;
    34                     w.unlock();
    35                 }
    36             }
    37             completedAbruptly = false;
    38         } finally {
    39             processWorkerExit(w, completedAbruptly);
    40         }
    41     }
    42 private Runnable getTask() {
    43         boolean timedOut = false; // Did the last poll() time out?
    44         for (;;) {
    45             int c = ctl.get();
    46             int rs = runStateOf(c);
    47             // Check if queue empty only if necessary.
    48             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    49                 decrementWorkerCount();
    50                 return null;
    51             }
    52             int wc = workerCountOf(c);
    53             // Are workers subject to culling?
    54             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    55             if ((wc > maximumPoolSize || (timed && timedOut))
    56                 && (wc > 1 || workQueue.isEmpty())) {
    57                 if (compareAndDecrementWorkerCount(c))
    58                     return null;
    59                 continue;
    60             }
    61             try {
    62                 Runnable r = timed ?
    63                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    64                     workQueue.take();
    65                 if (r != null)
    66                     return r;//这里返回阻塞队列的Runnable。此处逻辑在前面的keepAliveTime中已讲过。
    67                 timedOut = true;
    68             } catch (InterruptedException retry) {
    69                 timedOut = false;
    70             }
    71         }
    72     }

    到这里就明白了。当当前运行的线程数为0时,调用addWorker(null,false)时,将创建一个新的线程去取阻塞队列中的任务来执行。


    最后,execute方法中,如果调用offer在阻塞队列存入任务失败,将调用addWorker(command, false)尝试启动一个非核心线程来执行任务。如果启动失败,此任务将被拒绝。

    3.总结
    所以,ThreadPoolExecutor的主要逻辑是,当用户调用execute(Runnable command) ,大致逻辑如下(注意是大致逻辑,代码中的逻辑更复杂更详细):
    1.查看当前运行状态,如果不是RUNNING状态,将直接拒绝新任务。否则进入步骤2。
    2.查看当前运行线程的数量,如果数量少于核心线程数,将直接创建新的线程执行该任务。否则进入步骤3。
    3.将该任务添加到阻塞队列,等待核心线程执行完上一个任务再来获取。如果添加到阻塞队列失败,进入步骤4。
    4.尝试创建一个非核心线程执行该任务,前提是线程的数量少于等于最大线程数。如果失败,拒绝该任务。

    所以使用ThreadPoolExecutor需要注意下面几点:
    1.allowCoreThreadTimeOut默认为false,如果不设置为true,那已被创建的少于等于核心线程数量的线程,将一直存在,处于运行状态或者阻塞在workQueue.take()中。
    2.当前运行的线程数等于核心线程数了,任务才会被加入队列,当队列满了,才会创建非核心线程。所以如果使用LinkedBlockingQueue,那么队列将不会满,非核心线程不会被创建。
    3.如果队列满了,当前运行的线程数也达到最大线程数了,那么新增的任务将被拒绝,即使并没有发生任何错误。所以应该做好被拒绝任务的处理工作。

  • 相关阅读:
    Constants and Variables
    随想
    C#基础篇之语言和框架介绍
    Python基础19 实例方法 类方法 静态方法 私有变量 私有方法 属性
    Python基础18 实例变量 类变量 构造方法
    Python基础17 嵌套函数 函数类型和Lambda表达式 三大基础函数 filter() map() reduce()
    Python基础16 函数返回值 作用区域 生成器
    Python基础11 List插入,删除,替换和其他常用方法 insert() remove() pop() reverse() copy() clear() index() count()
    Python基础15 函数的定义 使用关键字参数调用 参数默认值 可变参数
    Python基础14 字典的创建修改访问和遍历 popitem() keys() values() items()
  • 原文地址:https://www.cnblogs.com/wangjincai/p/13368587.html
Copyright © 2011-2022 走看看