zoukankan      html  css  js  c++  java
  • SpringBoot 异步调用方法并接收返回值

    一、背景  

    项目中肯定会遇到异步调用其他方法的场景,比如有个计算过程,需要计算很多个指标的值,但是每个指标计算的效率快慢不同,如果采用同步执行的方式,运行这一个过程的时间是计算所有指标的时间之和。比如:

      方法A:计算指标x,指标y,指标z的值,其中计算指标x需要1s,计算指标y需要2s,指标z需要3s。最终执行完方法A就是5s。

      现在用异步的方式优化一下

      方法A异步调用方法B,方法C,方法D,方法B,方法C,方法D分别计算指标x,指标y,指标z的值,那么最终执行完方法A的时间则是3s。

    还有一种用途是当一个业务里面需要多个请求时,这时候异步并发请求所得到的回报远远是物有所值的。因为他是异步执行的,话不多说,一下是在springBoot里面使用并发请求;

    二、spring boot中异步并发使用

    2.1、appllication.yml

    #****************集成Async线程池开始*******************
    async: # Async线程池 配置
      executor:
        corepoolsize: 20
        maxpoolsize: 25
        queuecapacity: 40
        keepaliveseconds: 200
        threadnameprefix: appasync
        awaitterminationseconds: 60
    #*****************集成Async线程池结束******************

    2.2、配置线程池

    @Configuration
    @EnableAsync
    public class ExecutorConfig {
    
        @Value("${async.executor.corepoolsize}")
        private Integer corePoolSize;
    
        @Value("${async.executor.maxpoolsize}")
        private Integer maxPoolSize;
    
        @Value("${async.executor.queuecapacity}")
        private Integer queueCapacity;
    
        @Value("${async.executor.keepaliveseconds}")
        private Integer keepAliveSeconds;
    
        @Value("${async.executor.threadnameprefix}")
        private String threadNamePrefix;
    
        @Value("${async.executor.awaitterminationseconds}")
        private Integer awaitTerminationSeconds;
    
        /**
         * 线程池
         *
         * @return
         */
        @Bean(name = "asyncExecutor")
        public Executor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            // 基础线程数 corePoolSize: 10
            executor.setCorePoolSize(corePoolSize);
            // 最大线程数 maxPoolSize: 15
            executor.setMaxPoolSize(maxPoolSize);
            // 队列长度 queueCapacity: 25
            executor.setQueueCapacity(queueCapacity);
            //  线程池维护线程所允许的空闲时间,单位为秒 keepAliveSeconds: 200
            executor.setKeepAliveSeconds(keepAliveSeconds);
            // 线程名字 threadNamePrefix: appasync
            executor.setThreadNamePrefix(threadNamePrefix);
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 等待所有任务都完成再继续销毁其他的Bean
            executor.setWaitForTasksToCompleteOnShutdown(true);
            // 线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
            executor.setAwaitTerminationSeconds(awaitTerminationSeconds);
            executor.initialize();
            return executor;
        }
    }

    2.3、线程池监控(这个可有可无,主要是为了对线程池参数及时的调优)

    @RestController
    @Slf4j
    @RequestMapping("/pubapi/asyncExecutor")
    public class AsyncExecutorController extends BaseController {
    
        @Resource(name = "asyncExecutor")
        private Executor asyncExecutor;
    
        @PostMapping("/monitor")public ResultBean<Map<String, Object>> getAsyncExecutorData() {
            ResultBean<Map<String, Object>> resultBean = ResultBeanUtil.error500();
            if (asyncExecutor == null) {
                return resultBean;
            }
    
            try {
                ThreadPoolTaskExecutor executorTask = (ThreadPoolTaskExecutor) asyncExecutor;
                ThreadPoolExecutor executor = executorTask.getThreadPoolExecutor();
    
                // 当前排队线程数
                int queueSize = executor.getQueue().size();
                // 当前活动线程数
                int activeCount = executor.getActiveCount();
                // 执行完线程数
                long completedThreadCount = executor.getCompletedTaskCount();
                // 总线程数
                long taskCount = executor.getTaskCount();
                // 初始线程数
                int poolSize = executor.getPoolSize();
                // 核心线程数
                int corePoolSize = executor.getCorePoolSize();
                // 线程池是否终止
                boolean isTerminated = executor.isTerminated();
                // 线城池是否关闭
                boolean isShutdown = executor.isShutdown();
                // 线程空闲时间
                long keepAliveTime = executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
                // 最大允许线程数
                long maximumPoolSize = executor.getMaximumPoolSize();
                // 线程池中存在的最大线程数
                long largestPoolSize = executor.getLargestPoolSize();
    
                Map<String, Object> threadPoolData = new HashMap<>(18);
                threadPoolData.put("当前排队线程数", queueSize);
                threadPoolData.put("当前活动线程数", activeCount);
                threadPoolData.put("执行完线程数", completedThreadCount);
                threadPoolData.put("总线程数", taskCount);
                threadPoolData.put("初始线程数", poolSize);
                threadPoolData.put("核心线程数", corePoolSize);
                threadPoolData.put("线程池是否终止", isTerminated);
                threadPoolData.put("线城池是否关闭", isShutdown);
                threadPoolData.put("线程空闲时间", keepAliveTime);
                threadPoolData.put("最大允许线程数", maximumPoolSize);
                threadPoolData.put("线程池中存在的最大线程数", largestPoolSize);
    
                InetAddress inetAddress = IdWorker.getLocalHostLANAddress();
                Map<String, Object> resultData = new HashMap<>(4);
                resultData.put("ip", inetAddress.getHostAddress());
                resultData.put("threadPoolData", threadPoolData);
    
                resultBean = ResultBeanUtil.success("请求成功!", resultData);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return resultBean;
        }
    
    }

    2.4、代码中使用

    public void getMap(){
            /**
             * 先将耗时的、相互之间无依赖的操作先执行,由于其执行结果暂时不是特别关注,所以
             */
            Future<String> futureA = functionA();
            Future<String> futureB = functionB();
            /**
             * 执行其他的操作,其实functionA(),functionB()也在工作
             */
            aaa();
            /**
             * 获取异步的结果,然后计算
             */
            try {
                String resultA =futureA.get();
                String resuleB = futureB.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            
        }
    
        public Future<String> functionA (){
            Future<String> future = null;
            try {
                Thread.sleep(5000);
                future = new AsyncResult<String>("functionA");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return future;
        }
    
        public Future<String> functionB (){
            Future<String> future = null;
            try {
                Thread.sleep(3000);
                future = new AsyncResult<String>("functionB");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return future;
        }
    
        public void aaa(){
            System.out.println("我是");
        }

    三、使用误区

      在使用时候,

    四、线程池选型

    在全栈的道路上,积极向上、成熟稳重、谦虚好学、怀着炽热的心向前方的走得更远。
  • 相关阅读:
    招聘ASP.NET(C#)开发人员(已经截止,谢谢大家支持)
    VisualStudioCode开发Vue
    全局异常处理机制(Filter拦截器对比)
    工程师
    kubernetes(k8s)里面部署服务器集群并访问项目
    webpack 就是前端模块化打包工具
    Visual Studio Code配置C/C++开发环境
    docker和k8s(Kubernetes)是配套使用
    kettle 多表全删全插同步数据
    wireshark 抓HTTPS 的包 HTTPS = TLS + HTTP TLSv1.2 协议
  • 原文地址:https://www.cnblogs.com/DDgougou/p/11964425.html
Copyright © 2011-2022 走看看