zoukankan      html  css  js  c++  java
  • 深入理解java线程池

    1、线程池解决的问题

    在线程的使用过程中,直接创建一个线程来执行,这样实现起来非常简单方便,但是会存在问题:

    1.如果并发的线程数量太多,并且每个线程都是执行很短一个时间就会结束,这样频繁的创建线程会大大降低系统的效率,因为创建线程和摧毁线程需要时间。

    2.另一方面如果每个线程都需要执行很长时间消耗很多的资源,如果频繁不加控制的创建线程会造成系统资源耗尽。

    java中可以用线程池来解决上面的问题,使用线程池主要有以下的意义:

    1.资源的控制,如并发量限制,像连接池这种对数据库资源的保护

    2.资源的有效利用,如线程的复用,避免频繁创建线程和线程上下文切换

    2、Java中的ThreadPoolExecutor类

    java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

    在ThreadPoolExecutor类中提供了四个构造方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class ThreadPoolExecutor extends AbstractExecutorService {
        .....
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
        ...
    }

    从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

    下面解释下一下构造器中各个参数的含义:

    • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;(解释:假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;然后就将任务也分配给这4个临时工人做;如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

    • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
    • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
    • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    1
    2
    3
    4
    5
    6
    7
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
    1
    2
    3
    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    • threadFactory:线程工厂,主要用来创建线程;
    • handler:表示当拒绝处理任务时的策略,有以下四种取值:
    1
    2
    3
    4
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    具体参数的配置与线程池的关系将在下一节讲述。

    从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public abstract class AbstractExecutorService implements ExecutorService {
     
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
        public Future<?> submit(Runnable task) {};
        public <T> Future<T> submit(Runnable task, T result) { };
        public <T> Future<T> submit(Callable<T> task) { };
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
        };
    }

    AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

    我们接着看ExecutorService接口的实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public interface ExecutorService extends Executor {
     
        void shutdown();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
     
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

    1
    2
    3
    public interface Executor {
        void execute(Runnable command);
    }

    到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

    Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

    然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

    抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

    然后ThreadPoolExecutor继承了类AbstractExecutorService。

    在ThreadPoolExecutor类中有几个非常重要的方法:

    1
    2
    3
    4
    execute()
    submit()
    shutdown()
    shutdownNow()

    execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

    submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

    shutdown()和shutdownNow()是用来关闭线程池的。

    还有很多其他的方法:

    比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

     3、ThreadPoolExecutor源码分析(转自 http://www.cnblogs.com/leesf456/p/5585627.html)

    3.1 类的继承关系

    public class ThreadPoolExecutor extends AbstractExecutorService {}

      说明:ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecuetorService提供了ExecutorService执行方法的默认实现。

      3.2 类的内部类

      ThreadPoolExecutor的核心内部类为Worker,其对资源进行了复用,减少创建线程的开销,还有若干个策略类。内部类的类图如下

      说明:可以看到Worker继承了AQS抽象类并且实现了Runnable接口,其是ThreadPoolExecutor的核心内部类。而对于AbortPolicy,用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException、CallerRunsPolicy,用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务、DiscardPolicy,用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务、DiscardOldestPolicy,用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。这些都是拒绝任务提交时的所采用的不同策略。

      ① Worker类

      1. 类的继承关系  

    private final class Worker 
        extends AbstractQueuedSynchronizer 
        implements Runnable {}

       说明:Worker继承了AQS抽象类,其重写了AQS的一些方法,并且其也可作为一个Runnable对象,从而可以创建线程Thread。

      2. 类的属性

     View Code

      说明:Worker属性中比较重要的属性如下,Thread类型的thread属性,用来封装worker(因为worker为Runnable对象),表示一个线程;Runnable类型的firstTask,其表示该worker所包含的Runnable对象,即用户自定义的Runnable对象,完成用户自定义的逻辑的Runnable对象;volatile修饰的long类型的completedTasks,表示已完成的任务数量。

      3. 类的构造函数

     View Code

      说明:用于构造一个worker对象,并设置AQS的state为-1,同时初始化了对应的域。

      4. 核心函数分析

     View Code

      说明:Worker的函数主要是重写了AQS的相应函数和重写了Runnable的run函数,重写的函数比较简单,具体的可以参见AQS的分析,这里不再累赘。

      3.3 类的属性  

     View Code

      说明:这里着重讲解一下AtomicInteger类型的ctl属性,ctl为线程池的控制状态,用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位)),其中,线程池的运行状态有如下几种

    复制代码
        /**
        * RUNNING    :    接受新任务并且处理已经进入阻塞队列的任务
        * SHUTDOWN    :    不接受新任务,但是处理已经进入阻塞队列的任务
        * STOP        :    不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
        * TIDYING    :    所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
        * TERMINATED:    terminated钩子函数已经运行完成
        **/
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    复制代码

      说明:由于有5种状态,最少需要3位表示,所以采用的AtomicInteger的高3位来表示,低29位用来表示worker的数量,即最多表示2^29 - 1。

      3.4 类的构造函数

      1. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>)型构造函数

     View Code

      说明:该构造函数用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。

       2. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)型构造函数  

     View Code

      说明:该构造函数用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor

       3. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)型构造函数

     View Code

      说明:该构造函数用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor

       4. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)型构造函数

     View Code

      说明:该构造函数用给定的初始参数创建新的 ThreadPoolExecutor,其他的构造函数都会调用到此构造函数。

      3.5 核心函数分析

      1. execute函数  

     View Code

       说明:当在客户端调用submit时,之后会间接调用到execute函数,其在将来某个时间执行给定任务,此方法中并不会直接运行给定的任务。此方法中主要会调用到addWorker函数,其中,addWorker函数源码如下  

     View Code

      说明:此函数可能会完成如下几件任务

      ① 原子性的增加workerCount。

      ② 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中。

      ③ 启动worker对应的线程,并启动该线程,运行worker的run方法。

      ④ 回滚worker的创建动作,即将worker从workers集合中删除,并原子性的减少workerCount。

      2. runWorker函数  

     View Code

      说明:此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数和processWorkerExit钩子函数,其中,getTask函数源码如下  

     View Code

      说明:此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。

      processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下

      ① 阻塞队列已经为空,即没有任务可以运行了。

      ② 调用了shutDown或shutDownNow函数

      processWorkerExit的源码如下  

     View Code

      说明:此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池。

      3. shutdown函数  

     View Code

      说明:此函数会按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。首先会检查是否具有shutdown的权限,然后设置线程池的控制状态为SHUTDOWN,之后中断空闲的worker,最后尝试终止线程池。
    尝试终止线程池tryTerminate的源码如下

     View Code

      说明:如果线程池的状态为SHUTDOWN并且线程池和阻塞队列都为空或者状态为STOP并且线程池为空,则将线程池控制状态转化为TERMINATED;否则,将中断一个空闲的worker,其中,interruptIdleWorkers的源码如下 

     View Code

      说明:此函数将会中断正在等待任务的空闲worker。

      shutdownNow函数与shutdown函数相似,shutdownNow会尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表,但是其会终止所有的worker,而并非空闲的worker。

      对于其他的函数,有兴趣的读者可以自行分析,下面通过一个示例来详细讲解ThreadPoolExecutor的内部工作机制。

    四、示例

      通过上面的分析,对于一些重要的函数有了一个整体的认识,下面通过一个示例,看看这些函数之间是如何串联起来的,并且分析分析ThreadPoolExecutor的工作机制。

     View Code

      运行结果(某一次)

     View Code

      说明:在程序中,使用了一个FixedThreadPool线程池(即corePoolSize与maximumPoolSize相等,且为2),之后在线程池提交了3个线程(Runnalbe对象),之后调用了shutdown来关闭线程池。

      ① 执行es.submit(mr1),其主要的函数调用如下

      说明:在调用了es.submit(mr1)后,最终线程池中会新建一个worker,并且此时workQueue阻塞队列为空(没有元素),并且值得注意的是,在runWorker函数中,有一个while循环,当某个任务完成后,会从workQueue阻塞队列中取下一个任务。

      ② 执行es.submit(mr2),其主要的函数调用与执行es.submit(mr1)相同,但是此时的线程池状态有所不同,其状态如下

      说明:此时,线程池会有两个worker,两个worker会分别封装mr1和mr2,并且workQueue阻塞队列还是为空(没有元素)。

      ③ 执行es.submit(mr3),其主要的函数调用如下

      说明:此时,由于线程池的worker的数量已经达到了corePoolSize大小,所以,此时会将mr3放入到workQueue阻塞队列中,此时,线程池还是只有两个worker,并且阻塞队列已经存在一个mr3元素。

      ④ mr2定义的逻辑运行完成,则会从workQueue中取下一个任务(mr3)。主要的函数调用如下(从runWorker开始)

      说明:此时,会运行用户再mr3中自定义的逻辑。此时,线程池中还是有两个worker,并且workQueue的大小为0,没有元素。

      ⑤ mr1定义的逻辑运行完成,则还是会从workQueue中取下一个任务(null)。主要的函数调用如下(从runWorker开始)

      说明:此时,由于是阻塞队列,并且队列中没有元素,所以调用take会使当前线程(worker对应的Thread)被阻塞。

      ⑥ mr3定义的逻辑运行完成,其过程和mr1完成时相同,会使另外一个worker对应的Thread被阻塞。

      ⑦ 执行es.shutdown,则主要的函数调用如下

      说明:在执行shutdown后,会中断两个worker对应的Thread线程。由于中断了worker对应的Thread线程,则之前由于take操作(响应中断)而阻塞也会被中断。

      ⑧ 其中一个worker对应的线程响应中断,从getTask函数开始(因为在getTask中被阻塞)。

      说明:此时,在getTask函数中,会将workerCount的值减一,并且返回null。接着在runWorker函数中退出while循环,并进入processWorkerExit函数进行worker退出线程池的处理,之后会再次调用addWorker,但是此时,不会添加成功。此时,线程池只有一个worker,并且workQueue的大小还是为0。

      ⑨ 另外一个worker对应的线程响应中断,从getTask函数开始(因为在getTask中被阻塞)。与上一个worker的处理过程相同,不再累赘。线程池的状态如下

      说明:之后整个程序就运行结束了,最后的状态为workQueue阻塞队列大小为0,线程池没有worker,workerCount为0。

      最后,给出ThreadPoolExecutor的示意图

      说明:用户自定义的任务会进入阻塞队列或者直接进入线程池(进入线程池后,新建线程直接运行),worker会从阻塞队列中不断的取任务,直到阻塞队列中没有任务。

      关于ThreadPoolExecutor还有如下几点需要注意的

      ① corePoolSize,表示核心大小,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。

      ② maxPoolSzie,表示阻塞队列的大小,如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当阻塞队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池(如本例的FixThreadPool)。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。

      ③ largestPoolSize,表示曾经同时存在在线程池的worker的大小,为workers集合(维护worker)的大小。

      ④ 关于shutdown函数和shutdownNow函数的区别,shutdown会设置线程池的运行状态为SHUTDOWN,并且中断所有空闲的worker,由于worker运行时会进行相应的检查,所以之后会退出线程池,并且其会继续运行之前提交到阻塞队列中的任务,不再接受新任务。而shutdownNow则会设置线程池的运行状态为STOP,并且中断所有的线程(包括空闲和正在运行的线程),在阻塞队列中的任务将不会被运行,并且会将其转化为List<Runnable>返回给调用者,也不再接受新任务,其不会停止用户任务(只是发出了中断信号),若需要停止,需要用户自定义停止逻辑。

  • 相关阅读:
    KM匹配模板
    BestCoder 1st Anniversary 1002-1005
    SGU 106 The equation
    sgu 104 Little shop of flowers
    SGU Magic Pairs
    关于 “'sqlite3' 不是内部或外部命令.....”问题
    通过django 速成 blog
    windows 通过appache链接cgi程序
    A Lot of Games(Trie树 + 博弈)
    树的点分治 (poj 1741, 1655(树形dp))
  • 原文地址:https://www.cnblogs.com/yixianyixian/p/7722470.html
Copyright © 2011-2022 走看看