zoukankan      html  css  js  c++  java
  • Quartz实现动态定时任务

    目标:定时任务持久化到数据库,动态调整数据库里保存的cron表达式使定时任务可以跟随变化。

    1、核心依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-quartz</artifactId>
            </dependency>

    2、数据库表

    -- ----------------------------
    -- 任务调度表  job_info
    -- ----------------------------
    CREATE TABLE `job_info`
    (
        `id`              varchar(20)  NOT NULL COMMENT 'ID',
        `job_name`        varchar(50)  NOT NULL COMMENT '名称',
        `cron_expression` varchar(255) NOT NULL COMMENT 'java cron表达式(秒分时日月周[年])',
        `bean_name`       varchar(255) NOT NULL COMMENT 'spring bean名称,被ioc容器管理的,需要执行调度任务的bean',
        `method_name`     varchar(255) NOT NULL COMMENT 'bean里需要执行方法名',
        `params`          varchar(255) NULL COMMENT '方法参数',
        `enable`          tinyint(1)   NOT NULL default 1 COMMENT '是否开启,1开启,0关闭',
        `remark`          varchar(255) NULL COMMENT '说明',
        `create_time`     datetime     NULL     DEFAULT NULL COMMENT '创建日期',
        `update_time`     datetime     NULL     DEFAULT NULL COMMENT '更新日期',
        PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB
      CHARACTER SET = utf8mb4 COMMENT = '任务调度表';
    
    insert into job_info
    values ('3', '用于测试定时任务触发run', '0/30 * * * * ?', 'testJob', 'run', null, false, '用于测试定时任务工作情况run', now(), null);
    insert into job_info
    values ('4', '用于测试定时任务触发call', '0/20 * * * * ?', 'testJob', 'call', null, false, '用于测试定时任务工作情况call', now(), null);
    
    
    -- ----------------------------
    -- 任务日志表  job_log
    -- ----------------------------
    CREATE TABLE `job_log`
    (
        `id`              varchar(20)  NOT NULL COMMENT 'ID',
        `job_name`        varchar(50)  NOT NULL COMMENT '名称',
        `cron_expression` varchar(255) NOT NULL COMMENT 'java cron表达式(秒分时日月周[年])',
        `bean_name`       varchar(255) NOT NULL COMMENT 'spring bean名称,被ioc容器管理的,需要执行调度任务的bean',
        `method_name`     varchar(255) NOT NULL COMMENT 'bean里需要执行方法名',
        `params`          varchar(255) NULL COMMENT '方法参数',
        `success`         tinyint(1)   NOT NULL default 1 COMMENT '是否成功,1成功,0失败',
        `time`            bigint(20)   NOT NULL default 0 COMMENT '执行耗时,毫秒',
        `detail`          text         NULL COMMENT '详细内容',
        `create_time`     datetime     NULL     DEFAULT NULL COMMENT '创建日期',
        PRIMARY KEY (`id`) USING BTREE,
        KEY `idx_job_log_create_time` (`create_time`) USING BTREE
    ) ENGINE = InnoDB
      CHARACTER SET = utf8mb4 COMMENT = '任务日志表';

    从job_info表和job_log表构建两个对应的实体类:JobInfo和JobLog

    3、通过反射的方式调用定时任务,这样就不用手动生成每个Quartz的Job

    public class CallMethod {
        private Object target;
        private Method method;
        // 单个字符串参数,如有需要可以改成数组参数
        private String params;
    
        public CallMethod(String beanName, String methodName, String params) throws NoSuchMethodException {
            this.target = SpringContextHolder.getBean(beanName);
            this.params = params;
    
            if (StringUtils.isNotBlank(params)) {
                this.method = target.getClass().getDeclaredMethod(methodName, String.class);
            } else {
                this.method = target.getClass().getDeclaredMethod(methodName);
            }
        }
    
        public void call() throws InvocationTargetException, IllegalAccessException {
            ReflectionUtils.makeAccessible(method);
            if (StringUtils.isNotBlank(params)) {
                method.invoke(target, params);
            } else {
                method.invoke(target);
            }
        }
    }

    读取JobExecutionContext上下文里JobDataMap,读取bean或者类,然后通过反射调用定时的方法

    @Slf4j
    @DisallowConcurrentExecution
    public class QuartzSerialJob extends QuartzJobBean {
    
        @Override
        protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
            JobInfo jobInfo = (JobInfo) context.getMergedJobDataMap().get(JobConstant.JOB_KEY);
            JobLogService jobLogService = SpringContextHolder.getBean(JobLogService.class);
    
            JobLog jobLog = new JobLog();
            BeanUtils.copyProperties(jobInfo, jobLog);
            jobLog.setCreateTime(new Date());
            long startTime = System.currentTimeMillis();
            try {
                log.info("定时任务准备执行,任务名称:{}", jobInfo.getJobName());
                CallMethod callMethod = new CallMethod(jobInfo.getBeanName(), jobInfo.getMethodName(), jobInfo.getParams());
                callMethod.call();
                jobLog.setSuccess(true);
                long times = System.currentTimeMillis() - startTime;
                log.info("定时任务执行完毕,任务名称:{} 总共耗时:{} 毫秒", jobLog.getJobName(), times);
            } catch (Exception e) {
                log.error("定时任务执行失败,任务名称:"+ jobInfo.getJobName(), e);
                jobLog.setSuccess(false);
                jobLog.setDetail(e.toString());
            } finally {
                long times = System.currentTimeMillis() - startTime;
                jobLog.setTime(times);
                jobLogService.insert(jobLog);
            }
        }
    }
    ps:@DisallowConcurrentExecution注解是为了串行执行同样的定时任务


    SpringContextHolder方法的实现:
    @Slf4j
    public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
    
        private static ApplicationContext applicationContext = null;
    
       
        public static <T> T getBean(String name) {
            assertContextInjected();
            return (T) applicationContext.getBean(name);
        }
    
       
        public static <T> T getBean(Class<T> requiredType) {
            assertContextInjected();
            return applicationContext.getBean(requiredType);
        }
    
        /**
         * 检查ApplicationContext不为空.
         */
        private static void assertContextInjected() {
            if (applicationContext == null) {
                throw new IllegalStateException("applicaitonContext属性未注入, 请在applicationContext" +
                        ".xml中定义SpringContextHolder或在SpringBoot启动类中注册SpringContextHolder.");
            }
        }
    
        /**
         * 清除SpringContextHolder中的ApplicationContext为Null.
         */
        private static void clearHolder() {
            log.debug("清除SpringContextHolder中的ApplicationContext:"
                    + applicationContext);
            applicationContext = null;
        }
    
        @Override
        public void destroy(){
            SpringContextHolder.clearHolder();
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            if (SpringContextHolder.applicationContext != null) {
                log.warn("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);
            }
            SpringContextHolder.applicationContext = applicationContext;
        }
    }
    
    

    4、实现定时任务增删的基本操作

    @Slf4j
    @Service("quartzJobService")
    public class QuartzJobService {
        private static final String JOB_NAME = "TASK_";
    
        @Resource
        private Scheduler scheduler;
    
        /**
         * 创建串行定时任务
         */
        public void addSerialJob(JobInfo jobInfo){
            if(!jobInfo.getEnable()) return;
            JobDetail jobDetail = JobBuilder.newJob(QuartzSerialJob.class)
                    .storeDurably()
                    .withIdentity(JOB_NAME + jobInfo.getId())
                    .build();
    
            CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(JOB_NAME + jobInfo.getId())
                    .startNow()
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCronExpression()))
                    .build();
            cronTrigger.getJobDataMap().put(JobConstant.JOB_KEY,jobInfo);
    
            try {
                scheduler.scheduleJob(jobDetail,cronTrigger);
            } catch (SchedulerException e) {
                log.error("创建定时任务错误");
                e.printStackTrace();
            }
        }
    
        /**
         * 重启job
         */
        public void ResetSerialJob(JobInfo jobInfo){
            deleteJob(jobInfo);
            addSerialJob(jobInfo);
        }
    
        /**
         * 删除job
         */
        public void deleteJob(JobInfo jobInfo){
            try {
                JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
                scheduler.pauseJob(jobKey);
                scheduler.deleteJob(jobKey);
            } catch (Exception e){
                log.error("删除定时任务失败", e);
                throw new CommonException(StatusCode.ERROR,"删除定时任务失败");
            }
        }
    
        /**
         * 恢复job
         * 慎用
         */
        public void resumeJob(JobInfo jobInfo){
            try {
                TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + jobInfo.getId());
                CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                // 如果不存在则创建一个定时任务
                if(trigger == null) {
                    addSerialJob(jobInfo);
                }
                JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
                scheduler.resumeJob(jobKey);
            } catch (Exception e){
                log.error("恢复定时任务失败", e);
                throw new CommonException(StatusCode.ERROR,"恢复定时任务失败");
            }
        }
    
        /**
         * 暂停job
         * 慎用
         */
        public void pauseJob(JobInfo jobInfo){
            try {
                JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
                scheduler.pauseJob(jobKey);
            } catch (Exception e){
                log.error("定时任务暂停失败", e);
                throw new CommonException(StatusCode.ERROR,"定时任务暂停失败");
            }
        }
    }

    5、程序启动时,自动加载有效的定时任务

    @Slf4j
    @Component
    public class JobStarter implements ApplicationRunner {
        @Resource
        private JobInfoService jobInfoService;
    
        @Resource
        private QuartzJobService quartzJobService;
    
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            JobInfo jobInfo = new JobInfo();
            jobInfo.setEnable(true);
            List<JobInfo> jobInfoList = jobInfoService.queryAll(jobInfo);
            log.info("加载定时任务...");
            jobInfoList.forEach(job ->{
                log.info("定时任务:"+job.getJobName()+"  cron:"+job.getCronExpression());
                quartzJobService.addSerialJob(job);
            });
            log.info("定时任务加载完成...");
        }
    }

    6、任务调度管理的service实现类

    @Slf4j
    @Service("jobInfoService")
    public class JobInfoServiceImpl implements JobInfoService {
        @Resource
        private JobInfoDao jobInfoDao;
    
        @Resource
        private QuartzJobService quartzJobService;
    
        @Resource
        private IdWorker idWorker;
    
        @Override
        public void startJob(String id) {
            JobInfo jobInfo = jobInfoDao.queryById(id);
            if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任务不存在");
            // 正常情况下数据库里所有enable=true的定时任务在程序启动后就开始自动执行,
            if(jobInfo.getEnable()){
                // 执行重启命令
                quartzJobService.ResetSerialJob(jobInfo);
                return;
            }
    
            JobInfo updateJob = new JobInfo();
            updateJob.setId(id);
            updateJob.setEnable(true);
            int update = this.jobInfoDao.update(updateJob);
            if(update < 1) throw new CommonException(StatusCode.ERROR,"更新任务失败");
            jobInfo.setEnable(true);
            quartzJobService.addSerialJob(jobInfo);
        }
    
        @Override
        public void resetJob(String id) {
            JobInfo jobInfo = jobInfoDao.queryById(id);
            if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任务不存在");
            quartzJobService.ResetSerialJob(jobInfo);
        }
    
        @Override
        public void pauseJob(String id) {
            JobInfo jobInfo = jobInfoDao.queryById(id);
            if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任务不存在");
    
            JobInfo updateJob = new JobInfo();
            updateJob.setId(id);
            updateJob.setEnable(false);
            this.jobInfoDao.update(updateJob);
            quartzJobService.pauseJob(jobInfo);
        }
    
        @Override
        public void resumeJob(String id) {
            JobInfo jobInfo = jobInfoDao.queryById(id);
            if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任务不存在");
            if(!jobInfo.getEnable()){
                JobInfo updateJob = new JobInfo();
                updateJob.setId(id);
                updateJob.setEnable(true);
                this.jobInfoDao.update(updateJob);
            }
            quartzJobService.resumeJob(jobInfo);
        }
    
        @Override
        public void deleteJob(String id) {
            JobInfo jobInfo = jobInfoDao.queryById(id);
            if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任务不存在");
            if(jobInfo.getEnable()){
                JobInfo updateJob = new JobInfo();
                updateJob.setId(id);
                updateJob.setEnable(false);
                this.jobInfoDao.update(updateJob);
            }
            quartzJobService.deleteJob(jobInfo);
        }
    
        @Override
        @Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
        public JobInfo updateState(JobInfo jobInfo) {
            JobInfo updateJob = new JobInfo();
            updateJob.setId(jobInfo.getId());
            updateJob.setEnable(jobInfo.getEnable());
            updateJob.setUpdateTime(new Date());
    
            int count = this.jobInfoDao.update(updateJob);
            if(count < 1) throw new CommonException(StatusCode.ERROR,"更新数据失败");
    
            JobInfo result = this.queryById(jobInfo.getId());
            if(result.getEnable()){
                this.quartzJobService.ResetSerialJob(result);
            }else {
                this.quartzJobService.deleteJob(jobInfo);
            }
            return result;
        }
    
        @Override
        public PageInfo<JobInfo> queryByPage(JobInfoQueryDto jobInfo, int pageNum, int pageSize) {
            PageHelper.startPage(pageNum, pageSize);
            return new PageInfo<>(this.jobInfoDao.queryByBlurry(jobInfo));
        }
    
    
        @Override
        public JobInfo queryById(String id) {
            return this.jobInfoDao.queryById(id);
        }
    
    
        @Override
        public List<JobInfo> queryAll(JobInfo jobInfo) {
            return this.jobInfoDao.queryAll(jobInfo);
        }
    
        /**
         * 新增数据
         *
         */
        @Override
        public JobInfo insert(JobInfo jobInfo) {
            if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){
                throw new CommonException(StatusCode.INVALID_DATA,"cron表达式格式错误");
            }
            // 判断bean是否存在
            Object bean = SpringContextHolder.getBean(jobInfo.getBeanName());
            log.info("新增定时任务,bean ->"+bean+" method->"+jobInfo.getMethodName());
    
            jobInfo.setId(String.valueOf(idWorker.nextId()));
            jobInfo.setCreateTime(new Date());
            this.jobInfoDao.insert(jobInfo);
            if(jobInfo.getEnable()){
                this.quartzJobService.addSerialJob(jobInfo);
            }
            return jobInfo;
        }
    
        /**
         * 修改任务数据
         * 修改后如果查询到enable=true会重启任务
         *
         */
        @Override
        @Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
        public JobInfo update(JobInfo jobInfo) {
    if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){
                throw new CommonException(StatusCode.INVALID_DATA,"cron表达式格式错误");
            }
            // 判断bean是否存在
            Object bean = SpringContextHolder.getBean(jobInfo.getBeanName());
            log.info("更新定时任务,bean ->"+bean+" method->"+jobInfo.getMethodName());
    
            jobInfo.setUpdateTime(new Date());
            this.jobInfoDao.update(jobInfo);
            JobInfo result = this.queryById(jobInfo.getId());
            if(result.getEnable()){
                this.quartzJobService.ResetSerialJob(result);
            }
            return result;
        }
    
        /**
         * 通过主键删除数据
         * 删除前会停止任务
         */
        @Override
        public boolean deleteById(String id) {
    
            JobInfo jobInfo = this.jobInfoDao.queryById(id);
            if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任务不存在");
            this.quartzJobService.deleteJob(jobInfo);
            return this.jobInfoDao.deleteById(id) > 0;
        }
    }
     
    @DisallowConcurrentExecution
  • 相关阅读:
    计算机网络(1)----概述
    博客园自定义样式
    linux进程
    接口回调解析
    优先级队列
    双栈实现队列
    递归解决反转链表的一部分
    Multisim 之逻辑转换仪
    Multisim 如何添加文本 如何编辑文本字体
    Multisim 中的一些快捷键
  • 原文地址:https://www.cnblogs.com/asker009/p/13654710.html
Copyright © 2011-2022 走看看