zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor线程池进阶使用

    一、简介 
    线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: 

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
    long keepAliveTime, TimeUnit unit, 
    BlockingQueue workQueue, 
    RejectedExecutionHandler handler) 
    corePoolSize: 线程池维护线程的最少数量 
    maximumPoolSize:线程池维护线程的最大数量 
    keepAliveTime: 线程池维护线程所允许的空闲时间 
    unit: 线程池维护线程所允许的空闲时间的单位 
    workQueue: 线程池所使用的缓冲队列 
    handler: 线程池对拒绝任务的处理策略 

    一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。 

    当一个任务通过execute(Runnable)方法欲添加到线程池时: 

    如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 
    如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 
    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 
    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 

    也就是:处理任务的优先级为: 
    核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

    当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 

    unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: 
    NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 

    workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue 

    handler有四个选择: 
    ThreadPoolExecutor.AbortPolicy() 
    抛出java.util.concurrent.RejectedExecutionException异常 
    ThreadPoolExecutor.CallerRunsPolicy() 
    重试添加当前的任务,他会自动重复调用execute()方法 
    ThreadPoolExecutor.DiscardOldestPolicy() 
    抛弃旧的任务 
    ThreadPoolExecutor.DiscardPolicy() 
    抛弃当前的任务 

    二、一般用法举例 

    1. package demo;
    2. import java.io.Serializable;
    3. import java.util.concurrent.ArrayBlockingQueue;
    4. import java.util.concurrent.ThreadPoolExecutor;
    5. import java.util.concurrent.TimeUnit;
    6. public class TestThreadPool2
    7. {
    8.     private static int produceTaskSleepTime = 2;
    9.     private static int produceTaskMaxNumber = 10;
    10.     public static void main(String[] args)
    11.     {
    12.         // 构造一个线程池
    13.         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
    14.                 new ThreadPoolExecutor.DiscardOldestPolicy());
    15.         for (int i = 1; i <= produceTaskMaxNumber; i++)
    16.         {
    17.             try
    18.             {
    19.                 // 产生一个任务,并将其加入到线程池
    20.                 String task = "task@ " + i;
    21.                 System.out.println("put " + task);
    22.                 threadPool.execute(new ThreadPoolTask(task));
    23.                 // 便于观察,等待一段时间
    24.                 Thread.sleep(produceTaskSleepTime);
    25.             }
    26.             catch (Exception e)
    27.             {
    28.                 e.printStackTrace();
    29.             }
    30.         }
    31.     }
    32. }
    33. /**
    34.  * 线程池执行的任务
    35.  */
    36. class ThreadPoolTask implements Runnable, Serializable
    37. {
    38.     private static final long serialVersionUID = 0;
    39.     private static int consumeTaskSleepTime = 2000;
    40.     // 保存任务所需要的数据
    41.     private Object threadPoolTaskData;
    42.     ThreadPoolTask(Object tasks)
    43.     {
    44.         this.threadPoolTaskData = tasks;
    45.     }
    46.     public void run()
    47.     {
    48.         // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
    49.         System.out.println(Thread.currentThread().getName());
    50.         System.out.println("start .." + threadPoolTaskData);
    51.         try
    52.         {
    53.             // //便于观察,等待一段时间
    54.             Thread.sleep(consumeTaskSleepTime);
    55.         }
    56.         catch (Exception e)
    57.         {
    58.             e.printStackTrace();
    59.         }
    60.         threadPoolTaskData = null;
    61.     }
    62.     public Object getTask()
    63.     {
    64.         return this.threadPoolTaskData;
    65.     }
    66. }

    另一个例子: 

    1. package demo;
    2. import java.util.Queue;
    3. import java.util.concurrent.ArrayBlockingQueue;
    4. import java.util.concurrent.ThreadPoolExecutor;
    5. import java.util.concurrent.TimeUnit;
    6. public class ThreadPoolExecutorTest
    7. {
    8.     private static int queueDeep = 4;
    9.     public void createThreadPool()
    10.     {
    11.         /*
    12.          * 创建线程池,最小线程数为2,最大线程数为4,线程池维护线程的空闲时间为3秒,
    13.          * 使用队列深度为4的有界队列,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,
    14.          * 然后重试执行程序(如果再次失败,则重复此过程),里面已经根据队列深度对任务加载进行了控制。
    15.          */
    16.         ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueDeep),
    17.                 new ThreadPoolExecutor.DiscardOldestPolicy());
    18.         // 向线程池中添加 10 个任务
    19.         for (int i = 0; i < 10; i++)
    20.         {
    21.             try
    22.             {
    23.                 Thread.sleep(1);
    24.             }
    25.             catch (InterruptedException e)
    26.             {
    27.                 e.printStackTrace();
    28.             }
    29.             while (getQueueSize(tpe.getQueue()) >= queueDeep)
    30.             {
    31.                 System.out.println("队列已满,等3秒再添加任务");
    32.                 try
    33.                 {
    34.                     Thread.sleep(3000);
    35.                 }
    36.                 catch (InterruptedException e)
    37.                 {
    38.                     e.printStackTrace();
    39.                 }
    40.             }
    41.             TaskThreadPool ttp = new TaskThreadPool(i);
    42.             System.out.println("put i:" + i);
    43.             tpe.execute(ttp);
    44.         }
    45.         tpe.shutdown();
    46.     }
    47.     private synchronized int getQueueSize(Queue queue)
    48.     {
    49.         return queue.size();
    50.     }
    51.     public static void main(String[] args)
    52.     {
    53.         ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
    54.         test.createThreadPool();
    55.     }
    56.     class TaskThreadPool implements Runnable
    57.     {
    58.         private int index;
    59.         public TaskThreadPool(int index)
    60.         {
    61.             this.index = index;
    62.         }
    63.         public void run()
    64.         {
    65.             System.out.println(Thread.currentThread() + " index:" + index);
    66.             try
    67.             {
    68.                 Thread.sleep(3000);
    69.             }
    70.             catch (InterruptedException e)
    71.             {
    72.                 e.printStackTrace();
    73.             }
    74.         }
    75.     }
    76. }
  • 相关阅读:
    年轻人的第一个 Spring Boot 应用,太爽了!
    面试问我 Java 逃逸分析,瞬间被秒杀了。。
    Spring Boot 配置文件 bootstrap vs application 到底有什么区别?
    坑爹的 Java 可变参数,把我整得够惨。。
    6月来了,Java还是第一!
    Eclipse 最常用的 10 组快捷键,个个牛逼!
    Spring Cloud Eureka 自我保护机制实战分析
    今天是 Java 诞生日,Java 24 岁了!
    厉害了,Dubbo 正式毕业!
    Spring Boot 2.1.5 正式发布,1.5.x 即将结束使命!
  • 原文地址:https://www.cnblogs.com/jianwei-dai/p/6867362.html
Copyright © 2011-2022 走看看