zoukankan      html  css  js  c++  java
  • springboot多线程TaskExecutor的使用,以及使用@Async实现异步调用

    @Async实现异步调用

    pom.xml

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    

    启动类

    @EnableAsync
    @SpringBootApplication
    public class LearnutilsApplication {
     
       public static void main(String[] args) {
          SpringApplication.run(LearnutilsApplication.class, args);
       }
     
       /**
        * 核心线程数10:线程池创建时初始化的线程数
        * 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        * 缓冲队列200:用来缓冲执行任务的队列
        * 允许线程的空闲时间60秒:超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
        * 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        * 线程池对拒绝任务的处理策略:此处采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
        * 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        * 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        */
       @EnableAsync
       @Configuration
       class TaskPoolConfig{
          @Bean("taskExecutor")
          public Executor taskExecutor(){
             ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
             executor.setCorePoolSize(10);
             executor.setMaxPoolSize(20);
             executor.setQueueCapacity(200);
             executor.setKeepAliveSeconds(60);
             executor.setThreadNamePrefix("TaskExecutor-");
             executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
             executor.setWaitForTasksToCompleteOnShutdown(true);
             executor.setAwaitTerminationSeconds(60);
             return executor;
          }
       }
     
    }
    

    定义controller

    @RequestMapping(value = "/AsyncController")
    @RestController
    public class AsyncController {
     
        @Autowired
        private AsyncService asyncService;
     
        @Autowired
        private AsyncService2 asyncService2;
     
        @Autowired
        private AsyncService3 asyncService3;
     
        @GetMapping(value = "/sendSms")
        public String sendSms() throws Exception{
           
            Future<String> sms = asyncService.sendSms();
            Future<String> sms2 = asyncService2.sendSms();
            Future<String> sms3 = asyncService3.sendSms();
            int i = 0;
            for (;;) {
                //如果都执行完就跳出循环,isDone方法,如果此线程执行完,true
                if (sms.isDone() && sms2.isDone() && sms3.isDone()) {
                    break;
                }
            }
            //get是获取结果集
            return sms.get()+sms2.get()+sms3.get();
        }
    }
    

    定义接口

    public interface AsyncService {
        Future<String> sendSms();
    }
    

    实现类

    @Service
    public class AsyncServiceImpl implements AsyncService {
        //Future<String> 返回结果 AsyncResult<String>
        @Async("taskExecutor")
        @Override
        public Future<String> sendSms() {
            return new AsyncResult<>("000000");
        }
    }
    

    将isDone换程CountDownLatch来判断线程是否执行完实例化CountDownLatch并且制定线程个数,线程个数就是从本地异步调用的方法个输,并且传入线程任务中,每个线程执行完毕就调用countDown()方法。最后在调用await()方法。这样在线程计数为零之前,线程就会一直等待。

    AsyncResult用来封装结果集,否则结果集无法返回

    @GetMapping(value = "/sendSms2")
    public String sendSms2() throws Exception{
        CountDownLatch downLatch = new CountDownLatch(3);
        Future<String> s = asyncService4.sendSms(downLatch);
        Future<String> s1 = asyncService5.sendSms(downLatch);
        Future<String> s2 = asyncService6.sendSms(downLatch);
        downLatch.await();
        return s.get()+s1.get()+s2.get();
    }
    

    将CountDownLatch传给方法

    public interface AsyncService4 {
        Future<String> sendSms(CountDownLatch downLatch);
    }
    

    方法

    @Service
    public class AsyncService4Impl implements AsyncService4 {
    
        @Async("taskExecutor")
        @Override
        public Future<String> sendSms(CountDownLatch downLatch) {
            downLatch.countDown();
            return new AsyncResult<>("11111");
        }
    }
    

    TaskExecutor的使用

    注册TaskExecutor

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
     
    import java.util.concurrent.ThreadPoolExecutor;
     
    /**
     * @author yanjun
     * @date 2019/8/1 16:04
     **/
    @Configuration
    public class MainConfiguration {
        @Bean
        public TaskExecutor getTaskExecutor(){
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            // 设置核心线程数
            executor.setCorePoolSize(5);
            // 设置最大线程数
            executor.setMaxPoolSize(10);
            // 设置队列容量
            executor.setQueueCapacity(20);
            // 设置线程活跃时间(秒)
            executor.setKeepAliveSeconds(60);
            // 设置默认线程名称
            executor.setThreadNamePrefix("post-lending-");
            // 设置拒绝策略
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 等待所有任务结束后再关闭线程池
            executor.setWaitForTasksToCompleteOnShutdown(true);
            return executor;
        }
    }
    

    使用TaskExecutor

    @Autowired
    private TaskExecutor taskExecutor;
     
    public ResultVO findHandlingRecordByAssociationId(Integer associationId) throws InterruptedException{
        Map<String, Object> map = new HashMap<>(2);
       //线程计数器(等待所有线程执行完统一返回)
        CountDownLatch countDownLatch = new CountDownLatch(10);
        taskExecutor.execute(() -> {
            try {
                //service调用
                map.put("HandlingRecord", legalLitigationService.findHandlingRecordByAssociationId(associationId));
            }finally {
                countDownLatch.countDown();
            }
        });
        taskExecutor.execute(() -> {
            try {
                map.put("CaseBasic", legalLitigationService.findCaseDetailsById(associationId));
            }finally {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        return ResultVO.putSuccess(map);
    }
    
    定位问题原因* 根据原因思考问题解决方案* 实践验证方案有效性* 提交验证结果
  • 相关阅读:
    常用变量的获取
    批出里中常用参数的含义
    利用批处理命令复制指定文件到指定目录下
    跟后台打印程序系统服务通讯时出现错误。请打开服务管理单元,确认后台打印程序服务是否在运行。
    系统日志报错i8042prt无法加载
    删除指定路径下指定天数之前(以文件名中包含的日期字符串为准)的文件:字符串截取
    删除指定路径下指定天数之前(以文件的最后修改日期为准)的文件:BAT + VBS
    Linux学习笔记
    Docker
    Python学习笔记
  • 原文地址:https://www.cnblogs.com/jimoliunian/p/13740010.html
Copyright © 2011-2022 走看看