zoukankan      html  css  js  c++  java
  • ThreadPoolTaskExecutor的简单使用

    项目中,有时会需要多线程来提高处理速度。
    线程重用的核心是,它把Thread.start()给屏蔽起来了(一定不要重复调用),所以要重用Thread,就不能让Thread执行完一个任务后终止,因此就必须阻塞Thread.run()方法,让该方法不停地从任务队列中获取任务并执行。循环在跑的过程中不断检查我们是否有新加入的子Runnable对象,有就调一下我们的run(),其实就一个大run()把其它小run()#1,run()#2,…给串联起来了,基本原理就这么简单
    spring为我们提供了TaskExecutor的抽象,
    spring会默认提供一个taskExecutor的实现,但一般我们需要根据项目的需要来进行自定义。

    	在Spring发行包中预定义了一些TaskExecutor实现。有了它们,你甚至不需要再自行实现了。
    	SimpleAsyncTaskExecutor 类
    	
    	这个实现不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。如果你需要真正的池,请继续往下看。
    	
    	SyncTaskExecutor类
    	
    	这个实现不会异步执行。相反,每次调用都在发起调用的线程中执行。它的主要用处是在不需要多线程的时候,比如简单的test case。
    	
    	ConcurrentTaskExecutor 类
    	
    	这个实现是对Java 5 java.util.concurrent.Executor类的包装。有另一个备选, ThreadPoolTaskExecutor类,它暴露了Executor的配置参数作为bean属性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一个备选。
    	
    	SimpleThreadPoolTaskExecutor 类
    	
    	这个实现实际上是Quartz的SimpleThreadPool类的子类,它会监听Spring的生命周期回调。当你有线程池,需要在Quartz和非Quartz组件中共用时,这是它的典型用处。
    	
    	ThreadPoolTaskExecutor 类
    	
    	它不支持任何对java.util.concurrent包的替换或者下行移植。Doug Lea和Dawid Kurzyniec对java.util.concurrent的实现都采用了不同的包结构,导致它们无法正确运行。
    	
    	这个实现只能在Java 5环境中使用,但是却是这个环境中最常用的。它暴露的bean properties可以用来配置一个java.util.concurrent.ThreadPoolExecutor,把它包装到一个TaskExecutor中。如果你需要更加先进的类,比如ScheduledThreadPoolExecutor,我们建议你使用ConcurrentTaskExecutor来替代。
    	
    	TimerTaskExecutor类
    	
    	这个实现使用一个TimerTask作为其背后的实现。它和SyncTaskExecutor的不同在于,方法调用是在一个独立的线程中进行的,虽然在那个线程中是同步的。
    	
    	WorkManagerTaskExecutor类
    	
    	CommonJ 是BEA和IBM联合开发的一套规范。这些规范并非Java EE的标准,但它是BEA和IBM的应用服务器实现的共同标准
    	
    	这个实现使用了CommonJ WorkManager作为其底层实现,是在Spring context中配置CommonJ WorkManager应用的最重要的类。和SimpleThreadPoolTaskExecutor类似,这个类实现了WorkManager接口,因此可以直接作为WorkManager使用。
    

    而其中ThreadPoolTaskExecutor是最常用的一种。

    1.ThreadPoolTaskExecutor

    1.1 先看一个常用的配置项

    @Configuration
    @EnableAsync//开启异步任务的支持
    public class TaskExecutorConfig {
    
        @Bean("TaskExecutor")
        public Executor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //如果池中的实际线程数小于corePoolSize,无论是否其中有空闲的线程,都会给新的任务产生新的线程
            taskExecutor.setCorePoolSize(5);
            //连接池中保留的最大连接数。
            taskExecutor.setMaxPoolSize(15);
            //queueCapacity 线程池所使用的缓冲队列
            taskExecutor.setQueueCapacity(6000);
            //强烈建议一定要给线程起一个有意义的名称前缀,便于分析日志
            taskExecutor.setThreadNamePrefix("demo Thread-");
            taskExecutor.initialize();
            return taskExecutor;
        }
    }
    

    如果在方法上添加@Async,会自动被注入使用ThreadPoolTaskExecutor作为TaskExecutor(线程池),如果配置了多个ThreadPoolTaskExecutor,可以@Async(“ThreadPoolTaskExecutor1”)来指定。

    1.2 重点概念解析

    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                                  //、runState等)的改变都要使用这个锁
    private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
     
    private volatile long  keepAliveTime;    //线程存货时间   
    private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
     
    private volatile int   poolSize;       //线程池中当前的线程数
     
    private volatile RejectedExecutionHandler handler; //任务拒绝策略
     
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
     
    private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
     
    private long completedTaskCount;   //用来记录已经执行完毕的任务个数
    

    corePoolSize: 线程池维护线程的最少数量

    keepAliveSeconds 线程池维护线程所允许的空闲时间

    maxPoolSize 线程池维护线程的最大数量

    queueCapacity 线程池所使用的缓冲队列

    当一个任务通过execute(Runnable)方法欲添加到线程池时:

    l 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    l 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

    l 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

    l 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

    l 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常

    1.3 拒绝策略解析

    当最大线程也满后,会使用handler来处理被拒绝的任务,默认的四种处理策略为:

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 是一个RuntimeException,因此会中断调用者的处理过程,为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面(最旧)的任务,然后重新尝试执行任务(重复此过程)。
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 。
    • 另外还可以定义拒绝策略,这里提供一种方式:
    taskExecutor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor executor) -> {
                        if (!executor.isShutdown()) {
                            try {
                                executor.getQueue().put(r);
                            } catch (InterruptedException e) {
                                logger.error(e.toString(), e);
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
            );
    

    这里的executor.getQueue()会得到BlockingQueue,
    BlockingQueue的核心方法:
    放入数据:
      offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
        则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
      offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
        加入BlockingQueue,则返回失败。
      put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
        直到BlockingQueue里面有空间再继续.
    获取数据:
      poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
        取不到时返回null;
      poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
        队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
      take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
        BlockingQueue有新的数据被加入;
      drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
        通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
    我们可以利用put方法来阻塞调用线程,来避免默认拒绝策略的丢弃任务或者抛出异常。

    用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM。
    如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务。
    最大线程数一般设为2N+1最好,N是CPU核数。
    核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数

    源码分析参考:https://www.cnblogs.com/sessionbest/articles/8689220.html

    1.4 提交任务

    无返回值的任务使用execute(Runnable)
    有返回值的任务使用submit(Runnable)

  • 相关阅读:
    C++——string转char[]
    Ackerman的非递归算法(未解决)
    单链表——递归求最大整数、节点个数、平均值
    队列——以数组Q[m]存放循环队列元素,设置一个标志tag,以tag=0和tag=1来区别在头指针和尾指针相等时,队列为空或满
    队列——假设以带头结点的循环链表表示队列,并且只设一个指针指向队尾元素结点(注意:不设头指针), * 试编写相应的置空队列、判断队列是否为空、入队和出队等算法。
    栈——判断回文
    栈——表达式求值
    栈——匹配()[]
    栈——十进制转八进制
    动态获取导航栏
  • 原文地址:https://www.cnblogs.com/seasail/p/12179401.html
Copyright © 2011-2022 走看看