zoukankan      html  css  js  c++  java
  • java 线程池(1)

    问题 :

    • 线程池中的 coreSize 和 maxSize 的作用分别是什么?
    • 未执行的线程池存在在哪种数据类型,为什么使用这种类型的数据结构

    ThreadPoolExecutor概述

            ThreadPoolExecutor 下文简称 TPE ,我们使用它都是从Executror 这个类中的方法 :

      1     public static ExecutorService newFixedThreadPool(int nThreads) {
      2         return new ThreadPoolExecutor(nThreads, nThreads,
      3                                       0L, TimeUnit.MILLISECONDS,
      4                                       new LinkedBlockingQueue<Runnable>());
      5     }
      6 
      7 
      8     public static ExecutorService newSingleThreadExecutor() {
      9         return new FinalizableDelegatedExecutorService
     10             (new ThreadPoolExecutor(1, 1,
     11                                     0L, TimeUnit.MILLISECONDS,
     12                                     new LinkedBlockingQueue<Runnable>()));
     13     }
     14 
     15 
     16 
     17     public static ExecutorService newCachedThreadPool() {
     18         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
     19                                       60L, TimeUnit.SECONDS,
     20                                       new SynchronousQueue<Runnable>());
     21     }
     22 
     23 
     24     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
     25         return new ScheduledThreadPoolExecutor(corePoolSize);
     26     }
     27 
     28     //ScheduledExecutorService
     29     public ScheduledThreadPoolExecutor(int corePoolSize) {
     30         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
     31               new DelayedWorkQueue());
     32     }
     33 
     34 
     35     public class ScheduledThreadPoolExecutor
     36         extends ThreadPoolExecutor
     37         implements ScheduledExecutorService
     38 
     39 

           Executror 的方法名很明显地说明了创建的对象的用途,我们也可以看到它们实际的都是走到了TLE构造函数,只是传入的参数不同。

      1     public ThreadPoolExecutor(int corePoolSize,
      2                               int maximumPoolSize,
      3                               long keepAliveTime,
      4                               TimeUnit unit,
      5                               BlockingQueue<Runnable> workQueue,
      6                               ThreadFactory threadFactory) {
      7         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
      8              threadFactory, defaultHandler);
      9     }
     10 
     11 
     12     public ThreadPoolExecutor(int corePoolSize,
     13                               int maximumPoolSize,
     14                               long keepAliveTime,
     15                               TimeUnit unit,
     16                               BlockingQueue<Runnable> workQueue,
     17                               ThreadFactory threadFactory,
     18                               RejectedExecutionHandler handler) {
     19         if (corePoolSize < 0 ||
     20             maximumPoolSize <= 0 ||
     21             maximumPoolSize < corePoolSize ||
     22             keepAliveTime < 0)
     23             throw new IllegalArgumentException();
     24         if (workQueue == null || threadFactory == null || handler == null)
     25             throw new NullPointerException();
     26         this.corePoolSize = corePoolSize;
     27         this.maximumPoolSize = maximumPoolSize;
     28         this.workQueue = workQueue;
     29         this.keepAliveTime = unit.toNanos(keepAliveTime);
     30         this.threadFactory = threadFactory;
     31         this.handler = handler;
     32     }

              由此可以推断通过配置TLE的各个参数,实现不同的功能。

    ThreadPoolExecutor重要知识点

          官方文档中有详细介绍,细节请看官方文档

    Core and maximum pool sizes 核心线程数 和 线程池最大线程数

            这两个在构造方法中需要指定,核心线程数和最大线程池线程数,很好理解,就像两条上限线,当来任务时没达到核心线程数,那么就开启一条新线程去执行,要是达到核心数量了,怎么办,任务入列,要是超过了我设定的最大线程数量,那么不再接受任务。这两个值可以动态设置:   setCorePoolSize(int)setMaximumPoolSize(int).

            core 线程数,当空闲的时候依然会存活,除非设置了存活时间。

    On-demand construction

             默认情况下,甚至核心线程最初只在新任务到达时创建并启动,但可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态覆盖。 如果使用非空队列构造池,则可能需要预启动线程。

    Creating new threads 创建新线程

             使用ThreadFactory创建新线程。 如果没有另外指定,则使用Executors.defaultThreadFactory(),它将所有线程创建在同一个ThreadGroup中,并具有相同的NORM_PRIORITY优先级和非守护进程状态。 通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护程序状态等。如果ThreadFactory在通过从newThread返回null请求时无法创建线程,则执行程序将继续,但可能无法 执行任何任务。

           线程应该拥有“modifyThread”RuntimePermission。 如果使用池的工作线程或其他线程不具有此权限,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能保持可以终止但未完成的状态。(取至官方文档谷歌翻译)

     

    Keep-alive times 存活时间

              前面说到  Core and maximum pool sizes  就像两个上限线,当超过了核心线程数后,任务开始执行完成,那么线程就空闲了,此时要是空闲达到了 Keep-alive times 这个设定值,那么线程就会被回收。使用Long.MAX_VALUE类型的TimeUnit.NANOSECONDS有效地禁止空闲线程在关闭之前终止。

          the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero

    任务处理原则      
    Queuing

             队列的使用和线程池的线程数有关。

    • 如果现有线程少于 corePoolSize 线程数量,尝试开启一条新的线程执行(能执行任务就不排队解决的原则)
    • 达到或超过 corePoolSize ,任务入列
    • 任务队列满了,开新线程直到 maximumPoolSize,当线程数达到 maximumPoolSize,拒绝请求。

             有以下几种队列策略 :

    • Direct handoffs.(直接交付) A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
    • Unbounded queues.(无界队列) Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
    • Bounded queues.(有界队列) A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

             实际上不同应用场景的线程池为适应不同需求,使用了不同的 queue 策略。

    状态标识
              状态控制有一个变量 ctl ,它又两部分组成,
     
    • (后29位)workCount : 指示当前有效的线程数量,但是并不能代表当前存活的活跃的线程数。
    • (高3位)runStatus : 指示线程池本身的状态

              下面是状态变量的定义

      1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      2     private static final int COUNT_BITS = Integer.SIZE - 3;
      3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
      4 
      5     // runState is stored in the high-order bits
      6     private static final int RUNNING    = -1 << COUNT_BITS;
      7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
      8     private static final int STOP       =  1 << COUNT_BITS;
      9     private static final int TIDYING    =  2 << COUNT_BITS;
     10     private static final int TERMINATED =  3 << COUNT_BITS;
     11 
     12     // Packing and unpacking ctl
     13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
     14     private static int workerCountOf(int c)  { return c & CAPACITY; }
     15     private static int ctlOf(int rs, int wc) { return rs | wc; }
     16 
     17     /*
     18      * Bit field accessors that don't require unpacking ctl.
     19      * These depend on the bit layout and on workerCount being never negative.
     20      */
     21 
     22     private static boolean runStateLessThan(int c, int s) {
     23         return c < s;
     24     }
     25 
     26     private static boolean runStateAtLeast(int c, int s) {
     27         return c >= s;
     28     }
     29 
     30     private static boolean isRunning(int c) {
     31         return c < SHUTDOWN;
     32     }

              runStatus 提供这几种状态 :

    • RUNNING : 运行
    • SHUTDOWN : 停止接收新任务,执行已经入队的任务。
    • STOP :  不再接受新任务,不执行已经入列的任务,中断正在执行的任务。
    • TIDING : 停止接收新任务,所有任务被暂停,workCount 变为 0 ,线程状态变为 TIDYING.将执行hook 方法 terminated.
    • TERMINATED : terminated方法完成。

    拒绝任务策略

        当线程池关闭或是达到了消费队列中满了的情况下,拒绝任务,线程池提供了这几种方式的拒绝策略

    • 中断策略AbortPolicy (默认) : 丢弃并抛出异常
    • 调用者调用策略CallerRunsPolicy : 当前线程执行任务,并告知线程以低速率接收新的任务
    • 抛弃策略DiscardPolicy  :  丢弃策略
    • 抛弃最老策略DiscardOldestPolicy : 丢弃进消费队列最长时间的任务,当线程池没有关闭,则重新尝试执行任务

    ThreadPoolExecutor源码分析

             看 execute 方法。

      1      //可以创建线程处理就处理,不行就入列,入列也失败,拒绝!
      2      public void execute(Runnable command) {
      3         if (command == null)
      4             throw new NullPointerException();
      5         /*
      6          * Proceed in 3 steps:
      7          *
      8          * 1. If fewer than corePoolSize threads are running, try to
      9          * start a new thread with the given command as its first
     10          * task.  The call to addWorker atomically checks runState and
     11          * workerCount, and so prevents false alarms that would add
     12          * threads when it shouldn't, by returning false.
     13          *
     14          * 2. If a task can be successfully queued, then we still need
     15          * to double-check whether we should have added a thread
     16          * (because existing ones died since last checking) or that
     17          * the pool shut down since entry into this method. So we
     18          * recheck state and if necessary roll back the enqueuing if
     19          * stopped, or start a new thread if there are none.
     20          *
     21          * 3. If we cannot queue task, then we try to add a new
     22          * thread.  If it fails, we know we are shut down or saturated
     23          * and so reject the task.
     24          */
     25         int c = ctl.get();
     26         //未达到 corePoolSize 增加新线程执行
     27         if (workerCountOf(c) < corePoolSize) {
     28             if (addWorker(command, true))
     29                 return;
     30             c = ctl.get();
     31         }
     32         //增加线程失败,或者有可能 worker的数量大于等于 core ,或是大于 maxSize ,任务入列 
     33         if (isRunning(c) && workQueue.offer(command)) {
     34             //注意 :此时任务已成功入列!!!
     35             int recheck = ctl.get();
     36             //再次检查,要是 此时是 SHUTDOWN 状态(线程池关闭),那么移除这个任务,同时拒绝这个请求
     37             if (! isRunning(recheck) && remove(command))
     38                 reject(command);
     39             //非running 状态 ,同时 workerCounter 为 0 (可能线程池里的任务都执行完了),那么新建一个线程,而不去处理,为什么要这样呢?
     40             //因为任务此时在队列中了,创建线程后,自动会去获取任务并处理
     41             //要是都不是就退出这个方法,此时任务在队列中等待被处理
     42             else if (workerCountOf(recheck) == 0)
     43                 addWorker(null, false);
     44         }
     45         //线程池 shut down 或是队列满了,再次新建一个线程执行,但是这次的线程数的判断边界是 maxSize ,即是
     46         // addWorker的第二个参数来指定 
     47         else if (!addWorker(command, false))
     48             reject(command);
     49     }

             假设目前还未达到 core 的数量,那么进入 addWorker 方法 。

      1     private boolean addWorker(Runnable firstTask, boolean core) {
      2         retry:
      3         for (;;) {
      4             int c = ctl.get();
      5             int rs = runStateOf(c);
      6 
      7             // Check if queue empty only if necessary.  
      8             // 线程池满足如下条件中的任意一种时, 就会直接结束该方法, 并且返回 false
      9            // 表示没有创建新线程, 新提交的任务也没有被执行.
     10            // 1 .处于 STOP, TYDING 或 TERMINATD 状态
     11            // 2 .处于 SHUTDOWN 状态, 并且参数 firstTask != null
     12           // 3 .处于 SHUTDOWN 状态, firstTask == null 且阻塞队列 workQueue为空
     13             if (rs >= SHUTDOWN &&
     14                 ! (rs == SHUTDOWN &&
     15                    firstTask == null &&
     16                    ! workQueue.isEmpty()))
     17                 return false;
     18             for (;;) {
     19                 int wc = workerCountOf(c);
     20                 //此处可以看到 第二个参数,core 是用来 选定线程数边界的
     21                 if (wc >= CAPACITY ||
     22                     wc >= (core ? corePoolSize : maximumPoolSize))
     23                     return false;
     24                 //自旋增加 c , ctl的值 ,成功就 break 退出
     25                 if (compareAndIncrementWorkerCount(c))
     26                     break retry;
     27                 c = ctl.get();  // Re-read ctl
     28                 if (runStateOf(c) != rs)
     29                     //说明有人抢了,
     30                     continue retry;
     31                 // else CAS failed due to workerCount change; retry inner loop
     32                 // 或者 CAS 失败是因为 workerCount 改变,继续loop 
     33             }
     34         }
     35 
     36         //下面是增加一个线程的操作,创建 worker ,上锁,再次判断,创建成功后线程开始执行。
     37         boolean workerStarted = false;
     38         boolean workerAdded = false;
     39         Worker w = null;
     40         try {
     41             w = new Worker(firstTask);
     42             final Thread t = w.thread;
     43             if (t != null) {
     44                 final ReentrantLock mainLock = this.mainLock;
     45                 mainLock.lock();
     46                 try {
     47                     // Recheck while holding lock.
     48                     // Back out on ThreadFactory failure or if
     49                     // shut down before lock acquired.
     50                     // recheck 当获得锁的时候 ,退出因为 ThreadFactory 失败或是 在获得锁之前 线程池 shut down 
     51                     int rs = runStateOf(ctl.get());
     52 
     53                     if (rs < SHUTDOWN ||
     54                         (rs == SHUTDOWN && firstTask == null)) {
     55                         if (t.isAlive()) // precheck that t is startable
     56                             throw new IllegalThreadStateException();
     57                         workers.add(w);
     58                         int s = workers.size();
     59                         if (s > largestPoolSize)
     60                             largestPoolSize = s;
     61                         workerAdded = true;
     62                     }
     63                 } finally {
     64                     mainLock.unlock();
     65                 }
     66                 if (workerAdded) {
     67                     //走到这里线程肯定是创建了,并且线程池一定是正常的。
     68                     t.start();
     69                     workerStarted = true;
     70                 }
     71             }
     72         } finally {
     73             if (! workerStarted)
     74                 addWorkerFailed(w);
     75         }
     76         return workerStarted;
     77     }

              addWoker 中创建了一个Worker,我们先看一下构造方法,再慢慢分析它。

      1         Worker(Runnable firstTask) {
      2             setState(-1); // inhibit interrupts until runWorker
      3             this.firstTask = firstTask;
      4             this.thread = getThreadFactory().newThread(this);
      5         }
      1 public interface ThreadFactory {
      2 
      3     /**
      4      * Constructs a new {@code Thread}.  Implementations may also initialize
      5      * priority, name, daemon status, {@code ThreadGroup}, etc.
      6      *
      7      * @param r a runnable to be executed by new thread instance
      8      * @return constructed thread, or {@code null} if the request to
      9      *         create a thread is rejected
     10      */
     11     Thread newThread(Runnable r);
     12 }

              可以看到 ThreadFactory 实际就是创建线程的方法,同时传入一个 Runnable , worker 里面传入了一个this ,我们赶紧看一下worker的定义。

      1     private final class Worker  extends AbstractQueuedSynchronizer  implements Runnable

            意图很明显,就是worker 本身带有一个任务(可以为NULL),让刚创建的线程去执行这个任务。下面看一下它到底执行了什么?

      1         /** Delegates main run loop to outer runWorker  */
      2         public void run() {
      3             runWorker(this);
      4         }
      5 
      6 
      7 
      8         /** Delegates main run loop to outer runWorker  */
      9     public void run() {
     10         runWorker(this);
     11     }
     12 
     13 
     14      /**
     15      * Main worker run loop.  Repeatedly gets tasks from queue and
     16      * executes them, while coping with a number of issues:
     17      *
     18      * 1. We may start out with an initial task, in which case we
     19      * don't need to get the first one. Otherwise, as long as pool is
     20      * running, we get tasks from getTask. If it returns null then the
     21      * worker exits due to changed pool state or configuration
     22      * parameters.  Other exits result from exception throws in
     23      * external code, in which case completedAbruptly holds, which
     24      * usually leads processWorkerExit to replace this thread.
     25      *
     26      * 2. Before running any task, the lock is acquired to prevent
     27      * other pool interrupts while the task is executing, and then we
     28      * ensure that unless pool is stopping, this thread does not have
     29      * its interrupt set.
     30      *
     31      * 3. Each task run is preceded by a call to beforeExecute, which
     32      * might throw an exception, in which case we cause thread to die
     33      * (breaking loop with completedAbruptly true) without processing
     34      * the task.
     35      *
     36      * 4. Assuming beforeExecute completes normally, we run the task,
     37      * gathering any of its thrown exceptions to send to afterExecute.
     38      * We separately handle RuntimeException, Error (both of which the
     39      * specs guarantee that we trap) and arbitrary Throwables.
     40      * Because we cannot rethrow Throwables within Runnable.run, we
     41      * wrap them within Errors on the way out (to the thread's
     42      * UncaughtExceptionHandler).  Any thrown exception also
     43      * conservatively causes thread to die.
     44      *
     45      * 5. After task.run completes, we call afterExecute, which may
     46      * also throw an exception, which will also cause thread to
     47      * die. According to JLS Sec 14.20, this exception is the one that
     48      * will be in effect even if task.run throws.
     49      *
     50      * The net effect of the exception mechanics is that afterExecute
     51      * and the thread's UncaughtExceptionHandler have as accurate
     52      * information as we can provide about any problems encountered by
     53      * user code.
     54      *
     55      * @param w the worker
     56      *
     57      *
     58      *  这里可以看到执行完任务后,就会阻塞在 getTask ,而线程没有被回收
     59      *  除非getTask 返回 null ,所有我们利用 一些调用满足 getTask 返回 null
     60      *  例如 : 超时设置
     61      *
     62      *
     63      **/
     64     final void runWorker(Worker w) {
     65         Thread wt = Thread.currentThread();
     66         Runnable task = w.firstTask;
     67         w.firstTask = null;
     68         w.unlock(); // allow interrupts  允许中断,释放锁(为了下面抢任务)
     69         boolean completedAbruptly = true;
     70         try {
     71             //抢任务,抢到就加锁,即是 firstTask 为null 时才会去 getTask 
     72             while (task != null || (task = getTask()) != null) {
     73                 // 加锁,防止线程被其他线程中断 
     74                 w.lock();
     75                 // If pool is stopping, ensure thread is interrupted;
     76                 // if not, ensure thread is not interrupted.  This
     77                 // requires a recheck in second case to deal with
     78                 // shutdownNow race while clearing interrupt
     79                 if ((runStateAtLeast(ctl.get(), STOP) ||
     80                      (Thread.interrupted() &&
     81                       runStateAtLeast(ctl.get(), STOP))) &&
     82                     !wt.isInterrupted())
     83                     wt.interrupt();
     84                 try {
     85                     //子类实现
     86                     beforeExecute(wt, task);
     87                     Throwable thrown = null;
     88                     try {
     89                         task.run();
     90                     } catch (RuntimeException x) {
     91                         thrown = x; throw x;
     92                     } catch (Error x) {
     93                         thrown = x; throw x;
     94                     } catch (Throwable x) {
     95                         thrown = x; throw new Error(x);
     96                     } finally {
     97                         //抛出异常后,依旧执行这里
     98                         afterExecute(task, thrown);
     99                     }
    100                 } finally {
    101                     task = null;
    102                     w.completedTasks++;
    103                     w.unlock();
    104                 }
    105             }
    106             //来到这里,1.task == null  
    107             completedAbruptly = false;
    108         } finally {
    109             processWorkerExit(w, completedAbruptly);
    110         }
    111     }
    112 
    113 
    114 
    115 
    116 
    117     /**
    118      * Performs blocking or timed wait for a task, depending on
    119      * current configuration settings, or returns null if this worker
    120      * must exit because of any of:
    121      * 1. There are more than maximumPoolSize workers (due to
    122      *    a call to setMaximumPoolSize).
    123      * 2. The pool is stopped.
    124      * 3. The pool is shutdown and the queue is empty.
    125      * 4. This worker timed out waiting for a task, and timed-out
    126      *    workers are subject to termination (that is,
    127      *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
    128      *    both before and after the timed wait, and if the queue is
    129      *    non-empty, this worker is not the last thread in the pool.
    130      *
    131      * @return task, or null if the worker must exit, in which case
    132      *         workerCount is decremented
    133      *
    134      *
    135      *
    136      * 上面的注解是 4 种返回 null 的情况, 其中第一种的原因有没有可能是因为其他线程创建线程导致的呢?
    137      *  worker 必须退出,顺便数量在这里减少一
    138      *
    139      */
    140     private Runnable getTask() {
    141         boolean timedOut = false; // Did the last poll() time out?
    142 
    143         for (;;) {
    144             int c = ctl.get();
    145             int rs = runStateOf(c);
    146 
    147             // Check if queue empty only if necessary.
    148             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    149                 decrementWorkerCount();
    150                 return null;
    151             }
    152 
    153             int wc = workerCountOf(c);
    154 
    155             // Are workers subject to culling?
    156             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    157 
    158             //自旋失败后继续 loop 直到 成功 
    159             if ((wc > maximumPoolSize || (timed && timedOut))
    160                 && (wc > 1 || workQueue.isEmpty())) {
    161                 //这里自旋
    162                 if (compareAndDecrementWorkerCount(c))
    163                     return null;
    164                 continue;
    165             }
    166 
    167             try {
    168                 //前面  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    169                 //可以知道 keepAliveTime 区别于 上面两个条件  1. allowCoreThreadTimeOut  2.wc > corePoolSize 
    170                 Runnable r = timed ?
    171                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    172                     workQueue.take();
    173                 if (r != null)
    174                     return r;
    175                 timedOut = true;
    176             // 在这里会捕获中断异常!!这里很重要,提供了假如是调用了 shutDown() 方法,线程可以退出的出口
    177             } catch (InterruptedException retry) {
    178                 timedOut = false;
    179             }
    180         }
    181     }

            getTask 里的 timed 这个变量判断条件一个是 allowCoreThreadTimeOut  和 wc > corePoolSize ,下面有个例子,corePoolSize 为 0 ,而MaxSize 为 Integer.MAX_VALUE,keepAliveTime 为60L,我们由此可以知道当一分钟后,线程没有获取到任务,那么getTask 就会返回null ,退出后该线程就会被回收。

      1     public static ExecutorService newCachedThreadPool() {
      2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
      3                                       60L, TimeUnit.SECONDS,
      4                                       new SynchronousQueue<Runnable>());
      5     }

            思路就是从任务队列中不断拿取任务(无任务状态下,线程阻塞),然后执行该任务。有个很奇怪的地方,runworker 中在获取任务前释放了锁,在获取到任务后再次获取锁。为什么呢?在 worker 这个类前有注解。

      1    /**
      2      * Class Worker mainly maintains interrupt control state for
      3      * threads running tasks, along with other minor bookkeeping.
      4      * This class opportunistically extends AbstractQueuedSynchronizer
      5      * to simplify acquiring and releasing a lock surrounding each
      6      * task execution.  This protects against interrupts that are
      7      * intended to wake up a worker thread waiting for a task from
      8      * instead interrupting a task being run.  We implement a simple
      9      * non-reentrant mutual exclusion lock rather than use
     10      * ReentrantLock because we do not want worker tasks to be able to
     11      * reacquire the lock when they invoke pool control methods like
     12      * setCorePoolSize.  Additionally, to suppress interrupts until
     13      * the thread actually starts running tasks, we initialize lock
     14      * state to a negative value, and clear it upon start (in
     15      * runWorker).
     16      */

             worker 使用了“中断控制状态”来维护线程运行,该类继承 AbstractQueueSynchronizer ,它的作用是当这个线程在执行任务时不被其他线程中断,而是让其他线程等待被唤醒。同时,该类使用无重入的独占互斥锁而不是 ReentrantLock ,因为我们不想在调用setCorePoolSize 重入该锁。

             现在线程池就在愉快地执行任务了,假如我这时候停止线程池。

      1     public void shutdown() {
      2         final ReentrantLock mainLock = this.mainLock;
      3         mainLock.lock();
      4         try {
      5             //检查是否可以 shutdown 
      6             checkShutdownAccess();
      7             //设置为 SHUTDOWN 
      8             advanceRunState(SHUTDOWN);
      9             //中断所有worker 
     10             interruptIdleWorkers();
     11             onShutdown(); // hook for ScheduledThreadPoolExecutor
     12         } finally {
     13             mainLock.unlock();
     14         }
     15         //终结这个线程池
     16         tryTerminate();
     17     }
     18 
     19 
     20 
     21     /**
     22      * If there is a security manager, makes sure caller has
     23      * permission to shut down threads in general (see shutdownPerm).
     24      * If this passes, additionally makes sure the caller is allowed
     25      * to interrupt each worker thread. This might not be true even if
     26      * first check passed, if the SecurityManager treats some threads
     27      * specially.
     28      */
     29     private void checkShutdownAccess() {
     30         SecurityManager security = System.getSecurityManager();
     31         if (security != null) {
     32             security.checkPermission(shutdownPerm);
     33             final ReentrantLock mainLock = this.mainLock;
     34             mainLock.lock();
     35             try {
     36                 for (Worker w : workers)
     37                     security.checkAccess(w.thread);
     38             } finally {
     39                 mainLock.unlock();
     40             }
     41         }
     42     }
     43 
     44 
     45 
     46     /**
     47      * Transitions runState to given target, or leaves it alone if
     48      * already at least the given target.
     49      *
     50      * @param targetState the desired state, either SHUTDOWN or STOP
     51      *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     52      */
     53     private void advanceRunState(int targetState) {
     54         for (;;) {
     55             int c = ctl.get();
     56             if (runStateAtLeast(c, targetState) ||
     57                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
     58                 break;
     59         }
     60     }
     61 
     62 
     63     private void interruptIdleWorkers() {
     64         interruptIdleWorkers(false);
     65     }
     66 
     67 
     68 
     69     /**
     70      * Interrupts threads that might be waiting for tasks (as
     71      * indicated by not being locked) so they can check for
     72      * termination or configuration changes. Ignores
     73      * SecurityExceptions (in which case some threads may remain
     74      * uninterrupted).
     75      *
     76      * @param onlyOne If true, interrupt at most one worker. This is
     77      * called only from tryTerminate when termination is otherwise
     78      * enabled but there are still other workers.  In this case, at
     79      * most one waiting worker is interrupted to propagate shutdown
     80      * signals in case all threads are currently waiting.
     81      * Interrupting any arbitrary thread ensures that newly arriving
     82      * workers since shutdown began will also eventually exit.
     83      * To guarantee eventual termination, it suffices to always
     84      * interrupt only one idle worker, but shutdown() interrupts all
     85      * idle workers so that redundant workers exit promptly, not
     86      * waiting for a straggler task to finish.
     87      *
     88      *
     89      *  从方法名可以看出 : 中断空闲 workers
     90      *  从下面的代码也可以看到要是传过来的参数是 false
     91      *  那么所有线程将被中断  (shutDown()方法有运用到)
     92      *
     93      */
     94     private void interruptIdleWorkers(boolean onlyOne) {
     95         final ReentrantLock mainLock = this.mainLock;
     96         mainLock.lock();
     97         try {
     98 
     99             for (Worker w : workers) {
    100                 Thread t = w.thread;
    101                 //这里 worker 的 tryLock 需要注意一下,这里要是 worker 正在执行任务(这就解释了为什么在runWorker 方法中,worker要加锁了),
    102                 // 那么 tryLock 返回 false,
    103                 if (!t.isInterrupted() && w.tryLock()) {
    104                     try {
    105                         t.interrupt();
    106                     } catch (SecurityException ignore) {
    107                         //注意这里忽略了这个 exception 
    108                         //所以文档中指出了有可能某些线程依旧会保持为非中断状态 
    109                     } finally {
    110                         w.unlock();
    111                     }
    112                 }
    113                 if (onlyOne)
    114                     break;
    115             }
    116         } finally {
    117             mainLock.unlock();
    118         }
    119     }
    120 
    121 
    122     /**
    123      * Performs any further cleanup following run state transition on
    124      * invocation of shutdown.  A no-op here, but used by
    125      * ScheduledThreadPoolExecutor to cancel delayed tasks.
    126      */
    127     void onShutdown() {
    128     }
    129 
    130 
    131 
    132 
    133     /**
    134      * Attempts to stop all actively executing tasks, halts the
    135      * processing of waiting tasks, and returns a list of the tasks
    136      * that were awaiting execution. These tasks are drained (removed)
    137      * from the task queue upon return from this method.
    138      *
    139      * <p>This method does not wait for actively executing tasks to
    140      * terminate.  Use {@link #awaitTermination awaitTermination} to
    141      * do that.
    142      *
    143      * <p>There are no guarantees beyond best-effort attempts to stop
    144      * processing actively executing tasks.  This implementation
    145      * cancels tasks via {@link Thread#interrupt}, so any task that
    146      * fails to respond to interrupts may never terminate.
    147      *
    148      * @throws SecurityException {@inheritDoc}
    149      */
    150     public List<Runnable> shutdownNow() {
    151         List<Runnable> tasks;
    152         final ReentrantLock mainLock = this.mainLock;
    153         mainLock.lock();
    154         try {
    155             checkShutdownAccess();
    156             // STOP 终于在这里发挥了作用!!S:D
    157             advanceRunState(STOP);
    158             interruptWorkers();
    159             //remove task 从队列中
    160             tasks = drainQueue();
    161         } finally {
    162             mainLock.unlock();
    163         }
    164         tryTerminate();
    165         return tasks;
    166     }
    167 

             看一下 tryTerminate 方法。

      1     /**
      2      * Transitions to TERMINATED state if either (SHUTDOWN and pool
      3      * and queue empty) or (STOP and pool empty).  If otherwise
      4      * eligible to terminate but workerCount is nonzero, interrupts an
      5      * idle worker to ensure that shutdown signals propagate. This
      6      * method must be called following any action that might make
      7      * termination possible -- reducing worker count or removing tasks
      8      * from the queue during shutdown. The method is non-private to
      9      * allow access from ScheduledThreadPoolExecutor.
     10      *
     11      *
     12      * 如果满足其中一个条件 :
     13      *    1. SHUTDOWN 并且 workerCount为 0 并且 队列为空
     14      *    2. STOP 并且 workerCount为 0
     15      *    那么将状态转化为 TERMINATED ;
     16      *
     17      *  如果  workerCount(c)!=0 ,那么调用  interruptIdleWorkers(true); 然后就return
     18      *
     19      *
     20      *  所以我们可以知道在线程池正常的状态的下必定直接 return
     21      *
     22      */
     23     final void tryTerminate() {
     24         for (;;) {
     25             int c = ctl.get();
     26             if (isRunning(c) ||
     27                 runStateAtLeast(c, TIDYING) ||
     28                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
     29                 return;
     30             //具备条件时执行下面
     31             if (workerCountOf(c) != 0) { // Eligible to terminate
     32                 // 传入一个参数  true ,表示只中断一个,这是因为,每个线程当自己没任务时,肯定
     33                 interruptIdleWorkers(ONLY_ONE);
     34                 return;
     35             }
     36 
     37             final ReentrantLock mainLock = this.mainLock;
     38             mainLock.lock();
     39             try {
     40                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
     41                     try {
     42                         //看到了吗,介绍线程池状态时,讲到要是是 TIDYING 状态时,会调用这个钩子方法
     43                         terminated();
     44                     } finally {
     45                         ctl.set(ctlOf(TERMINATED, 0));
     46                         termination.signalAll();
     47                     }
     48                     return;
     49                 }
     50             } finally {
     51                 mainLock.unlock();
     52             }
     53             // else retry on failed CAS
     54         }
     55     }

           从上面我们看到要是调用了 shutDown() 或是 shutDownNow ()那么我们正阻塞在 getTask()方法的线程就会收到中断异常,于是就会getTask就会返回 null 。那么我们继续看一下线程继续向下执行的逻辑。

      1     /**
      2      * Performs cleanup and bookkeeping for a dying worker. Called
      3      * only from worker threads. Unless completedAbruptly is set,
      4      * assumes that workerCount has already been adjusted to account
      5      * for exit.  This method removes thread from worker set, and
      6      * possibly terminates the pool or replaces the worker if either
      7      * it exited due to user task exception or if fewer than
      8      * corePoolSize workers are running or queue is non-empty but
      9      * there are no workers.
     10      *
     11      * @param w the worker
     12      * @param completedAbruptly if the worker died due to user exception
     13      */
     14     private void processWorkerExit(Worker w, boolean completedAbruptly) {
     15         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
     16             decrementWorkerCount();
     17 
     18         //加锁执行
     19         final ReentrantLock mainLock = this.mainLock;
     20         mainLock.lock();
     21         try {
     22             completedTaskCount += w.completedTasks;
     23             //移除 worker 
     24             workers.remove(w);
     25         } finally {
     26             mainLock.unlock();
     27         }
     28 
     29         tryTerminate();
     30 
     31         int c = ctl.get();
     32         // replacement 的意思在这里,是上面已经移除了一个 worker , 这里调用 addWorker 再补充 
     33         if (runStateLessThan(c, STOP)) {
     34             if (!completedAbruptly) {
     35                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
     36                 if (min == 0 && ! workQueue.isEmpty())
     37                     min = 1;
     38                 if (workerCountOf(c) >= min)
     39                     return; // replacement not needed
     40             }
     41             addWorker(null, false);
     42         }
     43     }
     44 

    线程池线程数量的选定

    • 如果是CPU密集型应用,则线程池大小设置为N+1
    • 如果是IO密集型应用,则线程池大小设置为2N+1

    CPU密集型

           一些进程绝大多数时间在计算上,称为计算密集型(CPU密集型)computer-bound。一些大量循环的代码(例如:图片处理、视频编码、人工智能等)就是CPU密集型。

    I/O密集型

           有一些进程则在input 和output上花费了大多时间,称为I/O密集型,I/O-bound。比如搜索引擎蜘蛛大多时间是在等待相应这种就属于I/O密集型。

              当然这也是大家常用的估算方法,阅读以下文章帮助了解 。

    总结

    •   线程池主要的使用到了一个 workQueue ,这是一个BlockQueue (阻塞队列) ,用于存放还没执行的任务。
    •   参数里面 coreSize 主要的作用是控制创建线程的数量,maxSize 主要的作用是是否还存放还没执行的任务,还是执行拒绝策略

    参考资料 :

  • 相关阅读:
    TVB西游记-观音的眼泪化作金河送唐僧回长安
    天下没有免费的午餐是什么意思
    什么样的经历、体验或者行为等能彻底的改变一个人
    看人先看什么
    python字符串中查找指定子字符串
    字符串的分隔及连接
    流媒体服务器音视频直播平台的开发为什么需要CDN?
    微信公众号小程序如何做流媒体视频直播?
    搭建专属于自己的视频流媒体直播/点播平台都需要注意哪些事项?
    音视频流媒体服务器的虚拟直播推流失败断流无法播放如何解决?
  • 原文地址:https://www.cnblogs.com/Benjious/p/10200248.html
Copyright © 2011-2022 走看看