前言:
之前开发定时任务时,有两种方式:
a、如果是SpringBoot项目,在方法上加上 @Scheduled 注解,然后开配置下cron就可以了。 缺点:不支持通过某种条件来开启任务
b、使用 Executors.newScheduledThreadPool() 启动一个定时线程。缺点:服务重启或者任务失败,线程就结束了
项目中使用了Quartz框架,很完美的解决了以上两个问题。本文主要记录Quartz框架的基本使用
上代码:
以下配置是基于SpringBoot 2.1.0 + Quartz 2.3.0版本
1、pom.xml文件():
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--定时任务需要依赖context模块 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> </dependency>
2、application.yml配置文件
app: db: host: 127.0.01 port: 3306 dbname: xwj server: port: 18090 spring: application: name: quarts-one datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: net.sf.log4jdbc.DriverSpy url: jdbc:log4jdbc:mysql://${app.db.host}:${app.db.port}/${app.db.dbname}?autoReconnect=true&failOverReadOnly=false&createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 username: root password: 123456 logging: level: jdbc: off jdbc.sqltiming: error #记录sql执行的时间 #root: INFO com.xwj: debug
3、Quartz的配置文件 quartz.properties (发现只能用properties文件,如果用yml文件不生效)
org.quartz.scheduler.instanceName = instance_one org.quartz.scheduler.instanceId = instance_id_one org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.useProperties = false org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.scheduler.idleWaitTime = 5000 org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 20 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
4、Quartz的配置类
/** * Quartz配置类 */ @Configuration public class QuartzConfig { /** * 继承org.springframework.scheduling.quartz.SpringBeanJobFactory 实现任务实例化方式 */ public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { private transient AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(final ApplicationContext context) { beanFactory = context.getAutowireCapableBeanFactory(); } /** * 将job实例交给spring ioc托管 我们在job实例实现类内可以直接使用spring注入的调用被spring ioc管理的实例 */ @Override protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); // 将job实例交付给spring ioc beanFactory.autowireBean(job); return job; } } /** * 配置任务工厂实例 */ @Bean public JobFactory jobFactory(ApplicationContext applicationContext) { // 采用自定义任务工厂 整合spring实例来完成构建任务 AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(applicationContext); return jobFactory; } /** * 配置任务调度器 使用项目数据源作为quartz数据源 * * @param jobFactory 自定义配置任务工厂 * @param dataSource 数据源实例 */ @Bean public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws Exception { SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); // 将spring管理job自定义工厂交由调度器维护 schedulerFactoryBean.setJobFactory(jobFactory); // 设置覆盖已存在的任务 schedulerFactoryBean.setOverwriteExistingJobs(true); // 项目启动完成后,等待2秒后开始执行调度器初始化 schedulerFactoryBean.setStartupDelay(2); // 设置调度器自动运行 schedulerFactoryBean.setAutoStartup(true); // 设置数据源,使用与项目统一数据源 schedulerFactoryBean.setDataSource(dataSource); // 设置上下文spring bean name schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext"); // 设置配置文件位置 schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties")); return schedulerFactoryBean; } }
5、Quartz工具类
@Slf4j @Component public class MyQuartzScheduler { @Autowired private Scheduler scheduler; // 任务 private final String JOB_NAME_PREFIX = "JOB_"; // 任务名称前缀 /** * 指定时间后执行任务(只会执行一次) * * @param triggerStartTime 指定时间 */ @SneakyThrows public void addJob(Class<? extends Job> jobClass, String jobName, Date triggerStartTime, Map<String, Object> params) { // 使用job类名作为组名 String groupName = jobClass.getSimpleName(); // 创建任务触发器 Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).startAt(triggerStartTime).build(); // 将触发器与任务绑定到调度器内 this.scheduleJob(jobClass, groupName, jobName, params, trigger); } /** * 带触发器的任务(执行多次) * * @param cronExpression 定时任务表达式 */ @SneakyThrows public void addJobWithCron(Class<? extends Job> jobClass, String jobName, String cronExpression, Map<String, Object> params) { // 使用job类名作为组名 String groupName = jobClass.getSimpleName(); // 基于表达式构建触发器 CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(cronScheduleBuilder).build(); // 将触发器与任务绑定到调度器内 this.scheduleJob(jobClass, groupName, jobName, params, cronTrigger); } /** * 带触发器的任务,同时指定时间段(立马执行) * * @param timeoutSeconds 超时时间(秒) * @param cronExpression 定时任务表达式 */ @SneakyThrows public void addJobWithCron(Class<? extends Job> jobClass, String jobName, String cronExpression, long timeoutSeconds,
Map<String, Object> params) { // 使用job类名作为组名 String groupName = jobClass.getSimpleName(); // 计算结束时间 Date endDate = TimeUtil.localDateTime2Date(LocalDateTime.now().plusSeconds(timeoutSeconds)); // 基于表达式构建触发器,同时指定时间段 CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName)
.startNow().endAt(endDate)
.withSchedule(cronScheduleBuilder).build(); // 将触发器与任务绑定到调度器内 this.scheduleJob(jobClass, groupName, jobName, params, cronTrigger); } @SneakyThrows private void scheduleJob(Class<? extends Job> jobClass, String groupName, String jobName, Map<String, Object> params, Trigger trigger) { jobName = StringUtils.join(JOB_NAME_PREFIX, jobName); log.info("创建任务,任务名称:{}", jobName); // 创建任务 JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build(); // 添加参数 jobDetail.getJobDataMap().putAll(params); // 将触发器与任务绑定到调度器内 scheduler.scheduleJob(jobDetail, trigger); } /** * 删除某个任务 */ @SneakyThrows public boolean deleteJob(String name, String group) { JobKey jobKey = new JobKey(name, group); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) { throw new RuntimeException("任务不存在"); } return scheduler.deleteJob(jobKey); } /** * 修改某个任务的执行时间 */ @SneakyThrows public boolean modifyJob(String name, String group, String time) { Date date = null; TriggerKey triggerKey = new TriggerKey(name, group); CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); String oldTime = cronTrigger.getCronExpression(); if (!oldTime.equalsIgnoreCase(time)) { CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(time); CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(name, group).withSchedule(cronScheduleBuilder).build(); date = scheduler.rescheduleJob(triggerKey, trigger); } return date != null; } /** * 获取任务状态 */ @SneakyThrows public TriggerState getJobState(String name, String group) { TriggerKey triggerKey = TriggerKey.triggerKey(name, group); return scheduler.getTriggerState(triggerKey); } /** * 获取任务状态 */ @SneakyThrows public TriggerState getJobState(TriggerKey triggerKey) { return scheduler.getTriggerState(triggerKey); } /** * 暂停所有任务 */ @SneakyThrows public void pauseAllJob() { scheduler.pauseAll(); } /** * 暂停某个任务 */ @SneakyThrows public void pauseJob(String name, String group) { JobKey jobKey = new JobKey(name, group); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) { throw new RuntimeException("任务不存在"); } scheduler.pauseJob(jobKey); } /** * 恢复所有任务 */ @SneakyThrows public void resumeAllJob() { scheduler.resumeAll(); } /** * 恢复某个任务 */ @SneakyThrows public void resumeJob(String name, String group) { JobKey jobKey = new JobKey(name, group); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) { throw new RuntimeException("任务不存在"); } scheduler.resumeJob(jobKey); }/** * 通过group查询有多少个运行的任务 */ @SneakyThrows public long getRunningJobCountByGroup(Class<? extends Job> jobClass) { String groupName = jobClass.getSimpleName(); GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(groupName); Set<JobKey> jobKeySet = scheduler.getJobKeys(matcher); if (CollectionUtils.isNotEmpty(jobKeySet)) { return jobKeySet.stream().filter(d -> StringUtils.equals(d.getGroup(), groupName)).count(); } return 0; } }
6、新建一个job任务
@Slf4j public class MyJob extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { JobDetail jobDetail = context.getJobDetail(); JobKey jobKey = jobDetail.getKey(); JobDataMap dataMap = jobDetail.getJobDataMap(); // 接收参数 log.info("执行MyJob任务,任务名称:{},接收参数:{}", jobKey.getName(), dataMap.getString("id")); } }
7、新建一个Controller测试类
@RestController @RequestMapping("/quartz") public class QuartzApiController { @Autowired private MyQuartzScheduler quartzScheduler; /** * 指定时间点触发的任务() */ @RequestMapping("/job/start/{id}") public void startQuartzJob(@PathVariable String id) { // 20s之后执行 LocalDateTime ldt = LocalDateTime.now(); Date date = TimeUtil.localDateTime2Date(ldt.plusSeconds(20)); Map<String, Object> params = new HashMap<String, Object>(); params.put("id", id); quartzScheduler.addJob(MyJob.class, id, date, params); } /** * 定时任务 */ @RequestMapping("/job/cron/{id}") public void cronQuartzJob(@PathVariable String id) { Map<String, Object> params = new HashMap<>(); params.put("id", id); // 每10秒执行一次 quartzScheduler.addJobWithCron(MyJob.class, id, "0/10 * * * * ?", params); } /** * 删除某个任务 */ @RequestMapping(value = "/job/delete") public boolean deleteJob(String name, String group) { return quartzScheduler.deleteJob(name, group); } /** * 修改任务执行时间 */ @RequestMapping("/job/modify") public boolean modifyQuartzJob(String name, String group, String time) { return quartzScheduler.modifyJob(name, group, time); } /** * 暂停某个任务 */ @RequestMapping(value = "/job/pause") public void pauseQuartzJob(String name, String group) { quartzScheduler.pauseJob(name, group); } /** * 暂停所有任务 */ @RequestMapping(value = "/job/pauseAll") public void pauseAllQuartzJob() { quartzScheduler.pauseAllJob(); } }
8、在数据库中执行新建quartz相关表的sql (脚本太长,可自己百度,网上一大堆)
9、启动SpringBoot服务,控制台可以看到Quartz相关的日志信息(表示Quartz配置成功):
打开数据库中的 qrtz_scheduler_state 表,会发现表中多了一条数据(SCHED_NAME和INSTANCE_NAME 分别是quartz.properties配置文件中的instanceName和instanceId):
10、在浏览器请求 http://localhost:18090/quartz/job/start/123,在控制台会看到如下日志:
2020-12-11 21:22:03.635 INFO 13052 --- [io-18090-exec-6] c.x.q.MyQuartzScheduler : 创建任务,任务名称:JOB_123
同时在表 qrtz_job_details 和 qrtz_triggers 中会分别插入一条数据,表示该任务的详细信息
过了20秒之后(上面配置的定时任务是20秒之后执行),可以看到又打印出一条日志:
2020-12-11 21:22:23.667 INFO 13052 --- [ce_one_Worker-2] c.x.q.j.MyJob : 执行MyJob任务,任务名称:JOB_123,接收参数:123
表示任务已经成功执行,并且表 qrtz_job_details 和 qrtz_triggers 的任务信息已经被删除
至此Quartz配置和测试完成,其它复杂的测试,可请求QuartzApiController的方法自行操作
扩展:
使用SpringBoot封装好的Quartz,会比上面的方式简单一些,但是使用方法跟上面一模一样。需要修改的点包括:
1、不使用 quartz.properties 配置文件和 QuartzConfig 配置类,直接在application.yml文件中增加Quartz配置:
spring: quartz: #quartz相关属性配置 properties: org: quartz: scheduler: instanceName: instance_one #调度器实例名称 #instanceId: AUTO #调度器实例编号自动生成 instanceId: instance_id_one #调度器实例id jobStore: class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化方式配置 driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #持久化方式配置数据驱动,MySQL数据库 tablePrefix: QRTZ_ #quartz相关数据表前缀名 isClustered: true #开启分布式部署 clusterCheckinInterval: 10000 #分布式节点有效性检查时间间隔,单位:毫秒 useProperties: false #配置是否使用 threadPool: class: org.quartz.simpl.SimpleThreadPool #线程池实现类 threadCount: 10 #执行最大并发线程数量 threadPriority: 5 #线程优先级 threadsInheritContextClassLoaderOfInitializingThread: true #配置是否启动自动加载数据库内的定时任务,默认true #数据库方式 job-store-type: jdbc
Quartz详细说明:Quartz官方中文文档