zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor

    ThreadPoolExecutor

    线程池的意义

      提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;

      统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。

    ThreadPoolExecutor系列重载方法

    源码

     1 public ThreadPoolExecutor(int corePoolSize, // 1
     2                               int maximumPoolSize,  // 2
     3                               long keepAliveTime,  // 3
     4                               TimeUnit unit,  // 4
     5                               BlockingQueue<Runnable> workQueue, // 5
     6                               ThreadFactory threadFactory,  // 6
     7                               RejectedExecutionHandler handler ) { //7
     8         if (corePoolSize < 0 ||
     9             maximumPoolSize <= 0 ||
    10             maximumPoolSize < corePoolSize ||
    11             keepAliveTime < 0)
    12             throw new IllegalArgumentException();
    13         if (workQueue == null || threadFactory == null || handler == null)
    14             throw new NullPointerException();
    15         this.corePoolSize = corePoolSize;
    16         this.maximumPoolSize = maximumPoolSize;
    17         this.workQueue = workQueue;
    18         this.keepAliveTime = unit.toNanos(keepAliveTime);
    19         this.threadFactory = threadFactory;
    20         this.handler = handler;
    21     }

    ThreadPoolExecutor参数

    参数详解

    1,corePoolSize和maximumPoolSize

      线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。

      当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSizesetMaximumPoolSize进行动态更改。

    2,keepAliveTime和unit

      如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止。这提供了一种在不积极使用线程池时减少资源消耗的方法。如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)进行动态调整。

      防止空闲线程在关闭之前终止,可以使用如下方法:setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

      默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)也可用于将此超时策略应用于核心线程。

    3,workQueue

      BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。

    Direct handoffs 直接握手队列

      Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。

    Unbounded queues 无界队列

      当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。

    Bounded queues 有界队列

      一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:

      使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。

      使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。

    4,threadFactory线程工厂

      新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。

      通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。

      线程应该有modifyThread权限。 如果工作线程或使用该池的其他线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持可终止但尚未完成的状态。

    5,handler(Rejected tasks 拒绝任务)

      拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;

      无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:

      AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;

      CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;

      DiscardPolicy:直接丢弃新提交的任务;

      DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);

      我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。

    预定义线程池解析

    1,FixedThreadPool

      适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

    1  public static ExecutorService newFixedThreadPool(int nThreads) {
    2         return new ThreadPoolExecutor(nThreads, nThreads,
    3                                       0L, TimeUnit.MILLISECONDS,
    4                                       new LinkedBlockingQueue<Runnable>());
    5     }

      corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;

      keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;

      workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;

      FixedThreadPool的任务执行是无序的;

    2,CachedThreadPool

      适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。

    1  public static ExecutorService newCachedThreadPool() {
    2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3                                       60L, TimeUnit.SECONDS,
    4                                       new SynchronousQueue<Runnable>());
    5     }

      corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;

      keepAliveTime = 60s,线程空闲60s后自动结束。

      workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;

    3,SingleThreadExecutor

    1 public static ExecutorService newSingleThreadExecutor() {
    2         return new FinalizableDelegatedExecutorService
    3             (new ThreadPoolExecutor(1, 1,
    4                                     0L, TimeUnit.MILLISECONDS,
    5                                     new LinkedBlockingQueue<Runnable>()));
    6     }

      FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。

    4,ScheduledThreadPool

    1  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2         return new ScheduledThreadPoolExecutor(corePoolSize);
    3     }

      newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。

    1  public ScheduledThreadPoolExecutor(int corePoolSize) {
    2         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    3               new DelayedWorkQueue());
    4     }

    自定义线程池

     1 package io.guangsoft.erp;
     2 
     3 import java.util.concurrent.*;
     4 import java.util.concurrent.atomic.AtomicInteger;
     5 
     6 public class ThreadPoolTest {
     7 
     8     static class NameTreadFactory implements ThreadFactory {
     9 
    10         private final AtomicInteger mThreadNum = new AtomicInteger(1);
    11 
    12         @Override
    13         public Thread newThread(Runnable r) {
    14             Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
    15             System.out.println(t.getName() + " has been created");
    16             return t;
    17         }
    18     }
    19 
    20     public static class MyIgnorePolicy implements RejectedExecutionHandler {
    21 
    22         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    23             doLog(r, e);
    24         }
    25 
    26         private void doLog(Runnable r, ThreadPoolExecutor e) {
    27             System.err.println( r.toString() + " rejected");
    28             System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
    29         }
    30     }
    31 
    32     static class MyTask implements Runnable {
    33         private String name;
    34 
    35         public MyTask(String name) {
    36             this.name = name;
    37         }
    38 
    39         @Override
    40         public void run() {
    41             try {
    42                 System.out.println(this.toString() + " is running!");
    43                 Thread.sleep(3000); //让任务执行慢点
    44             } catch (InterruptedException e) {
    45                 e.printStackTrace();
    46             }
    47         }
    48 
    49         public String getName() {
    50             return name;
    51         }
    52 
    53         @Override
    54         public String toString() {
    55             return "MyTask [name=" + name + "]";
    56         }
    57     }
    58 
    59     public static void main(String[] args) throws Exception {
    60         int corePoolSize = 2;
    61         int maximumPoolSize = 4;
    62         long keepAliveTime = 10;
    63         TimeUnit unit = TimeUnit.SECONDS;
    64         BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
    65         ThreadFactory threadFactory = new NameTreadFactory();
    66         RejectedExecutionHandler handler = new MyIgnorePolicy();
    67         ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
    68                 workQueue, threadFactory, handler);
    69         executor.prestartAllCoreThreads(); // 预启动所有核心线程
    70 
    71         for (int i = 1; i <= 10; i++) {
    72             MyTask task = new MyTask(String.valueOf(i));
    73             executor.execute(task);
    74         }
    75 
    76         System.in.read(); //阻塞主线程
    77     }
    78 }
  • 相关阅读:
    Java实现 LeetCode 343 整数拆分(动态规划入门经典)
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 338 比特位计数
    H264(NAL简介与I帧判断)
    分享一段H264视频和AAC音频的RTP封包代码
  • 原文地址:https://www.cnblogs.com/guanghe/p/10775353.html
Copyright © 2011-2022 走看看