zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(三十四、线程池源码四,execute方法分析)

    目录:

    • execute()
    • addWorker()

    execute()

     1 public void execute(Runnable command) {
     2     if (command == null)
     3         throw new NullPointerException();
     4     /*
     5      * 处理过程分为以下三步:
     6      *
     7      * 1. 线程池线程数小于corePoolSize:
     8      * 则创建新的核心线程执行任务,command=新线程的第一个执行的任务。
     9      * 否则进入步骤2。
    10      *
    11      * 2. 若任务可以成功进入阻塞队列,则重新检查线程池状态。
    12      * 如果线程池已经停止了,则将刚刚入队的任务做处理。
    13      * 如果线程池内没有线程了,则创建一个新的线程
    14      *
    15      * 3. 如果任务入队失败,则创建一个新的非核心线程。
    16      * 如果创建非核心线程失败了,则线程池可能已经关闭或饱和了。
    17      * 因此需要拒绝这个任务。
    18      */
    19     int c = ctl.get();
    20     // 工作线程小于corePoolSize
    21     if (workerCountOf(c) < corePoolSize) {
    22         // 创建新的核心线程执行任务
    23         if (addWorker(command, true))
    24             return;
    25         c = ctl.get();
    26     }
    27     // 重新检查线程状态,并调用offer方法入队
    28     if (isRunning(c) && workQueue.offer(command)) {
    29         int recheck = ctl.get();
    30         // 线程处于非running状态,并且成功删除刚刚入队的任务
    31         if (!isRunning(recheck) && remove(command))
    32             // 执行reject方法,拒绝此次提交的任务
    33             reject(command);
    34         // 线程池内没有任务了
    35         else if (workerCountOf(recheck) == 0)
    36             // 创建一个非核心线程任务
    37             addWorker(null, false);
    38     }
    39     // 线程处于非running状态,或是running状态但入队失败了
    40     // 尝试通过addWorker方法创建一个非核心线程
    41     // 创建失败则拒绝任务
    42     else if (!addWorker(command, false))
    43         reject(command);
    44 }

    addWorker()

     1 private boolean addWorker(Runnable firstTask, boolean core) {
     2     // 外层循环标记
     3     retry:
     4     for (;;) {
     5         int c = ctl.get();
     6         // 获取运行状态
     7         int rs = runStateOf(c);
     8 
     9         // 当线程池状态为SHUTDOWN、STOP、TIDYING、TERMINATED且
    10         // 状态不为SHUTDOWN 或 firstTask!=null 或 队列为空时返回false
    11         if (rs >= SHUTDOWN &&
    12             ! (rs == SHUTDOWN &&
    13                firstTask == null &&
    14                ! workQueue.isEmpty()))
    15             // 添加工作线程失败
    16             return false;
    17 
    18         for (;;) {
    19             // 获取工作线程数
    20             int wc = workerCountOf(c);
    21             // 工作线程数大于最大容量
    22             // 或在创建核心线程时,工作线程数大于核心线程数
    23             // 或在创建非核心线程时,工作线程数大于最大线程数
    24             if (wc >= CAPACITY ||
    25                 wc >= (core ? corePoolSize : maximumPoolSize))
    26                 // 添加工作线程失败
    27                 return false;
    28             // CAS自增工作线程数,成功则调到最外层循环
    29             if (compareAndIncrementWorkerCount(c))
    30                 break retry;
    31             c = ctl.get();  // Re-read ctl
    32             // 如果内存循环获取到的线程工作状态与外层循环的不一致(在执行内存循环时状态被修改)
    33             if (runStateOf(c) != rs)
    34                 // 则进入下一次内存循环
    35                 continue retry;
    36             // else CAS failed due to workerCount change; retry inner loop
    37         }
    38     }
    39 
    40     // 标记工作线程是否被启动
    41     boolean workerStarted = false;
    42     // 标记工作线程是否被添加成功
    43     boolean workerAdded = false;
    44     // 工作线程
    45     Worker w = null;
    46     try {
    47         w = new Worker(firstTask);
    48         final Thread t = w.thread;
    49         if (t != null) {
    50             final ReentrantLock mainLock = this.mainLock;
    51             // 使用可重入锁加锁
    52             mainLock.lock();
    53             try {
    54                 // 获取线程池工作状态
    55                 int rs = runStateOf(ctl.get());
    56 
    57                 // rs < SHUTDOWN:RUNNING
    58                 // rs == SHUTDOWN && firstTask为空
    59                 if (rs < SHUTDOWN ||
    60                     (rs == SHUTDOWN && firstTask == null)) {
    61                     if (t.isAlive()) // precheck that t is startable
    62                         throw new IllegalThreadStateException();
    63                     // 添加此工作线程
    64                     workers.add(w);
    65                     int s = workers.size();
    66                     // 如果工作线程数超过largestPoolSize
    67                     if (s > largestPoolSize)
    68                         // 则设置largestPoolSize=5
    69                         largestPoolSize = s;
    70                     workerAdded = true;
    71                 }
    72             } finally {
    73                 mainLock.unlock();
    74             }
    75             if (workerAdded) {
    76                 // 当满足workerAdded时,启动线程逻辑
    77                 t.start();
    78                 workerStarted = true;
    79             }
    80         }
    81     } finally {
    82         if (! workerStarted)
    83             addWorkerFailed(w);
    84     }
    85     return workerStarted;
    86 }

    其实大多数的逻辑都是对线程状态的卡控,当满足workerAdded时,启动线程逻辑(thread.start())。

  • 相关阅读:
    scheduletask任务调度(2间隔时间)
    scheduletask任务调度
    初始webservice
    ssh整合
    aop
    自定义框架(MyMvc)
    数据校验
    原生态ajax
    struts2国际化
    文件下载
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/13362575.html
Copyright © 2011-2022 走看看