zoukankan      html  css  js  c++  java
  • SpringBoot集成Quartz定时任务(持久化到数据库)

    前言

    现在大多数项目都使用了springboot,所以本文主要讲springboot与quartz的完美整合,简化配置、持久化数据并自定义quartz数据源。

    正文

    一、增加依赖

    我们使用的spring-boot-starter-quartz,所以不用显示指定版本号:

    <!--quartz相关依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>
    

    二、yml配置信息

    quartz:
      job-store-type: jdbc
      jdbc:
        initialize-schema: never
      properties:
        org:
          quartz:
            scheduler:
              #在集群中每个实例都必须有一个唯一的instanceId,但是应该有一个相同的instanceName
              instanceId: AUTO
              instanceName: um-scheduler
              skipUpdateCheck: true #是否跳过Quartz版本更新检查。如果检查并且找到更新,则会在Quartz的日志中报告它。生产部署要禁止
            jobStore:
              acquireTriggersWithinLock: true #获取trigger的时候是否上锁,默认false采用乐观锁,但有可能出现ABA导致重复调度
              #此存储机制用于Quartz独立于应用容器的事务管理,如果是Tomcat容器管理的数据源,那我们定义的事物也不会传播给Quartz框架内部。
              #通俗的讲就是不管我们的Service服务本身业务代码是否执行成功,只要代码中调用了Quartz API的数据库操作,那任务状态就永久持久化了,
              #就算业务代码抛出运行时异常任务状态也不会回滚到之前的状态。与之相反的是JobStoreCMT。
              class: org.quartz.impl.jdbcjobstore.JobStoreTX
              driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #JDBC代理类
              useProperties: true #让JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为键值对存储而不是在BLOB列中以其序列化形式存储,从而避免序列化的版本问题
              tablePrefix: QRTZ_ #数据库表前缀
              misfireThreshold: 60_000 #超过这个时间还未触发的trigger,就被认为发生了misfire,默认60s。job成功触发叫fire,misfire就是未成功触发。
              isClustered: true #是否开启群集,集群模式需要在多台服务器上做时间同步或者使用zookeeper去解决
              clusterCheckinInterval: 20_000 #定义了Scheduler实例检入到数据库中的频率(单位:毫秒)。
            threadPool:
              class: org.quartz.simpl.SimpleThreadPool #SimpleThreadPool这个线程池只是简单地在它的池中保持固定数量的线程,不增长也不缩小。但是它非常健壮且经过良好的测试,差不多每个Quartz用户都使用这个池
              threadCount: 10 #最大线程数,意味着最多有多少个job可以同时执行
              threadPriority: 5 #线程优先级
              threadsInheritContextClassLoaderOfInitializingThread: true #线程上下文类加载器是否继承自初始线程的加载器
            startup-delay: 60 #延时启动,要有足够长的时间让你的应用先启动完成后再让Scheduler启动(单位秒)
            overwrite-existing-jobs: true #是否每次系统运行都会清空数据库中的Job信息,重新进行初始化

    数据源配置:

    spring: 
      # 数据库配置
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        dynamic:
          primary: master #设置默认的数据源或者数据源组,默认值即为master
          datasource:
            master:
              url: xxx
              driver-class-name: com.mysql.cj.jdbc.Driver
              username: root
              password: xxx
            quartz:
              url: xxx
              driver-class-name: com.mysql.cj.jdbc.Driver
              username: root

    自定生成表结构,需要配置如下信息:

    spring.quartz.jdbc.initialize-schema: always
    spring.quartz.job-store-type: jdbc

    项目启动后生成的表信息:

    三、定时任务逻辑封装

    1.QuartzConfig定时任务配置类。

    mport org.quartz.spi.JobFactory;
    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;
    import org.springframework.scheduling.quartz.SpringBeanJobFactory;
    
    import javax.sql.DataSource;
    import java.io.IOException;
    
    /**
     * @version v1.0
     * @description:
     * @author: 47 on 2020/4/9 14:32
     */
    @Configuration
    public class QuartzConfig {
    
    
        public void setSchedulerFactoryBeanProperties(SchedulerFactoryBean schedulerFactoryBean, DataSource dataSource, JobFactory jobFactory) throws IOException {
            schedulerFactoryBean.setJobFactory(jobFactory);
            //设置数据源
            schedulerFactoryBean.setDataSource(dataSource);
        }
    
        /**
         * 自定义JobFactory,以便Job类里可以使用Spring类注入
         */
        @Bean
        public JobFactory jobFactory() {
            return new QuartzSpringBeanJobFactory();
        }
    
    
        private class QuartzSpringBeanJobFactory extends SpringBeanJobFactory {
    
            private AutowireCapableBeanFactory beanFactory;
    
            @Override
            public void setApplicationContext(ApplicationContext context) {
                beanFactory = context.getAutowireCapableBeanFactory();
            }
    
            @Override
            protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
                final Object job = super.createJobInstance(bundle);
                beanFactory.autowireBean(job);
                return job;
            }
        }
    }

    2.定义SchedulerJob计划任务类

    /**
     * @version v1.0
     * @description:
     * @author: 47 on 2020/4/9 14:32
     */
    public class SchedulerJob {
    
        private String name;
        private String group;
        private String cron;
        private String jobClass;
        private String desc;
        //间隔时长
        private Long interval;
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getGroup() {
            return group;
        }
    
        public void setGroup(String group) {
            this.group = group;
        }
    
        public String getCron() {
            return cron;
        }
    
        public void setCron(String cron) {
            this.cron = cron;
        }
    
        public String getJobClass() {
            return jobClass;
        }
    
        public void setJobClass(String jobClass) {
            this.jobClass = jobClass;
        }
    
        public String getDesc() {
            return desc;
        }
    
        public void setDesc(String desc) {
            this.desc = desc;
        }
    
        public Long getInterval() {
            return interval;
        }
    
        public void setInterval(Long interval) {
            this.interval = interval;
        }
    }

    3.定义SchedulerJobs (用来主要注入配置文件里多个定时任务)

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    /**
     * @version v1.0
     * @description:
     * @author: 47 on 2020/4/9 14:26
     */
    @Component
    @ConfigurationProperties(prefix = "quartz")
    public class SchedulerJobs {
    
        private List<SchedulerJob> jobs;
    
        public List<SchedulerJob> getJobs() {
            return jobs;
        }
    
        public void setJobs(List<SchedulerJob> jobs) {
            this.jobs = jobs;
        }
    }

    4.封装定时任务的方法 SchedulerManager 

    import com.gamer.um.quartz.configure.SchedulerJob;
    import com.gamer.um.quartz.jobs.MonitorCronJob;
    import org.quartz.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    /**
     * @version v1.0
     * @description:
     * @author: 47 on 2020/4/9 11:37
     */
    @Component
    public class SchedulerManager {
    
        private final static Logger logger = LoggerFactory.getLogger(SchedulerManager.class);
    
    
        @Autowired
        private Scheduler scheduler;
    
        /**
         * 激活任务
         * @param schedulerJob
         */
        public void activeJob(SchedulerJob schedulerJob){
            JobKey jobKey = JobKey.jobKey(schedulerJob.getName(), schedulerJob.getGroup());
            try {
                if (scheduler.checkExists(jobKey) && !MonitorCronJob.JOB_NAME.equals(schedulerJob.getName())) {
                    updateJob(schedulerJob);
                }else {
                    createJob(schedulerJob);
                }
            } catch (SchedulerException e) {
                logger.error("activeJob {}", e);
            }
        }
    
        /**
         * 创建任务并加入调度
         * @param schedulerJob
         */
        public void createJob(SchedulerJob schedulerJob){
            JobKey jobKey = JobKey.jobKey(schedulerJob.getName(), schedulerJob.getGroup());
            try {
                if (scheduler.checkExists(jobKey)) {
                    return;
                }
                Class<?> clazz = Class.forName(schedulerJob.getJobClass());
                JobDetail jobDetail = getJobDetail(schedulerJob, (Class<Job>) clazz);
                Trigger cronTrigger = getCronTrigger(schedulerJob);
                //加入调度器
                scheduler.scheduleJob(jobDetail, cronTrigger);
            } catch (ClassNotFoundException | SchedulerException e) {
                logger.error("createJob {}", e);
            }
        }
    
        /**
         * 更新任务触发器
         * @param schedulerJob
         */
        public void updateJob(SchedulerJob schedulerJob){
            TriggerKey triggerKey = TriggerKey.triggerKey(schedulerJob.getName(), schedulerJob.getGroup());
            try {
                Trigger trigger = scheduler.getTrigger(triggerKey);
                if (trigger == null) {
                    return;
                }
                JobKey jobKey = trigger.getJobKey();
                //查询cron
                String oldCron = ((CronTrigger)trigger).getCronExpression();
                //没有变化则返回
                if (oldCron.equals(schedulerJob.getCron())){
                    return;
                }
                Trigger cronTrigger = getCronTrigger(schedulerJob);
                //加入调度器
                scheduler.rescheduleJob(triggerKey, cronTrigger);
            } catch (SchedulerException e) {
                logger.error("updateJob {}", e);
            }
        }
    
        public void deleteJobs(List<JobKey> jobKeys) {
            try {
                scheduler.deleteJobs(jobKeys);
            } catch (SchedulerException e) {
                logger.error("deleteJobs {}", e);
            }
        }
    
        /**
         * 创建任务
         * @param schedulerJob
         * @param clazz
         * @return
         */
        private JobDetail getJobDetail(SchedulerJob schedulerJob, Class<Job> clazz) {
            return JobBuilder.newJob()
                    .ofType(clazz)
                    .withIdentity(schedulerJob.getName(), schedulerJob.getGroup())
                    .withDescription(schedulerJob.getDesc())
                    .build();
        }
    
        /**
         * 创建触发器
         * @param schedulerJob
         * @return
         */
        private Trigger getCronTrigger(SchedulerJob schedulerJob) {
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(schedulerJob.getCron());
            if (!MonitorCronJob.JOB_NAME.equals(schedulerJob.getName())){
                //任务错过执行策略,以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后,当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行
                cronScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires();
            }
            return TriggerBuilder.newTrigger()
                    .withIdentity(schedulerJob.getName(), schedulerJob.getGroup())
                    .withDescription(schedulerJob.getDesc())
                    .withSchedule(cronScheduleBuilder)
                    .build();
        }
    
    }

    5.监控其他定时任务的总任务MonitorCronJob(用于监控cron的更新)

    package com.gamer.um.quartz.jobs;
    
    import com.gamer.um.quartz.SchedulerManager;
    import com.gamer.um.quartz.configure.SchedulerJob;
    import com.gamer.um.quartz.configure.SchedulerJobs;
    import com.gamer.um.quartz.utils.LicUtil;
    import org.quartz.*;
    import org.quartz.impl.matchers.GroupMatcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.context.refresh.ContextRefresher;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;
    import java.util.Set;
    import java.util.stream.Collectors;
    
    /**
     * @version v1.0
     * @description:
     * @author: 47 on 2020/4/9 15:07
     */
    public class MonitorCronJob implements Job {
    
        private final static Logger logger = LoggerFactory.getLogger(MonitorCronJob.class);
    
        public static final String JOB_NAME = "monitor_cron";
        public static final String GROUP_NAME = "monitor";
        public static final String CRON = "0 0/10 * * * ?";
        public static final String DESC = "监控cron更新";
    
        @Autowired
        private SchedulerManager schedulerManager;
        @Autowired
        private SchedulerJobs schedulerJobs;
        @Autowired
        private ContextRefresher contextRefresher;
    
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            //重新加载配置
            contextRefresher.refresh();
            Set<JobKey> oldJobKeys = null;
            try {
                oldJobKeys = jobExecutionContext.getScheduler().getJobKeys(GroupMatcher.anyJobGroup());
            } catch (SchedulerException e) {
                logger.error("MonitorCronJob {}", e);
            }
    
            List<String> newJobKeys = new ArrayList<>();
            for (SchedulerJob job : schedulerJobs.getJobs()) {
                //过滤掉monitor_cron任务
                if (job.getName().equals(JOB_NAME)) {
                    continue;
                }
                newJobKeys.add(job.getName());
                logger.info("job【{}】,cron【{}】", job.getName(), job.getCron());
                schedulerManager.activeJob(job);
            }
            if (oldJobKeys == null) {
                return;
            }
            //删除没有配置的任务
            List<JobKey> shouldDeleteJobKeys = oldJobKeys.stream()
                    .filter(jobKey -> !JOB_NAME.equals(jobKey.getName()) && !newJobKeys.contains(jobKey.getName()))
                    .collect(Collectors.toList());
            logger.info("delete jobs {}", shouldDeleteJobKeys);
            schedulerManager.deleteJobs(shouldDeleteJobKeys);
        }
    }

    6.配置一个定时任务(以后其他都不管,直接在配置文件里新加其他的定时任务即可)

    #定时任务
    quartz:
      jobs:
        - name: myName #(随便取任务名)
          group: collect
          cron: 0 0/5 * * * ? *
          jobClass: com.gamer.me.quartz.jobs.MyJob #(自己的定时任务的执行类,也就是你写业务代码的类)
          desc: 我的任务

    7.设置启动项目就初始化定时任务(主要是上面的监控cron的类需要初始化)

    import com.gamer.um.quartz.configure.SchedulerJob;
    import com.gamer.um.quartz.jobs.MonitorCronJob;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    /**
     * @version v1.0
     * @description:
     * @author: 47 on 2020/4/9 17:12
     */
    @Component
    public class Initialization implements ApplicationRunner {
    
        @Autowired
        private SchedulerManager schedulerManager;
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            SchedulerJob schedulerJob = new SchedulerJob();
            schedulerJob.setName(MonitorCronJob.JOB_NAME);
            schedulerJob.setGroup(MonitorCronJob.GROUP_NAME);
            schedulerJob.setCron(MonitorCronJob.CRON);
            schedulerJob.setDesc(MonitorCronJob.DESC);
            schedulerJob.setJobClass(MonitorCronJob.class.getName());
            schedulerManager.activeJob(schedulerJob);
        }
    }

    8.测试

    public class MyJob implements Job {
    
      @Autowired
      private SchedulerManager schedulerManager;

     测试时需要去过滤下当前Jobs里的job是当前执行的(下面会给出代码)
      .......你的业务代码    
    }

    @Override
    public SchedulerJob getSchedulerJobDetail(Class jobClass) {
      List<SchedulerJob> jobs = schedulerJobs.getJobs();
      if (Objects.isNull(jobs)) {
        return null;
      }
      SchedulerJob job = jobs.stream()
        .filter(schedulerJob -> Objects.equals(schedulerJob.getJobClass(), jobClass.getName()))
        .findFirst()
        .orElse(null);
      return job;
    }

  • 相关阅读:
    【转】PowerManager 与 WakeLock
    【转】设计模式总结之模式分类
    【转】一篇文章,教你学会Git
    【转】Iconfont
    【转】码云source tree 提交超过100m 为什么大文件推不上去
    各 Android 平台版本支持的 API 级别
    【转】Android进程机制
    【转】数据库CRUD操作
    【转】数据库--视图的基本概念以及作用
    动态规划的两种形式
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13752448.html
Copyright © 2011-2022 走看看