该功能主要是基于 TaskScheduler 和 CronTask两个类来实现。
直接干代码
1. 创建一个springboot 工程,依赖如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>qinfeng.zheng</groupId> <artifactId>job-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>job-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2. 数据库配置
spring.datasource.driver-class-name = com.mysql.jdbc.Driver spring.datasource.url= jdbc:mysql://120.79.34.98:3306/test?useUnicode=yes&characterEncoding=UTF-8&useSSL=false spring.datasource.username = root spring.datasource.password = 123456
3. 代码
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定时任务执行线程池核心线程数 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; } }
import java.util.concurrent.ScheduledFuture; public final class ScheduledTask { volatile ScheduledFuture future; /** * 取消定时任务 */ public void cancel() { ScheduledFuture future = this.future; if (future != null) { future.cancel(true); } } }
import lombok.extern.slf4j.Slf4j; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import java.lang.reflect.Method; import java.util.Objects; /** * 通过反射的方式 调用真定需要执行的方法 */ @Slf4j public class SchedulingRunnable implements Runnable { private String beanName; private String methodName; private String params; public SchedulingRunnable(String beanName, String methodName, String params) { this.beanName = beanName; this.methodName = methodName; this.params = params; } @Override public void run() { log.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params); long startTime = System.currentTimeMillis(); try { Object target = SpringContextUtils.getBean(beanName); Method method = null; if (!StringUtils.isEmpty(params)) { method = target.getClass().getDeclaredMethod(methodName, String.class); } else { method = target.getClass().getDeclaredMethod(methodName); } ReflectionUtils.makeAccessible(method); if (!StringUtils.isEmpty(params)) { method.invoke(target, params); } else { method.invoke(target); } } catch (Exception ex) { log.error("定时任务执行异常 - bean:{},方法:{},参数:{} ", beanName, methodName, params, ex); } long times = System.currentTimeMillis() - startTime; log.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanName, methodName, params, times); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SchedulingRunnable that = (SchedulingRunnable) o; if (params == null) { return beanName.equals(that.beanName) && methodName.equals(that.methodName) && that.params == null; } return beanName.equals(that.beanName) && methodName.equals(that.methodName) && params.equals(that.params); } @Override public int hashCode() { if (params == null) { return Objects.hash(beanName, methodName); } return Objects.hash(beanName, methodName, params); } }
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtils.applicationContext = applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } }
定时任务注册类
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 可参考spring 提供的定时任务注册类 {@link ScheduledTaskRegistrar} */ @Component public class CronTaskRegistrar implements DisposableBean { /** * 缓存 */ private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16); @Autowired private TaskScheduler taskScheduler; /** * 添加一个定时任务 * 其核心就是靠spring提供的 CronTask 类来实现 * * @param task * @param cronExpression */ public void addCronTask(Runnable task, String cronExpression) { addCronTask(new CronTask(task, cronExpression)); } public void addCronTask(CronTask cronTask) { if (cronTask != null) { Runnable task = cronTask.getRunnable(); if (this.scheduledTasks.containsKey(task)) { removeCronTask(task); } this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } } public void removeCronTask(Runnable task) { ScheduledTask scheduledTask = this.scheduledTasks.remove(task); if (scheduledTask != null) scheduledTask.cancel(); } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } @Override public void destroy() { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); } }
下面是将job信息记录到数据库,以便于启动项目时,就可以运行定时任务
CREATE TABLE `sys_job` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `jobId` bigint(20) DEFAULT NULL, `beanName` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `methodParams` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `cronExpression` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `jobStatus` char(1) COLLATE utf8_unicode_ci DEFAULT NULL, `remark` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `createTime` date DEFAULT NULL, `updateTime` date DEFAULT NULL, `methodName` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
import lombok.Data; import java.util.Date; @Data public class SysJobPO { /** * 任务ID */ private Integer jobId; /** * bean名称 , 如示例中的: demoTask */ private String beanName; /** * 方法名称 */ private String methodName; /** * 方法参数 */ private String methodParams; /** * cron表达式 */ private String cronExpression; /** * 状态(1正常 0暂停) */ private Integer jobStatus; /** * 备注 */ private String remark; /** * 创建时间 */ private Date createTime; /** * 更新时间 */ private Date updateTime; }
import org.apache.ibatis.annotations.*; import qinfeng.zheng.jobdemo.SysJobPO; import java.util.List; @Mapper public interface SysJobMapper { @Insert("INSERT INTO sys_job(jobId,beanName,methodParams,cronExpression,jobStatus,remark,createTime,updateTime,methodName) values (#{jobId}," + "#{beanName},#{methodParams},#{cronExpression},#{jobStatus},#{remark},#{createTime},#{updateTime},#{methodName})") boolean addSysJob(SysJobPO sysJob); @Select("select * from sys_job where jobStatus = #{jobStatus}") List<SysJobPO> getSysJobListByStatus(Integer jobStatus); @Select("select * from sys_job where jobId = #{jobId}") SysJobPO findTaskJobByJobId(Integer jobId); @Delete("delete from sys_job where jobId = #{jobId}") boolean deleteTaskJobByJobId(Integer jobId); /** * 这儿只是修改corn表达式 和 状态。 测试使用 * * @param sysJobPO * @return */ @Update("update sys_job set cronExpression = #{cronExpression} , jobStatus = #{jobStatus} where jobId= #{jobId}") boolean updateTaskJob(SysJobPO sysJobPO); }
/** * job状态控制 */ public enum SysJobStatus { NORMAL("正常", 1), SUSPEND("暂停", 0); private String desc; private Integer index; private SysJobStatus(String desc, Integer index) { this.desc = desc; this.index = index; } public String desc() { return this.desc; } public Integer index() { return this.index; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import qinfeng.zheng.jobdemo.CronTaskRegistrar; import qinfeng.zheng.jobdemo.SchedulingRunnable; import qinfeng.zheng.jobdemo.SysJobPO; import qinfeng.zheng.jobdemo.SysJobStatus; import qinfeng.zheng.jobdemo.mapper.SysJobMapper; import java.util.List; @Service public class SysJobRunner implements CommandLineRunner { private static final Logger logger = LoggerFactory.getLogger(SysJobRunner.class); @Autowired private SysJobMapper sysJobMapper; @Autowired private CronTaskRegistrar cronTaskRegistrar; @Override public void run(String... args) { // 初始加载数据库里状态为正常的定时任务 List<SysJobPO> jobList = sysJobMapper.getSysJobListByStatus(SysJobStatus.NORMAL.index()); if (!CollectionUtils.isEmpty(jobList)) { for (SysJobPO job : jobList) { SchedulingRunnable task = new SchedulingRunnable(job.getBeanName(), job.getMethodName(), job.getMethodParams()); cronTaskRegistrar.addCronTask(task, job.getCronExpression()); } logger.info("定时任务已加载完毕..."); } } }
定时job测试类
import org.springframework.stereotype.Component; /** * 该类用于测试定时任务 */ @Component("demoTask") public class DemoTask { public void taskWithParams(String params) { System.out.println("执行有参示例任务:" + params); } public void taskNoParams() { System.out.println("执行无参示例任务"); } }
启动springboot项目时,就会通过 SysJobRunner 加载数据库表中job信息,从而启动定时任务。
下面再写一个controller控制器,实现与前端交互,从而实现定时job的增、删、改,切换
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import qinfeng.zheng.jobdemo.CronTaskRegistrar; import qinfeng.zheng.jobdemo.SchedulingRunnable; import qinfeng.zheng.jobdemo.SysJobPO; import qinfeng.zheng.jobdemo.SysJobStatus; import qinfeng.zheng.jobdemo.mapper.SysJobMapper; @RestController public class TaskController { @Autowired private SysJobMapper sysJobMapper; @Autowired private CronTaskRegistrar cronTaskRegistrar; @PostMapping("/addTask") public String addTask(SysJobPO sysJob) { boolean success = sysJobMapper.addSysJob(sysJob); if (!success) return "新增失败"; else { if (sysJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(sysJob.getBeanName(), sysJob.getMethodName(), sysJob.getMethodParams()); // 注册定时任务 cronTaskRegistrar.addCronTask(task, sysJob.getCronExpression()); } } return "SUCCESS"; } @DeleteMapping("/deleteTask/{jobId}") public String deleteTask(@PathVariable Integer jobId) { SysJobPO existJob = sysJobMapper.findTaskJobByJobId(jobId); boolean success = sysJobMapper.deleteTaskJobByJobId(jobId); if (!success) return "删除失败"; else { if (existJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); // 删除定时任务 cronTaskRegistrar.removeCronTask(task); } } return "SUCCESS"; } /** * 修改定时任务 * * @param sysJob * @return */ @PostMapping("/updateTask") public String updateTaskJob(SysJobPO sysJob) { SysJobPO existJob = sysJobMapper.findTaskJobByJobId(sysJob.getJobId()); boolean success = sysJobMapper.updateTaskJob(sysJob); if (!success) return "修改成功"; else { // 1. 先删除原来的定时任务(Map缓存) if (existJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); cronTaskRegistrar.removeCronTask(task); } //2. 新增定时任务(放到Map缓存中) if (sysJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(sysJob.getBeanName(), sysJob.getMethodName(), sysJob.getMethodParams()); cronTaskRegistrar.addCronTask(task, sysJob.getCronExpression()); } } return "SUCCESS"; } /** * 启,停定时任务的状态切换 */ @GetMapping("/trigger/{jobId}") public String triggerTaskJob(@PathVariable Integer jobId) { SysJobPO existJob = sysJobMapper.findTaskJobByJobId(jobId); // 1.如果原先是启动状态 ,那么就停止吧(从Map缓存中删除, 并将表中状态置为0) if (existJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); cronTaskRegistrar.removeCronTask(task); existJob.setJobStatus(0); sysJobMapper.updateTaskJob(existJob); } else { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); cronTaskRegistrar.addCronTask(task, existJob.getCronExpression()); existJob.setJobStatus(1); sysJobMapper.updateTaskJob(existJob); } return "SUCCESS"; } }
好,代码到此为此。
最后来测试一把,首先启动springboot项目
可以看到,项目一启动,就将数据库表中记录的一个定时任务启动了。
然后,我们调用addTask接口,新增一个定时job
可以看到,现在启动了两个job.
数据库也有两条job数据, 下次启动项目时,两个job就会同时启动了。
+----+-------+----------+--------------+----------------+-----------+--------------------------+------------+------------+----------------+ | id | jobId | beanName | methodParams | cronExpression | jobStatus | remark | createTime | updateTime | methodName | +----+-------+----------+--------------+----------------+-----------+--------------------------+------------+------------+----------------+ | 7 | 1 | demoTask | NULL | */5 * * * * ? | 1 | 测试无参定时任务 | 2020-02-15 | 2020-02-15 | taskNoParams | | 15 | 10 | demoTask | 123456 | */10 * * * * ? | 1 | NULL | NULL | NULL | taskWithParams | +----+-------+----------+--------------+----------------+-----------+--------------------------+------------+------------+----------------+