本文介绍如何使用springboot的scheduled实现任务的定时调度,并将调度的任务实现为并发的方式。
1、定时调度配置scheduled
1)注册定时任务
package com.xiaoju.dqa.sentinel.scheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Demo component that registers two scheduled tasks: one triggered by a cron
 * expression and one triggered at a fixed rate. Both are meant to fire every
 * two minutes.
 */
@Component
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    /**
     * Period of the fixed-rate task. {@code @Scheduled(fixedRate = ...)} is
     * expressed in milliseconds, so two minutes is 2 * 60 * 1000.
     */
    private static final long TWO_MINUTES_MS = 2 * 60 * 1000L;

    /**
     * Cron-driven task: fires at second 0 of every second minute, logs the
     * executing thread and then sleeps for one second to simulate work.
     */
    @Scheduled(cron = "0 0/2 * * * ?")
    public void cronTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("cron任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it, so the
            // executing pool can still observe the interruption (e.g. on shutdown).
            Thread.currentThread().interrupt();
            logger.warn("cron任务被中断, timestamp={}", timestamp);
        }
    }

    /**
     * Fixed-rate task: fires every two minutes, logs the executing thread and
     * then sleeps for one second to simulate work.
     */
    @Scheduled(fixedRate = TWO_MINUTES_MS)
    public void rateTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("fixedRate任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("fixedRate任务被中断, timestamp={}", timestamp);
        }
    }
}
2)启动定时任务
package com.xiaoju.dqa.sentinel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point. {@code @EnableScheduling} switches on processing of
 * {@code @Scheduled} methods found by component scanning.
 */
@Configuration
@EnableScheduling
@EnableAutoConfiguration
@ComponentScan
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
这里就介绍两种配置调度时间的方式:
1)cron表达式
2)fixedRate,调度频率也就是调度间隔,单位是毫秒(两分钟即 2 * 60 * 1000)
上面代码的本意是两个任务都每两分钟调度一次。你只需要将任务方法用@Scheduled注解标注即可。
我这里只写了两个调度任务,而且只sleep1s,如果你sleep 10s的话你就能清晰的看到,两个任务是串行执行的。
springboot中定时任务的执行默认是串行的!
下面开始把它改成并行的。
2、定时调度并行化
定时调度的并行化,线程池实现,非常简单,只需要添加一个configuration,实现SchedulingConfigurer接口就可以了。
package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Replaces the scheduler used for {@code @Scheduled} tasks with a thread pool
 * so that several tasks can run at the same time.
 */
@Configuration
@EnableScheduling
public class ScheduleConfiguration implements SchedulingConfigurer {

    /**
     * Hands the pooled scheduler to the task registrar.
     *
     * @param taskRegistrar registrar that holds the scheduled tasks
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        Executor schedulerPool = taskExecutor();
        taskRegistrar.setScheduler(schedulerPool);
    }

    /**
     * Creates the scheduled thread pool backing the scheduled tasks; the
     * container invokes {@code shutdown} on it when the bean is destroyed.
     *
     * @return a scheduled thread pool with 100 core threads
     */
    @Bean(destroyMethod = "shutdown")
    public Executor taskExecutor() {
        final int poolSize = 100;
        return Executors.newScheduledThreadPool(poolSize);
    }
}
然后你重启服务,可以看到两个任务并行的执行起来。
3、将任务里的方法设置为异步
package com.xiaoju.dqa.sentinel.scheduler; import com.xiaoju.dqa.sentinel.service.AuditCollect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class Scheduler { private static final Logger logger = LoggerFactory.getLogger(Scheduler.class); @Autowired private AuditCollect auditCollect; @Scheduled(cron = "0 0/2 * * * ?") public void cronTask() { long timestamp = System.currentTimeMillis(); try { Thread thread = Thread.currentThread(); logger.info("cron任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName()); auditCollect.doAuditCollect(); } catch (InterruptedException e) { } } @Scheduled(fixedRate = 2 * 60 ) public void rateTask() { long timestamp = System.currentTimeMillis(); try { Thread thread = Thread.currentThread(); logger.info("fixedRate任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName()); auditCollect.doAuditCollect(); } catch (InterruptedException e) { } } }
比如这里有个函数执行的是数据收集,可以把它实现为异步的,并同样扔到线程池里并发地执行。
看看是怎么实现的。
package com.xiaoju.dqa.sentinel.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xiaoju.dqa.sentinel.client.es.ESDao;
import com.xiaoju.dqa.sentinel.client.es.entity.ESDocument;
import com.xiaoju.dqa.sentinel.client.es.entity.SearchDocument;
import com.xiaoju.dqa.sentinel.client.redis.RedisClient;
import com.xiaoju.dqa.sentinel.mapper.sentinel.SentinelMapper;
import com.xiaoju.dqa.sentinel.model.SentinelClan;
import com.xiaoju.dqa.sentinel.utils.ESSQLUtil;
import com.xiaoju.dqa.sentinel.utils.GlobalStaticConf;
import com.xiaoju.dqa.sentinel.utils.TimeFunction;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.*;

