zoukankan      html  css  js  c++  java
  • Springboot定时任务@Scheduled,异步任务@Async

    Springboot定时任务

    一、注解

    @EnableScheduling和@Scheduled

    定时任务Schedule,Spring调度默认则是顺序执行的, 使用场景适用于定时任务为固定周期。(如果要改变周期需要重启项目)

    eg:

    @Scheduled(cron = "0/5 * * * * ?")
    public void test(){
        
    }
    

    二、基于接口

    适用场景为任务周期经常变化,cron表达式来自于数据库获取。

    @Slf4j
    @Configuration
    public class AsyncAndScheduleConf implements SchedulingConfigurer {
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.addTriggerTask(
                    //1.添加任务内容(Runnable)
                    () -> System.out.println("执行动态定时任务: " + LocalDateTime.now().toLocalTime()),
                    //2.设置执行周期(Trigger)
                    triggerContext -> {
                        //2.1 从数据库获取执行周期
                        String cron = cronMapper.getCron();
                        //2.2 合法性校验.
                        if (StringUtils.isEmpty(cron)) {
                            // Omitted Code ..
                        }
                        //2.3 返回执行周期(Date)
                        return new CronTrigger(cron).nextExecutionTime(triggerContext);
                    }
            );
        }
    }
    

    三、异步任务

    @EnableAsync和@Async
    默认情况下,Spring 使用SimpleAsyncTaskExecutor去执行这些异步方法(此执行器没有限制线程数)。此默认值可以从两个层级进行覆盖。

    如果方法级别或应用级别未配置线程池,在使用SimpleAsyncTaskExecutor因为创建了大量线程极有可能造成OOM,以下贴出部分SimpleAsyncTaskExecutor源码会说明为何创建无限制的线程。

    public void execute(Runnable task) {
            this.execute(task, 9223372036854775807L);
    }
    
    public void execute(Runnable task, long startTimeout) {
            Assert.notNull(task, "Runnable must not be null");
            Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
        //判断是否开启限流
            if (this.isThrottleActive() && startTimeout > 0L) {
                //限流入口
                this.concurrencyThrottle.beforeAccess();
                this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
            } else {
                this.doExecute(taskToUse);
            }
    
        }
    

    是否限流判断isThrottleActive()中属性concurrencyLimit是否大于0,默认为-1,所以未进行限流。

    如果开启了限流,进入beforeAccess() 方法会判断线程数是否超过concurrencyLimit,若超过则当前线程wait,其他线程执行完成后当前线程则会notify。

    所以,使用@Async从以下几点入手:

    Case1:使用默认的SimpleAsyncTaskExecutor

    为了防止创建过多线程,配置线程的限制数

    @Configuration
    public class ThreadLimitConfig extends AsyncConfigurerSupport {
        @Override
        public Executor getAsyncExecutor() {
            SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
            //设置允许同时执行的线程数为10
     executor.setConcurrencyLimit(10);
            return executor;
        }
    }
    
    Case2:使用指定的线程池
    @Async("myThreadPoolConfig")
    public void mehtod1(){
        //todo
    }
    
    Case3:全局配置
    
    @Configuration
    @EnableAsync
    public class AsyncConfiguration implements AsyncConfigurer {
        // 声明一个线程池(并指定线程池的名字,默认是方法名称)
        @Bean("taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程数5:线程池创建时候初始化的线程数
            executor.setCorePoolSize(5);
            //最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
            executor.setMaxPoolSize(5);
            //缓冲队列大小:用来缓冲执行任务的队列
            executor.setQueueCapacity(500);
            //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
            executor.setKeepAliveSeconds(60);
            //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
            executor.setThreadNamePrefix("线程名-");
     
              //不在新线程中执行任务,而是用调用者所在的线程来执行
            // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //执行初始化
            executor.initialize();
     
            return executor;
     
        }
        //异常处理
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new MyAsyncUncaughtExceptionHandler();
        }
    }
    class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            System.out.println("class#method: " + method.getDeclaringClass().getName() + "#" + method.getName());
            System.out.println("type        : " + ex.getClass().getName());
            System.out.println("exception   : " + ex.getMessage());
     
        }
    }
    

    当异步方法有返回值时,可以捕获到异常信息,当无返回值时想要捕获异常需要使用以上配置追踪异常信息。

    @Async使用注意:方法不能使用static修饰符,否则注解失效。

    关于@EnableAsync

    默认启动流程:
    1 AnnotationAsyncExecutionInterceptor#getDefaultExecutor方法

    搜索关联的线程池定义:上下文中唯一的 TaskExecutor 实例,或一个名为 taskExecutorjava.util.concurrent.Executor 实例;
    2 如果以上都没找到,则会使用 SimpleAsyncTaskExecutor 处理异步方法调用

    关于springframework提供的类:ThreadPoolTaskScheduler与ThreadPoolTaskExecutor

    一个ThreadPoolTaskExecutor通过它的corePoolSize , maxPoolSize , keepAliveSeconds和queueCapacity属性在线程池中提供细粒度的配置。 诸如ThreadPoolTaskScheduler这样的调度器不提供这样的配置。
    spring中的线程调度类也是juc包中的间接实现。
    因此,在两者之间进行选择归结为以下问题:是否需要执行或计划执行任务?根据不同的用途去选择就可以

    参考:https://stackoverflow.com/questions/33453722/spring-threadpooltaskscheduler-vs-threadpooltaskexecutor

    juc包中还提供了ScheduledThreadPoolExecutor使用,内部包含一个无界阻塞队列,类似这种代码如果没有进行控制 一定会导致oom。

  • 相关阅读:
    LeetCode 81 Search in Rotated Sorted Array II(循环有序数组中的查找问题)
    LeetCode 80 Remove Duplicates from Sorted Array II(移除数组中出现两次以上的元素)
    LeetCode 79 Word Search(单词查找)
    LeetCode 78 Subsets (所有子集)
    LeetCode 77 Combinations(排列组合)
    LeetCode 50 Pow(x, n) (实现幂运算)
    LeetCode 49 Group Anagrams(字符串分组)
    LeetCode 48 Rotate Image(2D图像旋转问题)
    LeetCode 47 Permutations II(全排列)
    LeetCode 46 Permutations(全排列问题)
  • 原文地址:https://www.cnblogs.com/mzc1997/p/14357011.html
Copyright © 2011-2022 走看看