zoukankan      html  css  js  c++  java
  • Java 线程池ThreadPoolExecutor(转)

    Java 线程池 ThreadPoolExecutor.

     

    JDK1.5 开始关于多线程加了很多特性。如:

    ConcurrentHashMap: 放弃使用公用锁同步每一个方法,使用了更细化的锁机制,分离锁。对于大数据量的HashMap 同步操作效率有了较大提升。

    CopyOnWriteArrayList: 是同步 List 的一个并发替代品。其线程安全性来源于这样一个事实:只要有效的不可变对象被正确发布,那么访问它将不再需要更多的同步。在每次需要修改时它们会创建并重新发布一个信的容器拷贝,以此来实现可变性。

    增加了 Callable Future Callable runnable 的一个可选替代。我们之前用的 Runnable 是不能返回状态的,而 Callable 是可以返回状态,返回的状态保存在泛型 Future<T> 里。

    JDK1.5 里面还包含了一个重要的特性就是线程池。通过查看代码可以看出主要都是由大师 Doug Lea 来完成的。本文主要介绍线程池 ThreadPoolExecutor 的使用。

     

    JDK1.5 的线程池由 Executor 框架提供。 Executor 框架将处理请求任务的提交和它的执行解耦。可以制定执行策略。在线程池中执行线程可以重用已经存在的线程,而不是创建新的线程,可以在处理多请求时抵消线程创建、消亡产生的开销。如果线程池过大,会导致内存的高使用量,还可能耗尽资源。如果过小,会由于存在很多的处理器资源未工作,对吞吐量造成损失。

    由于内容较多没有一一研究,只看了较常用的 ThreadPoolExecutor ,所以在这里做个介绍。 ThreadPoolExecutor 的继承关系如下。

    Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor

    核心池大小 (core pool size) 、最大池的大小 (maximum pool size) 、存活时间 (keep-alive time) 共同管理着线程的创建和销毁。

    线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

    /**

    * Creates a new <tt>ThreadPoolExecutor </tt> with the given initial

    * parameters.

    *

    * @param corePoolSizethenumberof threads to keep in the

    * pool, even if they are idle.

    * @param maximumPoolSize the maximum number of threads to allow in the

    * pool.

    * @param keepAliveTimewhenthenumberof threads is greater than

    * the core, this is the maximum time that excess idle threads

    * will wait for new tasks before terminating.

    * @param unit the time unit for the keepAliveTime

    * argument.

    * @param workQueuethequeueto use for holding tasks before they

    * are executed.Thisqueuewillholdonlythe<tt> Runnable </tt>

    * tasks submittedby the <tt>execute </tt> method.

    * @param handler the handler to use when executionis blocked

    * because the thread bounds and queue capacitiesarereached.

    * @throws IllegalArgumentException if corePoolSize,or

    * keepAliveTimelessthanzero,or if maximumPoolSize less than or

    * equal to zero, or if corePoolSizegreaterthanmaximumPoolSize.

    * @throws NullPointerException if <tt>workQueue </tt>

    * or <tt>handler </tt> are null.

    */

    publicThreadPoolExecutor( int corePoolSize,

    intmaximumPoolSize,

    longkeepAliveTime,

    TimeUnit unit,

    BlockingQueue<Runnable> workQueue,

    RejectedExecutionHandler handler) {

    this(corePoolSize, maximumPoolSize, keepAliveTime,unit,workQueue,

    Executors. defaultThreadFactory (), handler);

    }

    corePoolSize :线程池维护线程的最少数量,哪怕是空闲的。

    maximumPoolSize :线程池维护线程的最大数量。

    keepAliveTime :线程池维护线程所允许的空闲时间。

    unit :线程池维护线程所允许的空闲时间的单位。

    workQueue :线程池所使用的缓冲队列,改缓冲队列的长度决定了能够缓冲的最大数量。

    拒绝任务:拒绝任务是指当线程池里面的线程数量达到 maximumPoolSize workQueue 队列已满的情况下被尝试添加进来的任务。

    handler :线程池对拒绝任务的处理策略。在 ThreadPoolExecutor 里面定义了 4 handler 策略,分别是

    1. CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。

    2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。

    3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

    4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

     

    一个任务通过 execute(Runnable) 方法被添加到线程池,任务就是一个 Runnable 类型的对象,任务的执行方法就是 Runnable 类型对象的 run() 方法。

    当一个任务通过 execute(Runnable) 方法欲添加到线程池时,线程池采用的策略如下:

    1. 如果此时线程池中的数量小于 corePoolSize ,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    2. 如果此时线程池中的数量等于 corePoolSize ,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。

    3. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量小于 maximumPoolSize ,建新的线程来处理被添加的任务。

    4. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量等于 maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。

    处理任务的优先级为:

    核心线程 corePoolSize 、任务队列 workQueue 、最大线程 maximumPoolSize ,如果三者都满了,使用 handler 处理被拒绝的任务。当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    理解了上面关于 ThreadPoolExecutord 的介绍,应该就基本能了解它的一些使用,不过在 ThreadPoolExocutor 里面有个关键的 Worker 类,所有的线程都要经过 Worker 的包装。这样才能够做到线程可以复用而无需重新创建线程。

    同时 Executors 类里面有 newFixedThreadPool(),newCachedThreadPool()等几个方法,实际上也是间接调用了ThreadPoolExocutor ,不过是传的不同的构造参数。

    下面通过一个例子的执行结果来理解

     

    代码:

     

    import java.io.Serializable;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestThreadPool {

     private static int produceTaskSleepTime = 2;
     private static int consumeTaskSleepTime = 2000;
     private static int produceTaskMaxNumber = 9;

     public static void main(String[] args) {

      // 构造一个线程池
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 3,
        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2),
        new ThreadPoolExecutor.DiscardOldestPolicy());
      for (int i = 1; i <= produceTaskMaxNumber; i++) {
       try {
        // 产生一个任务,并将其加入到线程池
        String task = "task@ " + i;
        System.out.println("put " + task);
        threadPool.execute(new ThreadPoolTask(task));

        // 便于观察,等待一段时间
        Thread.sleep(produceTaskSleepTime);
       } catch (Exception e) {
        e.printStackTrace();
       }
      }
     }

     public static class ThreadPoolTask implements Runnable, Serializable {
      private static final long serialVersionUID = 0;
      // 保存任务所需要的数据
      private Object threadPoolTaskData;

      ThreadPoolTask(Object tasks) {
       this.threadPoolTaskData = tasks;
      }

      public void run() {
       // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
       System.out.println("start .." + threadPoolTaskData);
       try {
        // // 便于观察,等待一段时间
        Thread.sleep(consumeTaskSleepTime);
       } catch (Exception e) {
        e.printStackTrace();
       }
       threadPoolTaskData = null;
      }

      public Object getTask() {
       return this.threadPoolTaskData;
      }
     }
    }

    上面代码定义了一个corePoolSize 2 maximumPoolSize 3 workerQuene 容量为 3 的线程池,也就是说在饱和状态下,只能容纳 6 个线程, 3 个是运行状态, 3 个在队列里面。同时代码又往线程池里面添加了 9 个线程,每个线程会运行 2 秒,这样必然会到达饱和状态。而饱和状态就涉及到对拒绝任务的处理策略,本处采用了ThreadPoolExecutor.DiscardOldestPolicy()运行结果如下:

    put task@ 1

    start ..task@ 1

    put task@ 2

    start ..task@ 2

    put task@ 3

    put task@ 4

    put task@ 5

    start ..task@ 3

    put task@ 6

    put task@ 7

    put task@ 8

    put task@ 9

    start ..task@ 8

    start ..task@ 9

    采用 ThreadPoolExecutor.DiscardOldestPolicy()运行结果如下:

    put task@ 1

    start ..task@ 1

    put task@ 2

    start ..task@ 2

    put task@ 3

    put task@ 4

    put task@ 5

    start ..task@ 3

    put task@ 6

    start ..task@ 6

    start ..task@ 4

    start ..task@ 5

    put task@ 7

    start ..task@ 7

    put task@ 8

    put task@ 9

    start ..task@ 8

    start ..task@ 9

    采用ThreadPoolExecutor.AbortPolicy() 运行结果如下:

    put task@ 1

    start ..task@ 1

    put task@ 2

    start ..task@ 2

    put task@ 3

    put task@ 4

    put task@ 5

    start ..task@ 3

    put task@ 6

    java.util.concurrent.RejectedExecutionException

    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1477)

    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)

    at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867)

    at TestThreadPool.main( TestThreadPool.java:22 )

    put task@ 7

    java.util.concurrent.RejectedExecutionException

    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1477)

    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)

    at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867)

    at TestThreadPool.main( TestThreadPool.java:22 )

    put task@ 8

    java.util.concurrent.RejectedExecutionException

    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1477)

    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)

    at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867)

    at TestThreadPool.main( TestThreadPool.java:22 )

    put task@ 9

    java.util.concurrent.RejectedExecutionException

    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1477)

    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)

    at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867)

    at TestThreadPool.main( TestThreadPool.java:22 )

    start ..task@ 4

    start ..task@ 5

    采用ThreadPoolExecutor.DiscardPolicy() 运行结果如下:

    put task@ 1

    start ..task@ 1

    put task@ 2

    start ..task@ 2

    put task@ 3

    put task@ 4

    put task@ 5

    start ..task@ 3

    put task@ 6

    put task@ 7

    put task@ 8

    put task@ 9

    start ..task@ 4

    start ..task@ 5

    从运行结果可以看出不同的 Handler策略对拒绝任务的处理方式。

    目前还只偏重在使用层面的理解,底层细节的原理还有待日后学习,欢迎交流。

     

  • 相关阅读:
    类函数指针
    resource for machine learning
    蒲丰投针与蒙特卡洛模拟
    IIS5、IIS6、IIS7的ASP.net 请求处理过程比较
    CMD 命令速查手册
    Process, Thread, STA, MTA, COM object
    在托管代码中设置断点(Windbg)
    SidebySide Execution
    .NET Framework 3.5 Architecture
    Overview of the .NET Framework
  • 原文地址:https://www.cnblogs.com/androidxiaoyang/p/2750111.html
Copyright © 2011-2022 走看看