/**
 * Service whose collection entry point is marked {@code @Async} and bound to
 * the executor bean named {@code sentinelSimpleAsync}.
 */
@Component
public class AuditCollect {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Audits every entry of the {@code "families"} array of the given clan.
     *
     * @param clanObject  clan document; its {@code "families"} array is read
     * @param currentTime timestamp forwarded unchanged to each audit call
     */
    @Async("sentinelSimpleAsync")
    public void doAuditCollect(JSONObject clanObject, long currentTime) {
        JSONArray families = clanObject.getJSONArray("families");
        // Visit every topic in order.
        // NOTE(review): audit(...) is not defined in this excerpt — presumably
        // elided from the listing; confirm against the full class.
        int index = 0;
        while (index < families.size()) {
            audit(clanObject, families.getJSONObject(index), currentTime);
            index++;
        }
    }
}
可以看到只是用@Async注解标注一下,并且指定了异步的executor=sentinelSimpleAsync。
sentinelSimpleAsync是我们自己定义的bean,用来定制化线程池。
package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the thread pools used by {@code @Async} methods. Pool sizing is
 * read from the {@code executor.*} configuration properties.
 */
@Configuration
@EnableAsync
public class ExecutorConfiguration {

    @Value("${executor.pool.core.size}")
    private int corePoolSize;

    @Value("${executor.pool.max.size}")
    private int maxPoolSize;

    @Value("${executor.queue.capacity}")
    private int queueCapacity;

    /**
     * Executor referenced by {@code @Async("sentinelSimpleAsync")}.
     *
     * @return an initialized pool with the configured sizing
     */
    @Bean
    public Executor sentinelSimpleAsync() {
        ThreadPoolTaskExecutor pool = sizedExecutor("SentinelSimpleExecutor-");
        pool.initialize();
        return pool;
    }

    /**
     * Executor with the same sizing that additionally installs a caller-runs
     * rejection policy.
     *
     * @return an initialized pool with the configured sizing
     */
    @Bean
    public Executor sentinelAsync() {
        ThreadPoolTaskExecutor pool = sizedExecutor("SentinelSwapExecutor-");
        // Rejection policy: decides what happens to a new task the pool cannot accept.
        // CALLER_RUNS: the task is not run on a pool thread; the submitting thread runs it.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.initialize();
        return pool;
    }

    /** Builds an uninitialized executor with the configured sizing and the given thread-name prefix. */
    private ThreadPoolTaskExecutor sizedExecutor(String threadNamePrefix) {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setQueueCapacity(queueCapacity);
        pool.setThreadNamePrefix(threadNamePrefix);
        return pool;
    }
}
配置文件如下:
#============== 线程池 ===================
executor.pool.core.size=100
executor.pool.max.size=150
executor.queue.capacity=2000
想让异步生效的话,只需要在Application类上加上@EnableAsync注解就好了。
package com.xiaoju.dqa.sentinel; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling @EnableAutoConfiguration @EnableAsync @ComponentScan public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }