zoukankan      html  css  js  c++  java
  • 稍稍解读下ThreadPoolExecutor

    # 说说ThreadPoolExecutor


    ## 认识
    先来看看它所在的架构体系:
    ```java
    package java.util.concurrent;
    public interface Executor { void execute(Runnable command); }
    public interface ExecutorService extends Executor {
    //新加一些方法
    }
    public abstract class AbstractExecutorService implements ExecutorService {
    //新加一些方法,以及一些方法的基本实现
    }
    public class ThreadPoolExecutor extends AbstractExecutorService {
    //新加一些方法,以及继承方法的实现
    }
    ```
    - `Executor`接口就一个方法,用来执行Runnable。官方的说法是,将 `任务的执行``线程` 解耦和。
    - `ExecutorService`接口,继承了`Executor`,同时添加了一些管理任务的方法,如submit/invokeAll/invokeAny/shutdown/shutdownNow 等。
    - `AbstractExecutorService`抽象类,实现了`ExecutorService`,提供了默认的一些实现,并添加了三个工具方法。见下图:
    - ![AbstractExecutorService.jpg](img/AbstractExecutorService.jpg)
    -
    - `ThreadPoolExecutor`类,直接可用的类,相对来说很复杂,也是本文的重点。
    ## 构造
    先从构造方法入手。
    ![ThreadPoolExecutorConstructor.jpg](img/ThreadPoolExecutorConstructor.jpg)

    由上图可见,有4个构造方法,大同小异。具体如下:
    ```java
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
    null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    }
    ```
    可见,本质上都是在调用最后一个。它的这些参数的含义如下:
    - `corePoolSize`: 注意,这是核心线程池的尺寸,但并不是初始化时线程数。当提交的任务数量少于这个数值时,不会经过`workQueue`中转,直接创建新线程来执行。如果线程数量多于这个数值,且线程空闲(任务执行完毕)达到指定时间(keepAliveTime),则会干掉多出来的线程。 -- **需要结合下面的参来理解**
    - `maximumPoolSize`: 这个很好理解,就是`线程池`所能提供的最大线程数量。
    - `keepAliveTime`: 线程空闲时的存活时间 - 参考`corePoolSize`
    - `unit`: 存活时间的时间单位。
    - `workQueue`: 工作队列 - 其实是任务队列。注意,可以使用有界队列,如ArrayBlockingQueue,也可以使用无界队列,如LinkedBlockingQueue。区别在于,能够接收有限/无限的任务。
    - `threadFactory`: 线程工厂,提供线程用的。注意,不一定负责线程的管理,仅负责提供线程!线程的管理可能是ExecutorService负责的。
    - `handler`: 任务被拒绝(执行或者提交)时的处理器。
    ## 怎么使用?
    实际上,我们很少直接使用这个类,更多的时候是使用`Executors`这个工厂类的`#newFixedThreadPool(int nThreads)`,源码如下:
    ```java
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    ```
    对比前面的构造参数,很容易理解。
    1. `corePoolSize``maximumPoolSize`相同 - 所以线程数量是一定的(当然开始的时候肯定不是,从0开始,见前面`corePoolSize`的说明部分)。
    2. `keepAliveTime`**0**,所以没有等待时间(实际上这个参数用不到,因为最后的线程数量是一定的 - 如果都使用了)。
    3. `workQueue``new LinkedBlockingQueue<Runnable>()`,所以这是无界队列,可以接收任意多任务。
    这样使用的话,`threadFactory``handler`都是默认的,前者是 `Executors.defaultThreadFactory()`,看源码仅是在创建线程的时候添加了"线程组"、"线程名";后者是 `ThreadPoolExecutor.defaultHandler` 。源码如下:
    ```java
    //Executors.defaultThreadFactory() {return new DefaultThreadFactory();}
    static class DefaultThreadFactory implements ThreadFactory {
    // ...

    public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
    namePrefix + threadNumber.getAndIncrement(),
    0);
    if (t.isDaemon())
    t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
    t.setPriority(Thread.NORM_PRIORITY);
    return t;
    }
    // ...
    }

    //ThreadPoolExecutor.defaultHandler
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //什么都不做,直接抛出异常!
    ```
    ## 验证
    我们来验证下:
    1. `corePoolSize``workQueue`的关系 - 也就是前面提到的“当提交的任务数量少于`corePoolSize`时,直接开启新线程,不经过`workQueue`”。
    2. 当任务无法提交或执行时,直接抛出异常!
    ```java
    @Test
    public void testRejectionHandler(){
    int corePoolSize = 2; //提交任务时,如果线程数低于2,则创建新线程。完成后则会始终维持最少2个线程。
    int maximumPoolSize = 5; //线程池最多允许5个线程存在。-和workQueue什么关系呢?
    long keepAliveTime = 5; // 线程数量超过corePoolSize时,如果空闲了,那会空闲多久。
    TimeUnit timeUnit = TimeUnit.SECONDS;

    // TODO 注意,ArrayBlockingQueue是有界队列,还可以用LinkedBlockingQueue 无界队列 - 就是可以submit无限任务!
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);//TODO 该队列仅hold由execute方法提交的Runnable tasks。submit会调用execute!
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue);
    Runnable runnable = () -> {
    System.out.println(Thread.currentThread().getName());
    try{
    Thread.sleep(1000L * 1000); //任务一直进行,这样线程就一直不释放
    } catch(InterruptedException e){
    e.printStackTrace();
    }
    };
    try{
    for(int i = 0; i < 10; i++){ //提交10个任务
    System.out.println(i);
    Future<?> submit = threadPoolExecutor.submit(runnable);
    //submit后,Queue里可能有内容,也可能没有 - 可能已经移交给worker线程了!
    //FIXME 下面队列用法不对。因为 只有在线程数少于corePoolSize时,才不走队列。
    BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
    System.out.println("queue size: " + queue.size());
    System.out.println("queue.remainingCapacity: " + queue.remainingCapacity()); // 这个可能有用
    // queue.take();//wait until
    System.out.println("queue.peek: " + queue.peek());//奇怪,总是同一个,难道需要同步?
    // queue.poll();//retrive and remove head
    }
    System.out.println("----a");
    threadPoolExecutor.execute(runnable); //嗯?什么时候用这个比较好?
    System.out.println("----b");
    } catch(Exception e){
    System.out.println(threadPoolExecutor.isShutdown());
    System.out.println(threadPoolExecutor.isTerminated());
    System.out.println(threadPoolExecutor.isTerminating());
    while(true){
    try{
    System.out.println(threadPoolExecutor.getQueue().take());
    } catch(InterruptedException e1){
    e1.printStackTrace();
    }
    }
    }
    try{
    threadPoolExecutor.awaitTermination(1L, TimeUnit.HOURS);
    } catch(InterruptedException e){
    e.printStackTrace();
    }
    }
    ```
    执行结果如下:
    ```txt
    0
    queue size: 0
    queue.remainingCapacity: 2
    queue.peek: null
    1
    pool-1-thread-1
    queue size: 0
    queue.remainingCapacity: 2
    queue.peek: null
    2
    queue size: 1
    pool-1-thread-2
    queue.remainingCapacity: 1
    queue.peek: java.util.concurrent.FutureTask@3fb4f649
    3
    queue size: 2
    queue.remainingCapacity: 0
    queue.peek: java.util.concurrent.FutureTask@3fb4f649
    4
    queue size: 2
    queue.remainingCapacity: 0
    queue.peek: java.util.concurrent.FutureTask@3fb4f649
    5
    pool-1-thread-3
    queue size: 2
    queue.remainingCapacity: 0
    queue.peek: java.util.concurrent.FutureTask@3fb4f649
    6
    pool-1-thread-4
    queue size: 2
    queue.remainingCapacity: 0
    queue.peek: java.util.concurrent.FutureTask@3fb4f649
    7
    pool-1-thread-5
    false
    false
    false
    ```

    能够看到,前面2个任务提交的时候,队列中并没有内容,之后则一直有内容。很好的验证了第一条。
    最后5行,则是异常后输出的内容,可以看出`threadPoolExecutor`仍在进行,但新提交的任务则被拒绝执行。- 其实这里应该吧try-catch放到循环里面,这样可以看到后续的提交都失败了。
    感兴趣的可以自己试一下,同时输出下e.printStackTrace()。
    ## 其他
    其实`ThreadPoolExecutor`的状态设计非常赞,不过那是另外的事了。
    套用网友一句话,“李大爷设计的api,很巧妙,处处是坑”,哈哈。
  • 相关阅读:
    git打补丁、还原补丁
    mysql 查两个表相同的值
    系统更新后vs2012无法打开方案资源管理器
    Node.js之Buffer
    html元素固定
    在windows上用netsh动态配置端口转发
    Git忽略规则及.gitignore规则不生效的解决办法
    MySQL5.7.10 初始化失败error
    Nginx和PHP-FPM的启动、重启、停止脚本分享
    centos添加nginx为系统服务
  • 原文地址:https://www.cnblogs.com/larryzeal/p/9063620.html
Copyright © 2011-2022 走看看