zoukankan      html  css  js  c++  java
  • SpringBoot 异步与多线程

    1. @Async可以开启异步,但是要在 main 中EnableAsync

    2.@Async既可以注解在方法上,也可以注解到类上

    3.使用@Async时,请注意一定要对应bean name,否则或调用系统默认的SampleTaskExecutor,容易造成OOM

    4.本人使用的SpringBoot 2.3.4 ,默认值  maxPoolSize = 2147483647,queueCapacity = 2147483647, 建议在初始化时设置corePoolSize即可(百度到的例子中,大多数没有讲这一块)

    5.线程池对拒绝任务的处理策略处理,默认为 new ThreadPoolExecutor.CallerRunsPolicy(),建议使用 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    6.如果Executor后台线程池还没有完成Callable的计算,这时调用返回Future对象的get()方法,会阻塞直到计算完成。

    我为什么要在这里重点提第四点和第五点,目前百度到的大多文章都是相互抄的,在定义executor主动定义了queueCapacity ,maxPoolSize  并没有去看源码中对于queueCapacity ,maxPoolSize  的处理。

    我的建议是,这俩值无需自定义,为了提高多线程的并发效率,可以考虑直接放大corePoolSize。

    关于executort的使用代码我就不在此处多讲,各位可以用此代码,测试系统中指定bean的taskExecutor中到底有多少任务在执行。

    getBean见 https://www.jianshu.com/p/3cd2d4e73eb7

    使用方式如下

    @Component
    @Slf4j
    public class TaskSchedule {
    
        @Autowired
        ApplicationContextProvider applicationContextProvider;
    
    //    @Scheduled(fixedRate = 2000L, initialDelay = 5)
        public void getTaskExecutorState(){
            Class<ThreadPoolTaskExecutor> clas = ThreadPoolTaskExecutor.class;
            ThreadPoolTaskExecutor threadPoolTaskExecutor  = applicationContextProvider.getBean("taskExecutor", clas);
            ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor();
            log.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}], MaximumPoolSize[{}], largestPoolSize[{}]",
                    threadPoolTaskExecutor.getThreadNamePrefix(),
                    threadPoolExecutor.getTaskCount(),
                    threadPoolExecutor.getCompletedTaskCount(),
                    threadPoolExecutor.getActiveCount(),
                    threadPoolExecutor.getQueue().size(),
                    threadPoolExecutor.getMaximumPoolSize(),
                    threadPoolExecutor.getLargestPoolSize());
        }
    }

    controller

    @Autowired
    private AsyncTask task;

    @Autowired
    private TaskSchedule taskSchedule;

    @PostMapping("/consume") @ResponseBody public JSONObject consume(@RequestBody JSONObject params) throws InterruptedException, ExecutionException { count ++; JSONObject jsonObject = new JSONObject(); log.info("params flag {} ",params.getString("flag")); log.info("名称 {}", params.getString("loginid")); jsonObject.put("loginidis",params.getString("loginid")); jsonObject.put("count", count); Future<String> task4 = task.task4(count); taskSchedule.getTaskExecutorState(); // task.task4(); // log.info("Future<String> {}", task4.get()); //调用返回Future对象的get()方法,会阻塞直到计算完成 // task.getTest1(); return jsonObject; }
    import cn.hutool.core.util.RandomUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 功能描述:异步任务业务类(@Async也可添加在方法上)
     */
    @Component
    @Async("taskExecutor")
    @Slf4j
    public class AsyncTask {
    
        //获取异步结果
        public Future<String> task4(int index) throws InterruptedException {
            log.info("开始执行任务 task4 index:{}",index);
            long begin = System.currentTimeMillis();
    //        Thread.sleep(1000L*60*2);
    //        int sleepTime = RandomUtil.randomInt(1000*60*3, 1000*60*5);
            int sleepTime = RandomUtil.randomInt(1000*30, 1000*60);
            log.info(" sleepTime is {}",sleepTime);
            Thread.sleep(sleepTime);
            long end = System.currentTimeMillis();
            log.info("任务4执行完毕 index:"+index+" 耗时=" + (end - begin));
            return new AsyncResult<String>("任务4");
        }
    
    }

    各位可以在代码中注释掉

            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
    或者使用不同的拒绝策略测试效果。
    如本人设置的参数core=3, max=5, queue=10, 通过postman构造对应的请求,会在第16个请求开始阻塞,由接收请求的线程本身http-nio-80-exec负责执行任务,其执行时间即postman请求消耗的时间
    http-nio-80-exec即SpringBoot中tomcat本身默认的executor。
    关于拒绝策略可参考:https://www.jianshu.com/p/f3322daa2ad0
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    @Slf4j
    public class ThreadPoolTaskConfig {
    
        private static final int corePoolSize = 2;               // 核心线程数(默认线程数)线程池创建时候初始化的线程数
        private static final int maxPoolSize = 5;                // 最大线程数 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        private static final int keepAliveTime = 10;            // 允许线程空闲时间(单位:默认为秒)当超过了核心线程之外的线程在空闲时间到达之后会被销毁
        private static final int queueCapacity = 10;            // 缓冲队列数 用来缓冲执行任务的队列
        private static final String threadNamePrefix = "Async-Service-"; // 线程池名前缀 方便我们定位处理任务所在的线程池
    
        @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
    //    public ThreadPoolTaskExecutor taskExecutor(){
        public ThreadPoolTaskExecutor taskExecutor(){
    //    public AsyncTaskExecutor taskExecutor(){
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
    //        executor.setKeepAliveSeconds(keepAliveTime);
            executor.setThreadNamePrefix(threadNamePrefix);
    
            // 线程池对拒绝任务的处理策略 采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    
    //        executor.setRejectedExecutionHandler(
    //                new RejectedExecutionHandler(){
    //                    @Override
    //                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    //                        try {
    //                            //继续加入阻塞队列执行,可自定义
    //                            log.info("继续加入阻塞队列执行,可自定义");
    //                            executor.getQueue().put(r);
    //                        } catch (InterruptedException e) {
    //                            e.printStackTrace();
    //                        }
    //                    }
    //                }
    //
    //        );
            // 初始化
            executor.initialize();
            return executor;
        }
    
    }
  • 相关阅读:
    callee与caller
    vi/vim使用进阶: 在VIM中使用GDB调试 – 使用vimgdb
    error: No curses/termcap library found的解决办法
    shell变量详解
    在简历中使用STAR法则
    Hive教程之metastore的三种模式
    分布式服务框架 Zookeeper -- 管理分布式环境中的数据
    ZooKeeper典型应用场景
    HBase Java API类介绍
    Spark使用总结与分享
  • 原文地址:https://www.cnblogs.com/huanghongbo/p/13834959.html
Copyright © 2011-2022 走看看