zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor之一:使用基本介绍

    一、concurrent包中的线程池的简单介绍

    线程池按照线程数量可以分为:一是固定线程数量的线程池;二是可变数量的线程池。

    线程池按照执行时间可以分为:一是立即执行线程池;二是延时线程池。


    ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
    但是,强烈建议程序员使用较为方便的 Executors 工厂方法

    •  Executors.newCachedThreadPool()无界线程池,可以进行自动线程回收)--可变数量的线程池
    • Executors.newFixedThreadPool(int)固定大小线程池)                                --固定数量的线程池
    •  Executors.newSingleThreadExecutor()单个后台线程),                       --固定数量的线程池
    • Executors.newScheduledThreadPool(int corePoolSize)                                  --延迟执行线程池
    • Executors.newSingleScheduledExecutor()                                                  --延迟执行线程池[一个重要的方法就是:schedule(Runnable command, long delay, TimeUnit unit)该方法返回了一个 ScheduledFuture。]

    在JDK帮助文档中,有如此一段话:“强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)”它们均为大多数使用场景预定义了设置。

    二、ThreadPoolExecutor类可设置的参数

    在ThreadPoolExecutor类中提供了四个构造方法:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        .....
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
        ...
    }

    如果想自己扩展实现的话,在手动配置和调整此类时,下面几个参数可以参考。

    1、corePoolSize

      核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。

      核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。

    2、maxPoolSize

      当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。

    ThreadPoolExecutor调节线程的原则是:先调整到最小线程,最小线程用完后,他会将优先将任务放入缓存队列(offer(task)),等缓冲队列用完了,才会向最大线程数调节。这似乎与我们所理解的线程池模型有点不同。我们一般采用增加到最大线程后,才会放入缓冲队列中,以达到最大性能。ThreadPoolExecutor代码段:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

    线程池执行任务的规则:

        1. 当线程数小于核心线程数时,创建线程。
        2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
        3. 当线程数大于等于核心线程数,且任务队列已满:
        4. 3.1.若线程数小于最大线程数,创建线程
          3.2.若线程数等于最大线程数,抛出异常,拒绝任务

    3、keepAliveTime

      当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

    4、allowCoreThreadTimeout

      是否允许核心线程空闲退出,默认值为false。

    5、unit

      有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS;               //
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒

    6、workQueue

      一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

    • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(new thread)开始运行)
    • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程
    • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

    排队有三种通用策略:

    1. 直接提交工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
    2. 无界队列使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
    3. 有界队列当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  

    7、queueCapacity

      任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。

    8、handler(回绝任务)

      执行回绝的场景,当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下,execute 方法都将调用其RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:

    A. 在默认的 ThreadPoolExecutor.AbortPolicy 中,(默认)回绝任务并抛出异常 RejectedExecutionException。
    B. 在 ThreadPoolExecutor.CallerRunsPolicy 中,调用execute()的线程自己来处理该任务,绝大部分情况下是主线程。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

    注意:由于主线程执行这个任务,那么新到来的任务就不会被提交到线程池中执行(而是提交到TCP层的队列,TCP层队列满了,就开始拒绝,此时性能已经很低了),直到主线程执行完这个任务。
    C. 在 ThreadPoolExecutor.DiscardPolicy 中,不能被执行的任务会直接被扔掉。
    D. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,如果executor没有被关闭,队列头部的任务将会被丢弃,然后将该任务加到队尾,然后重试执行程序(如果再次失败,则重复此过程)。
    定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。

    注意1:AbortPolicy,CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy都是rejectedExecution的一种实现。
        当然也可以自己定义个rejectedExecution实现。

     详细见:《juc线程池原理(二)

    系统负载

    参数的设置跟系统的负载有直接的关系,下面为系统负载的相关参数:

    • tasks,每秒需要处理的最大任务数量
    • tasktime,处理第个任务所需要的时间
    • responsetime,系统允许任务最大的响应时间,比如每个任务的响应时间不得超过2秒。

    钩子 (hook) 方法
        此类提供 protected是可重写的:

    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    protected void terminated() { }

     beforeExecute(java.lang.Thread, java.lang.Runnable)和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。

     它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目等。
     此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。
     如果钩子 (hook) 或回调方法抛出异常,则ThreadPoolExecutor的所有线程将依次失败并突然终止。 


    终止
        如果ThreadPoolExecutor在程序中没有任何引用且没有任何活动线程,它也不会自动 shutdown。
        如果希望确保回收线程(即使用户忘记调用 shutdown()),则必须安排未使用的线程最终终止:
    设置适当保持活动时间,使用0核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
     

    三、ThreadPoolExecutor的函数说明

    主要成员函数
    public void execute(Runnable command)
        在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。
        参数:
            command - 要执行的任务。 
        抛出:
            RejectedExecutionException - 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException 
            NullPointerException - 如果命令为 null
    public void shutdown()
        按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。
        抛出:
            SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
    public List<Runnable> shutdownNow()
        尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。
        并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。
        返回:
            从未开始执行的任务的列表。 
        抛出:
            SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 
            可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),
            或者安全管理器的 checkAccess 方法拒绝访问。
    public int prestartAllCoreThreads()
        启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。
        返回:
            已启动的线程数
    public boolean allowsCoreThreadTimeOut()
        如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。当返回 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。当返回 false(默认值)时,由于没有传入任务,核心线程不会终止。
        返回:
            如果允许核心线程超时,则返回 true;否则返回 false
    public void allowCoreThreadTimeOut(boolean value)
        如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。
        参数:
            value - 如果应该超时,则为 true;否则为 false 
        抛出:
            IllegalArgumentException - 如果 value 为 true 并且当前保持活动时间不大于 0。
    public boolean remove(Runnable task)
        从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则让其不再运行。
        此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。
        例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,purge() 方法可用于移除那些已被取消的 Future。
        参数:
            task - 要移除的任务 
        返回:
            如果已经移除任务,则返回 true
    public void purge()
        尝试从工作队列移除所有已取消的 Future 任务。此方法可用作存储回收操作,它对功能没有任何影响。
        取消的任务不会再次执行,但是它们可能在工作队列中累积,直到worker线程主动将其移除。
        调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。
    当然它还实现了的ExecutorServicesubmit系列接口

    abstract <T> Future<T> submit(Runnable task, T result) --如果执行成功就返回T result
    abstract <T> Future<T> submit(Callable<T> task)--Submits a value-returning task for execution and returns a Future representing the pending results of the task.
    abstract Future<?> submit(Runnable task) --Submits a Runnable task for execution and returns a Future representing that task.

    四、Executor 的执行逻辑

  • 相关阅读:
    nginx连接php fastcgi配置
    zabbix企业级监控概述和部署
    zabbix配置文件详解
    zabbix自定义键值原理
    ipvsadm命令
    lvs持久连接
    TCP协议的3次握手与4次挥手
    TCP协议的3次握手与4次挥手
    设计模式-模板模式
    设计模式-模板模式
  • 原文地址:https://www.cnblogs.com/duanxz/p/5050684.html
Copyright © 2011-2022 走看看