zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor

           Executor框架集对线程的调度进行了封装,它把任务的提交与执行进行了解耦,同时,还提供了线程生命周期调度的所有方法,大大简化了线程调度和同步的门槛。

    1,Executor接口

           java.util.concurrent.Executor是一个接口,这个接口只定义了一个方法execute用于执行已经提交的Runnable任务。

    1 public interface Executor {
    2     void execute(Runnable command);
    3 }

           java.lang.Runnable通常用于封装一段可执行代码,从java8开始,它甚至可以被一段lambda表达式代替。 

           为什么要提供Executor#execute来调用Runnable呢?直接使用Runnable#run不能满足需求吗?这是因为,Runnable可以被认为是业务规定好的一段执行逻辑,但它究竟要如何执行,什么情况下执行,还未确定。Executor#execute提供了很好的灵活性。

           利用这个接口的各种实现,可以实现类似于js编程中的回调、链式编程等代码风格。

    2,ExecutorService

           java.util.concurrent.ExecutorService接口继承自Executor,作为一个service,它提供了一系列对Executor的生命周期的管理。

           它提供了一系列方法来生成和管理Future,Futurn则用于跟踪异步任务的处理流程。

    ExecutorService包含的方法有:

    方法

    功能

    shutdown()

      有序完成所有任务,不再接受新任务;

      如果ExecutorService已经关闭,那么调用该方法不会起任何效果。

      这个方法只是将线程池状态设置为SHUTDOWN状态,同时会尝试执行完等待队列中剩下的任务

    shutdownNow()

      立刻尝试关闭所有正在执行的任务,停止等待中的任务处理,返回等待任务列表

      这个方法将线程池的状态设置为STOP,正在执行的任务则被停止,没有被执行的任务则被返回  

    isShutDown()

      返回true说明已经关闭

    isTerminated()

      返回true说明执行关闭后,所有任务都已完成

    awaitTermination(long, TimeUnit)

      阻塞指定的时长,以在关闭后,等待所有任务完成

    submit(Callable):Future

      提交一个带返回值的任务,Callable用于执行;

      返回一个代表该任务未来结果的Future对象

      Future的get方法,在任务成功后会返回结果

      注意,submit不会阻塞当前线程,当时Futurn.get()会阻塞

    submit(Runnable, T):Future

      提交一个不需要返回值的任务Runnable并且返回Future

      如果执行成功,那么Future#get会返回参数T

    submit(Runnable)Futurn

      提交一个不需要返回值的任务Runnable并且返回Future

      如果执行成功,那么Future#返回null

    invokeAll(Collection<Callable>):List<Future>

      执行提供的任务集合,全部任务完成后返回Future列表

      Future#isDone为true时表示对应的任务完成

    invokeAll(Collection<Callable>, long, TimeUnit);List<Future>

    执行提供的任务集合,全部任务完成后返回Future列表

      Future#isDone为true时表示对应的任务完成或超时

    invokeAny(Collection<Callable>)

      执行给定的任务集合,返回第一个执行完成结果;

      注意:该方法的处理过程中,集合不建议修改,否则返回结果会是null

    invokeAny(Collection<Callable>, long, TimeUnit)

      执行给定的任务集合,直到有任意任务完成,或者超时

      注意:该方法的处理过程中,集合不建议修改,否则返回结果会是null

    3,ThreadPoolExecutor

           java.util.concurrent.ThreadPoolExecutor是ExecutorService的一个实现,也是最常见的线程池之一,线程池的意义在于它可以最大化的利用线程的空余时间以节省系统资源。例如有一万个任务需要异步执行,一般的CPU并没有这么大的吞吐量,而线程创建的本身又要占用额外的内存,所以利用线程池,如果有空余的线程,那么执行任务,如果没有,那么等待执行中的线程空闲,同时,用户对线程最大数量的合理控制,能够取得执行时间和内存消耗的平衡。

    ThreadPoolExecutor

     1 //运行中,可以接受新任务,并处理排队任务
     2 public static final int RUNNING = -1 << COUNT_BITS;
     3 //关闭,不在接受新任务,不过仍然会处理排队任务
     4 public static final int SHUTDOWN = 0 << COUNT_BITS;
     5 //停止,不在接受新任务,也不处理排队中的任务,同时中断处理中的任务
     6 public static final int STOP = 1 << COUNT_BITS;
     7 //整理,当所有任务终止,workerCount计数归零,线程会转换到TIDYING
     8 public static final int TIDYING = 2 << COUNT_BITS;
     9 //终止,说明terminal()方法执行完成
    10 public static final int TERMINATED = 3 << COUNT_BITS;

    ThreadPoolExecutor成员变量 

    1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

           ctl是control的缩写,代表控制器,atomicInteger是原子化的integer,它的原子化性质由CAS来保证,ctl的特殊性在于,它同时表达了多种含义: 

    workCount:worker计数,worker的存在表示有正在处理中的任务。

    runState:运行状态,对应RUNING、SHUTDOWN、STOP、TIDYING、TERMINATED五个状态。

     1 //通过ctl获取运行状态
     2 private static int runStateOf(int c) {
     3     return c & ~CAPACITY;
     4 }
     5 //通过ctl获取worker计数
     6 private static int workerCountOf(int c) {
     7     return c & CAPACITY;
     8 }
     9 //通过workerCount获取运行状态获取control
    10 private static ctlOf(int rs, int wc) {
    11     return rs | wc;
    12 }

           CAPACITY的值为(1<<COUNT_BITS) -1 

           COUNT_BITS的值为Integer.SIZE - 3

    构造方法初始化时可以配置的成员变量

     1 //线程工厂,用于生成线程池中的工作线程
     2 private volatile ThreadFactory threadFactory;
     3 //工作线程超出maximumPoolSize时,被拒绝的任务的处理策略
     4 private volatile RejectedExecutionHander handler;
     5 //决定线程多长时间没有接到任务后可以结束
     6 private volatile long keepAliveTime;
     7 //线程池的基本大小,就算没有任务执行,线程池至少也要保持这个size
     8 //不过如果allowCoreThreadTimeOut为true,那么corePoolSize可能会0
     9 private volatile int corePoolSize;
    10 //线程池最大容量,线程数不能超过这个数量
    11 private volatile int maximunPoolSize;
    12 //曾经同时运行过线程的最大数量
    13 private int largestPoolSize;

    其他成员变量 

    1 //队列中等待处理的工作线程
    2 private final BlockingQueue<Runnable> workQueue;
    3 //所有工作线程,只有在持有lock时才会处理
    4 private final HashSet<Worker> workers = new HashSet<Worker>();

    4,execute方法 

           ExecutorService的submit方法在AbstractExecutorService重写之后,最终都会委托给execute方法来处理,execute方法主要有三个步骤:

    1)       如果当前执行的线程数小于coolPoolSize核心池容量,那么会尝试启动一个新线程来执行任务,这个过程会调用addWorker来检查运行状态和Worker容量,如果添加新的工作线程成功,那么直接返回。

    2)       如果添加工作线程失败,那么会尝试把任务放到队列中。

    3)       如果任务不能加入到队列,那么可能是线程池已经关闭或者满了,此时拒绝任务。

     1 public void execute(Runnable command) {
     2     //必须要有一个可执行的command参数
     3     if(command == null) {
     4         throw new NullPointerException;
     5     }
     6     int c = ctl.get();
     7     //如果工作线程数低于核心池容量,那么尝试在核心池添加工作线程
     8     if(workerCountOf(c) < corePoolSize) {
     9         if(addWorker(command, true)) {
    10             return;
    11         }
    12         //添加工作线程失败,有多重可能性,更新线程池状态
    13     }
    14     //线程池还处于RUNNING状态,说明线程池中线程超出了corePoolSize,需要让人物排队
    15     if(isRunning(c) && workQueue.offer(command)) {
    16         int recheck = ctl.get();
    17         //二次检查线程池,线程池状态如果不是运行态,那么一出并拒绝任务
    18         if(!isRunning(recheck) && remove(command)) {
    19             reject(command);
    20         } else if(workerCountOf(recheck) == 0) {
    21             //该分支用于工作线程数为0的时候,从队列里取出任务执行,有以下处出发情景
    22             //线程池是运行态
    23             //线程不是运行态,任务不在队列里
    24             addWorker(null, false);
    25         } else if(!addWorker(command, false)) {
    26             //该分支会在线程池处于非运行态时尝试创建工作线程
    27             reject(command);
    28         }
    29     }
    30 }

    addWorker方法 

         在execute()方法中,频繁地调用了addWorker方法,这个方法用来添加工作线程(worker),它有两个参数:

    1)       firstTask:Runnable,是一个可执行代码块,是业务代码的包装

    2)       core:boolean,true表示使用线程池核心容量作为上线,false表示使用最大容量作为上限

           addWorker的每一次调用,都会从等待队列中获取正在等待的任务来执行,当然firstTask也可以为null,在这种情况下这个方法不会添加新的任务,而是从等待队列中取出排队的任务来执行。

     1 private boolean addWorker(Runnable firstTask, boolean core) {
     2     //外层循环,确保worker正常添加
     3     retry;
     4     for(;;) {
     5         int c = ctl.get();
     6         //获取线程池运行状态
     7         int rs = runStateOf(c);
     8         //rs >= SHUTDOWN表示非RUNNING的状态
     9         //RUNNING以外的情况是不能被添加到worker的
    10         //firstTask==null,是因为workerCount=0,如果此时线程池关闭了
    11         //工作队列里有内容,那么应当视为合法可以继续执行
    12         if(rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
    13             return false;
    14         }
    15         for(;;) {
    16             //workCount不能超过容量,也不能超出指定池的容量
    17             //布尔值core用于确定需要校验的池容量
    18             int wc = workerCountOf(c);
    19             if(wc > CAPACITY || we >= (core ? corePoolSize : maximumPoolSize)) {
    20                 return false;
    21             }
    22             //增加worker count的数量,成功则跳出循环
    23             if(compareAndIncrementWorkerCount(c)) {
    24                 break retry;
    25             }
    26             //worker count数量没有被增加,需要反复重试
    27             c = ctl.get();
    28             if(runStateOf(c) != rs) {
    29                 continue retry;
    30             }
    31         }
    32     }
    33     Worker w = new Worker(firstTask);
    34     Thread t = w.thread;
    35     final ReentrantLock mainLock = this.mainLock;
    36     mainLock.lock();
    37     try {
    38         //持有锁后,重新检查一遍运行状态renstate,如果关闭,那么终止
    39         int c = ctl.get();
    40         int rs = runStateOf(c);
    41         if(t == null || (rs >= SHUTDOWN) && !(rs == SHUTDOWN && firstTask == null)) {
    42             decrementWorkerCount();
    43             tryTerminate();
    44             return false;
    45         }
    46         //在工作队列里添加当前worker
    47         worker.add(w);
    48         //调整处理过的最大工作线程数
    49         int s = workers.size();
    50         if(s > largestPoolSize) {
    51             largestPoolSize = s;
    52         }
    53     } finally {
    54         mainLock.unlock();
    55     }
    56     //执行worker的线程
    57     t.start();
    58     //再检查一次运行状态,如果停止且线程未中断阻塞,那么中断线程阻塞
    59     if(runStateOf(ctl.get()) == STOP && !LisInterrupted()) {
    60         t.interrupt();
    61     }
    62     return true;

         在addWorker方法执行成功后,worker会立刻执行,其执行方式为,worker.thread.start(),这时线程的启动方式,那么worker作为线程和任务的包装,他在这个过程中起到了什么作用呢? 

    5,worker的执行

           worker是AQS的一个子类,同时他又实现了Runnable接口,它的主要作在于:

    a)       实现一个非重入锁,避免工作线程在调用线程池控制方法(比如,setCorePoolSize)时,再次申请锁。

    b)       保护中断状态,让工作线程interrupt不敏感

           addWorker也是通过调用thread的start()方法来启动一个线程执行的,看上去和worker并没有关系,而事实上,新启动的线程会调用worker的run方法来执行具体逻辑。

           需要注意的是,getThreadFactory().newThread(this)会使用一个Runnable对象来创建线程,在这里,Worker本身就是这个Runnable,而非firstTask。

           因此,可以想见,Worker的run方法的实现必然是对firstTask的调用做了必要的处理,这里重点关注ThreadPoolExecutor#runWorker方法。

    6,总结

    ThreadPoolExecutor有五个状态,RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。ThreadPoolExcutor的执行功能有execute方法提供,它负责线程池不同的状态下,催任务进行对应的处理。

    TheadPoolExecutor的关闭实现有SHUTDOWN和STOP状态来维护

    ThreadPoolExecutor.workerQueue是一个阻塞队列,由用户来决定其具体的实现。

     

  • 相关阅读:
    随笔:我为什么要写博客?
    用纯C语言写的一个植物大战僵尸的外挂
    方法:如何获取操作系统所有分区(逻辑驱动器)
    Srping syntactically incorrect.错误记录
    重建项目报错
    easyui datagrid 跨页选择
    CentOS 6编译安装ipvsadm和keepalived
    CentOS 6下ActiveMQ 5.5安装及使用MySQL
    extjs 4中TreePanel和GridPanel使用
    Linux安装性能问题
  • 原文地址:https://www.cnblogs.com/guanghe/p/13501165.html
Copyright © 2011-2022 走看看