zoukankan      html  css  js  c++  java
  • springBoot服务整合线程池ThreadPoolTaskExecutor与@Async详解使用

    ThreadPoolExecutor:=======这个是java自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建。
    ThreadPoolTaskExecutor:========这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类,包装类。

    1、Spring默认的@Async用线程池名字为SimpleAsyncTaskExecutor。

    2、Spring异步线程池的接口类是TaskExecutor,本质还是java.util.concurrent.Executor,没有配置的情况下,默认使用的是simpleAsyncTaskExecutor。

    3、标注@Async注解的方法和调用的方法一定不能在同一个类下,这样的话注解会失效,具体原因不多说

    注意:

    在springboot当中,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor线程池到bean当中,我们调用只需要 

    @Autowired  

    ThreadPoolTaskExecutor  threadPoolTaskExecutor;

    第一步: 首先在application启动类添加@EnableAsync

    @SpringBootApplication
    @EnableAsync //首先在application启动类添加@EnableAsync
    public class ThreadpoolApplication {
        public static void main(String[] args) {
            SpringApplication.run(ThreadpoolApplication.class, args);
        }
    }

    第二步:配置线程池,不配置的话使用springboot默认的线程池。

    package com.aswatson.csc.task.conf;
    
    import java.util.concurrent.Executor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    @Configuration
    @EnableAsync
    public class AsyncThreadConfiguration {
    
        @Bean("kafkaThreadExecutor")
        public Executor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);//核心线程数
            executor.setMaxPoolSize(20);//最大线程数
            executor.setKeepAliveSeconds(60);//空闲线程存活时间
            executor.setThreadNamePrefix("kafkaThreadAsync-");
            executor.initialize();
            return executor;
        }
    
    }

    第三步:测试1:在需要异步执行的方法上加上@Async注解。

    @Service
    public class AsyncTest {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Async
        public void hello(String name){
            customerEventLogMapper.insert(customerEventLog);
            logger.info("异步线程启动 started."+name);  
        }
    }
        

    第四步:测试2:使用注入的模式:

    package com.example.apidemo.completableFutrue;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Service;
    
    import java.time.LocalDateTime;
    
    @Service
    public class AsyncService {
    
        @Autowired
        ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
        public void addEventLog(String buId, String status){
            CustomerEventLogPO customerEventLog = new CustomerEventLogPO();
            customerEventLog.setUuid(uuid);
            customerEventLog.setStatus(status);
            customerEventLog.setCreated(LocalDateTime.now());
            customerEventLogMapper.insert(customerEventLog);
    
            threadPoolTaskExecutor.submit(new Thread(()->{
                customerEventLogMapper.insert(customerEventLog);
            })); //submit有返回值
    
            threadPoolTaskExecutor.execute(new Thread(()->{
                customerEventLogMapper.insert(customerEventLog);
            })); //execute无返回值
        }
    
    }

    注意: 如果配置多个线程池,该如何指定线程池呢?

    方式1: @Resources("kafkaThreadExecutor")。

    方式2: 如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池。

    ======================================================================================================================================================================

    另外需要注意的是:关于注解失效需要注意以下几点:

    1. 注解的方法必须是public方法
    2. 方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的,因为@Transactional和@Async注解的实现都是基于Spring的AOP,而AOP的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。
    3. 异步方法使用注解@Async的返回值只能为void或者Future

    ==================================================================================================================================================================================================

    // @Bean()的拒绝策略:
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    拒绝策略:如果(总任务数 - 核心线程数 - 任务队列数)-(最大线程数 - 核心线程数)> 0 的话,则会出现线程拒绝。举例:( 12 - 5 - 2 ) - ( 8 - 5 ) > 0,会出现线程拒绝。线程拒绝又分为 4 种策略,分别为:

      • CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
      • AbortPolicy():直接抛出异常。
      • DiscardPolicy():直接丢弃。
      • DiscardOldestPolicy():丢弃队列中最老的任务。
    路在脚下
  • 相关阅读:
    Python装饰器之functools.wraps的作用
    [转]scala和RDD中的占位符"_"
    Scala,Java,Python 3种语言编写Spark WordCount示例
    CentOS系统安装Python3
    [爬虫]采用Go语言爬取天猫商品页面
    go语言的排序和去重
    go语言字符串的连接和截取
    [转]git commit --amend用法
    KM算法小结
    Protocol Buffers学习教程
  • 原文地址:https://www.cnblogs.com/lgg20/p/15471821.html
Copyright © 2011-2022 走看看