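  • Integrating Quartz with Spring Boot for Distributed Scheduled Tasks

    1. Quartz Cluster Architecture

    Quartz is one of the best-known open-source job scheduling libraries in the Java ecosystem.

    The previous article covered Quartz in a single-application setup in detail. If you only run a single instance, Quartz is not necessarily the best choice: Spring's @Scheduled also handles task scheduling, integrates seamlessly with Spring Boot, and is configured with annotations, which makes it very simple. Its drawback is that in a clustered deployment every instance fires the task, so the same task is executed more than once.

    Quartz, by contrast, offers a rich set of features such as job persistence, clustered deployment, and distributed scheduling, which is why Quartz-based scheduling is so widely used in application development.

    In a cluster, each node is an independent Quartz application and there is no node dedicated to central management. Nodes become aware of one another through shared database tables, and database locks provide concurrency control, so each firing of a job is handled by exactly one node.

    One point needs special attention: in a distributed deployment, the system clocks of all nodes must be kept in sync. The Quartz documentation asks for the clocks to be within a second of each other.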

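    2. Initializing the Database Tables

    The official Quartz site provides the table definitions. Go to the Quartz website, find the version you need, and download it.

     I chose quartz-2.3.0-distribution.tar.gz. After downloading, extract it, search the extracted files for sql, pick the script that matches your database, and run it against your database.

     For example, I use mysql-5.7, so I picked the tables_mysql_innodb.sql script, whose contents are as follows: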

    DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
    DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
    DROP TABLE IF EXISTS QRTZ_LOCKS;
    DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
    DROP TABLE IF EXISTS QRTZ_CALENDARS;
     
    CREATE TABLE QRTZ_JOB_DETAILS(
    SCHED_NAME VARCHAR(120) NOT NULL,
    JOB_NAME VARCHAR(190) NOT NULL,
    JOB_GROUP VARCHAR(190) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    JOB_CLASS_NAME VARCHAR(250) NOT NULL,
    IS_DURABLE VARCHAR(1) NOT NULL,
    IS_NONCONCURRENT VARCHAR(1) NOT NULL,
    IS_UPDATE_DATA VARCHAR(1) NOT NULL,
    REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    JOB_NAME VARCHAR(190) NOT NULL,
    JOB_GROUP VARCHAR(190) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    NEXT_FIRE_TIME BIGINT(13) NULL,
    PREV_FIRE_TIME BIGINT(13) NULL,
    PRIORITY INTEGER NULL,
    TRIGGER_STATE VARCHAR(16) NOT NULL,
    TRIGGER_TYPE VARCHAR(8) NOT NULL,
    START_TIME BIGINT(13) NOT NULL,
    END_TIME BIGINT(13) NULL,
    CALENDAR_NAME VARCHAR(190) NULL,
    MISFIRE_INSTR SMALLINT(2) NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
    REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    REPEAT_COUNT BIGINT(7) NOT NULL,
    REPEAT_INTERVAL BIGINT(12) NOT NULL,
    TIMES_TRIGGERED BIGINT(10) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_CRON_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    CRON_EXPRESSION VARCHAR(120) NOT NULL,
    TIME_ZONE_ID VARCHAR(80),
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_SIMPROP_TRIGGERS
      (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_NAME VARCHAR(190) NOT NULL,
        TRIGGER_GROUP VARCHAR(190) NOT NULL,
        STR_PROP_1 VARCHAR(512) NULL,
        STR_PROP_2 VARCHAR(512) NULL,
        STR_PROP_3 VARCHAR(512) NULL,
        INT_PROP_1 INT NULL,
        INT_PROP_2 INT NULL,
        LONG_PROP_1 BIGINT NULL,
        LONG_PROP_2 BIGINT NULL,
        DEC_PROP_1 NUMERIC(13,4) NULL,
        DEC_PROP_2 NUMERIC(13,4) NULL,
        BOOL_PROP_1 VARCHAR(1) NULL,
        BOOL_PROP_2 VARCHAR(1) NULL,
        PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
        FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_BLOB_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    BLOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_CALENDARS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    CALENDAR_NAME VARCHAR(190) NOT NULL,
    CALENDAR BLOB NOT NULL,
    PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_FIRED_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    ENTRY_ID VARCHAR(95) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    INSTANCE_NAME VARCHAR(190) NOT NULL,
    FIRED_TIME BIGINT(13) NOT NULL,
    SCHED_TIME BIGINT(13) NOT NULL,
    PRIORITY INTEGER NOT NULL,
    STATE VARCHAR(16) NOT NULL,
    JOB_NAME VARCHAR(190) NULL,
    JOB_GROUP VARCHAR(190) NULL,
    IS_NONCONCURRENT VARCHAR(1) NULL,
    REQUESTS_RECOVERY VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,ENTRY_ID))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_SCHEDULER_STATE (
    SCHED_NAME VARCHAR(120) NOT NULL,
    INSTANCE_NAME VARCHAR(190) NOT NULL,
    LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
    CHECKIN_INTERVAL BIGINT(13) NOT NULL,
    PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
    ENGINE=InnoDB;
     
    CREATE TABLE QRTZ_LOCKS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    LOCK_NAME VARCHAR(40) NOT NULL,
    PRIMARY KEY (SCHED_NAME,LOCK_NAME))
    ENGINE=InnoDB;
     
    CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
    CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
     
    CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
    CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
    CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
    CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
    CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
    CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
     
    CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
    CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
    CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
    CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
     
    commit;

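    The tables serve the following purposes: QRTZ_JOB_DETAILS stores job definitions; QRTZ_TRIGGERS stores the data common to all triggers, with QRTZ_SIMPLE_TRIGGERS, QRTZ_CRON_TRIGGERS, QRTZ_SIMPROP_TRIGGERS and QRTZ_BLOB_TRIGGERS holding the type-specific details; QRTZ_CALENDARS stores calendars; QRTZ_PAUSED_TRIGGER_GRPS records paused trigger groups; QRTZ_FIRED_TRIGGERS records triggers that are currently firing; and QRTZ_SCHEDULER_STATE records the check-in state of each scheduler instance.

     Of these, QRTZ_LOCKS is the row-lock table that Quartz uses to synchronize the nodes of a cluster.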
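
    To make the role of the lock table more concrete, here is a minimal in-memory sketch. It is an analogy rather than Quartz's own code: a ReentrantLock stands in for the database row lock (which Quartz takes by default with a SELECT ... FOR UPDATE style statement on QRTZ_LOCKS), a plain field stands in for the trigger state column, and the names ClusterLockSketch, tryAcquire and race are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// In-memory analogy only: the lock stands in for a row in QRTZ_LOCKS,
// and triggerState stands in for TRIGGER_STATE in QRTZ_TRIGGERS.
class ClusterLockSketch {
    private final ReentrantLock triggerAccessLock = new ReentrantLock();
    private String triggerState = "WAITING";

    // A node may fire the trigger only if it moves the state
    // from WAITING to ACQUIRED while holding the lock.
    boolean tryAcquire() {
        triggerAccessLock.lock();
        try {
            if (!"WAITING".equals(triggerState)) {
                return false;
            }
            triggerState = "ACQUIRED";
            return true;
        } finally {
            triggerAccessLock.unlock();
        }
    }

    // Lets the given number of "nodes" (threads) race for one trigger
    // and returns the ids of the nodes that acquired it.
    static List<String> race(int nodes) {
        ClusterLockSketch store = new ClusterLockSketch();
        List<String> winners = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < nodes; i++) {
            String nodeId = "node-" + i;
            Thread t = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.tryAcquire()) {
                    winners.add(nodeId);
                }
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners;
    }

    public static void main(String[] args) {
        System.out.println("Nodes that acquired the trigger: " + race(5));
    }
}
```

    However many nodes take part in the race, the returned list holds a single node id, which mirrors the "exactly one node per firing" guarantee described earlier.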

    3. Quartz Cluster in Practice

    3.1 Create a Spring Boot project and add the Maven dependencies

    <!--Spring Boot parent-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
    </parent>

    <!--Dependencies-->
    <dependencies>
        <!--Spring Boot core-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--Spring Boot testing-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--Spring MVC web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--Development-time tooling-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!--JPA support-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!--MySQL driver-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--Druid connection pool-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.17</version>
        </dependency>
        <!--Quartz starter-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <!--Alibaba JSON library-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>
    </dependencies>

    3.2 Create the application.properties configuration file

    spring.application.name=springboot-quartz-001
    server.port=8080
     
    #Data source
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

    3.3 Create the quartz.properties configuration file

    #Scheduler settings
    #Scheduler instance name (must be the same on every node of the cluster)
    org.quartz.scheduler.instanceName=SsmScheduler
    #Generate the instance id automatically (must be unique per node)
    org.quartz.scheduler.instanceId=AUTO
    #Whether to wrap job execution in a UserTransaction
    org.quartz.scheduler.wrapJobExecutionInUserTransaction=false

    #Thread pool settings
    #Thread pool implementation class
    org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    #Number of threads in the pool
    org.quartz.threadPool.threadCount=10
    #Thread priority
    org.quartz.threadPool.threadPriority=5
    #Whether pool threads inherit the context class loader of the initializing thread
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
    #Whether pool threads are daemon threads (daemon threads do not keep the JVM alive)
    #org.quartz.threadPool.makeThreadsDaemons=true

    #Job store settings
    #Whether all JobDataMap values are stored as Strings
    org.quartz.jobStore.useProperties=true
    #Table name prefix, QRTZ_ by default
    org.quartz.jobStore.tablePrefix=QRTZ_
    #Maximum delay in milliseconds tolerated before a trigger counts as misfired
    org.quartz.jobStore.misfireThreshold=60000
    #Whether to run in clustered mode
    org.quartz.jobStore.isClustered=true
    #Interval in milliseconds at which this instance checks in with the cluster; it also affects how quickly failed instances are detected
    org.quartz.jobStore.clusterCheckinInterval=2000
    #Persist scheduling data to the database
    org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
    #JDBC delegate class; org.quartz.impl.jdbcjobstore.StdJDBCDelegate works for most databases
    org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    #Data source name, any name will do
    org.quartz.jobStore.dataSource=qzDS

    #Connection provider, set to the Druid-based implementation
    org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider
    #JDBC driver
    org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
    #JDBC URL
    org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/test-quartz?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
    #Database user
    org.quartz.dataSource.qzDS.user=root
    #Database password
    org.quartz.dataSource.qzDS.password=123456
    #Maximum number of connections
    org.quartz.dataSource.qzDS.maxConnection=5
    #Validation query, optional
    org.quartz.dataSource.qzDS.validationQuery=select 0 from dual
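
    The misfireThreshold setting can be read as a simple time comparison. The sketch below illustrates that reading and is not Quartz's implementation; MisfireSketch and isMisfired are hypothetical names, and the timestamps are arbitrary example values.

```java
// Hypothetical helper illustrating how a misfire threshold is applied.
class MisfireSketch {
    // A trigger counts as misfired when its scheduled fire time lies more than
    // thresholdMillis in the past at the moment the scheduler looks at it.
    static boolean isMisfired(long nextFireTimeMillis, long nowMillis, long thresholdMillis) {
        return nextFireTimeMillis < nowMillis - thresholdMillis;
    }

    public static void main(String[] args) {
        long threshold = 60_000L; // matches org.quartz.jobStore.misfireThreshold=60000
        long now = 1_000_000L;    // arbitrary "current time" for the example

        // 30 seconds late: still within the threshold, fired as a normal (late) firing
        System.out.println(isMisfired(now - 30_000L, now, threshold)); // false
        // 90 seconds late: beyond the threshold, handled by the misfire policy
        System.out.println(isMisfired(now - 90_000L, now, threshold)); // true
    }
}
```

    With the 60000 ms value used here, a trigger that is picked up less than a minute late simply fires late, while one that is more than a minute late is handled according to the trigger's misfire instruction.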

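    3.4 Register the Quartz job factory

    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.scheduling.quartz.AdaptableJobFactory;
    import org.springframework.stereotype.Component;

    @Component
    public class QuartzJobFactory extends AdaptableJobFactory {

        @Autowired
        private AutowireCapableBeanFactory capableBeanFactory;

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            //Let the parent class create the job instance
            Object jobInstance = super.createJobInstance(bundle);
            //Inject Spring-managed dependencies into the job instance
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }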

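    3.5 Register the scheduler factory

    import org.quartz.Scheduler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.PropertiesFactoryBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;

    import java.io.IOException;

    @Configuration
    public class QuartzConfig {

        @Autowired
        private QuartzJobFactory jobFactory;

        @Bean
        public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
            //Load the Quartz properties
            PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
            //Read quartz.properties now so that getObject() below returns the loaded values
            propertiesFactoryBean.afterPropertiesSet();
            //Create the SchedulerFactoryBean
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
            factory.setQuartzProperties(propertiesFactoryBean.getObject());
            factory.setJobFactory(jobFactory);//Allows other beans to be injected into job instances
            factory.setApplicationContextSchedulerContextKey("applicationContextKey");
            factory.setWaitForJobsToCompleteOnShutdown(true);//On shutdown, Spring waits for running Quartz jobs to finish
            factory.setOverwriteExistingJobs(false);//Whether to overwrite jobs that already exist
            factory.setStartupDelay(10);//Start the scheduler 10 seconds after initialization so the application can finish starting first

            return factory;
        }

        /**
         * Exposes the Scheduler created by the SchedulerFactoryBean as a bean.
         * @return the Scheduler instance
         * @throws IOException if quartz.properties cannot be read
         */
        @Bean(name = "scheduler")
        public Scheduler scheduler() throws IOException {
            return schedulerFactoryBean().getScheduler();
        }
    }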

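    3.6 Replace the Quartz connection pool

    By default Quartz uses c3p0 as its connection pool. Its performance can be unstable, so it is not recommended here, and we replace it with the Druid connection pool, configured as follows: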

    import com.alibaba.druid.pool.DruidDataSource;
    import org.quartz.utils.ConnectionProvider;

    import java.sql.Connection;
    import java.sql.SQLException;

    /**
     * Field names match the keys in quartz.properties (without the prefix);
     * Quartz injects the values through the setters below.
     */
    public class DruidConnectionProvider implements ConnectionProvider {

        //JDBC driver class
        public String driver;
        //JDBC URL
        public String URL;
        //Database user name
        public String user;
        //Database password
        public String password;
        //Maximum number of connections
        public int maxConnection;
        //Query used to check that a pooled connection is still valid
        public String validationQuery;

        private boolean validateOnCheckout;

        private int idleConnectionValidationSeconds;

        public String maxCachedStatementsPerConnection;

        private String discardIdleConnectionsSeconds;

        public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;

        public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;

        //Druid connection pool
        private DruidDataSource datasource;

        @Override
        public Connection getConnection() throws SQLException {
            return datasource.getConnection();
        }

        @Override
        public void shutdown() throws SQLException {
            datasource.close();
        }

        @Override
        public void initialize() throws SQLException {
            if (this.URL == null) {
                throw new SQLException("DBPool could not be created: DB URL cannot be null");
            }

            if (this.driver == null) {
                throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
            }

            if (this.maxConnection <= 0) {
                throw new SQLException("DBPool maxConnections could not be created: Max connections must be greater than zero!");
            }

            datasource = new DruidDataSource();
            try {
                datasource.setDriverClassName(this.driver);
            } catch (Exception e) {
                //Report the failure instead of silently swallowing it
                throw new SQLException("Problem setting driver class name on datasource: " + e.getMessage(), e);
            }

            datasource.setUrl(this.URL);
            datasource.setUsername(this.user);
            datasource.setPassword(this.password);
            datasource.setMaxActive(this.maxConnection);
            datasource.setMinIdle(1);
            datasource.setMaxWait(0);
            //Use the statement-cache constant here, not the max-connections constant
            datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION);

            if (this.validationQuery != null) {
                datasource.setValidationQuery(this.validationQuery);
                if (!this.validateOnCheckout) {
                    datasource.setTestOnReturn(true);
                } else {
                    datasource.setTestOnBorrow(true);
                }
                datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
            }
        }
     
        public String getDriver() {
            return driver;
        }
     
        public void setDriver(String driver) {
            this.driver = driver;
        }
     
        public String getURL() {
            return URL;
        }
     
        public void setURL(String URL) {
            this.URL = URL;
        }
     
        public String getUser() {
            return user;
        }
     
        public void setUser(String user) {
            this.user = user;
        }
     
        public String getPassword() {
            return password;
        }
     
        public void setPassword(String password) {
            this.password = password;
        }
     
        public int getMaxConnection() {
            return maxConnection;
        }
     
        public void setMaxConnection(int maxConnection) {
            this.maxConnection = maxConnection;
        }
     
        public String getValidationQuery() {
            return validationQuery;
        }
     
        public void setValidationQuery(String validationQuery) {
            this.validationQuery = validationQuery;
        }
     
        public boolean isValidateOnCheckout() {
            return validateOnCheckout;
        }
     
        public void setValidateOnCheckout(boolean validateOnCheckout) {
            this.validateOnCheckout = validateOnCheckout;
        }
     
        public int getIdleConnectionValidationSeconds() {
            return idleConnectionValidationSeconds;
        }
     
        public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
            this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
        }
     
        public DruidDataSource getDatasource() {
            return datasource;
        }
     
        public void setDatasource(DruidDataSource datasource) {
            this.datasource = datasource;
        }
     
        public String getDiscardIdleConnectionsSeconds() {
            return discardIdleConnectionsSeconds;
        }
     
        public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {
            this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;
        }
    }

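    Once the class is created, register it in the quartz.properties configuration file:

    #Connection provider, set to the Druid-based implementation
    org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider

    If you have already configured this, skip this step.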

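    3.7 Write the Job class

    import org.quartz.Job;
    import org.quartz.JobExecutionContext;
    import org.quartz.SchedulerException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TfCommandJob implements Job {

        private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);

        @Override
        public void execute(JobExecutionContext context) {
            try {
                //Print the id of the scheduler instance that ran the job, plus the current time
                //(yyyy is the calendar year; YYYY would be the week-based year)
                System.out.println(context.getScheduler().getSchedulerInstanceId() + "--" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            } catch (SchedulerException e) {
                log.error("Job execution failed", e);
            }
        }
    }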
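
    When formatting timestamps the way this job does, the case of the year letter matters: in Java date patterns, y is the calendar year and Y is the week-based year, and the two differ for a few days around New Year. The sketch below shows the difference. It uses java.time with an explicit Locale.US so that the result does not depend on the machine's default locale; YearPatternSketch and its format method are illustrative names only.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Illustrative only: compares the calendar-year and week-based-year pattern letters.
class YearPatternSketch {
    // Formats a date with the given pattern, using US week rules
    // (weeks start on Sunday, the week containing January 1 is week 1).
    static String format(String pattern, LocalDate date) {
        return DateTimeFormatter.ofPattern(pattern, Locale.US).format(date);
    }

    public static void main(String[] args) {
        // Monday, 30 December 2019 already belongs to week 1 of 2020 under US rules
        LocalDate date = LocalDate.of(2019, 12, 30);
        System.out.println(format("yyyy-MM-dd", date)); // 2019-12-30
        System.out.println(format("YYYY-MM-dd", date)); // 2020-12-30
    }
}
```

    For log output such as the job's timestamp, yyyy is almost always what is intended.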

    3.8 Write the Quartz service interface

    import java.util.Map;

    public interface QuartzJobService {
        /**
         * Adds a job, optionally with parameters.
         * @param clazzName fully qualified name of the job class
         * @param jobName job name
         * @param groupName job group
         * @param cronExp cron expression
         * @param param extra parameters, may be null
         */
        void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param);

        /**
         * Pauses a job.
         * @param jobName job name
         * @param groupName job group
         */
        void pauseJob(String jobName, String groupName);

        /**
         * Resumes a job.
         * @param jobName job name
         * @param groupName job group
         */
        void resumeJob(String jobName, String groupName);

        /**
         * Runs a scheduled job once, immediately.
         * @param jobName job name
         * @param groupName job group
         */
        void runOnce(String jobName, String groupName);

        /**
         * Updates a job.
         * @param jobName job name
         * @param groupName job group
         * @param cronExp new cron expression, may be null
         * @param param extra parameters, may be null
         */
        void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param);

        /**
         * Deletes a job.
         * @param jobName job name
         * @param groupName job group
         */
        void deleteJob(String jobName, String groupName);

        /**
         * Starts all jobs.
         */
        void startAllJobs();

        /**
         * Pauses all jobs.
         */
        void pauseAllJobs();

        /**
         * Resumes all jobs.
         */
        void resumeAllJobs();

        /**
         * Shuts down all jobs.
         */
        void shutdownAllJobs();
    }

    The corresponding implementation class, QuartzJobServiceImpl, is as follows:

    import org.quartz.CronScheduleBuilder;
    import org.quartz.CronTrigger;
    import org.quartz.Job;
    import org.quartz.JobBuilder;
    import org.quartz.JobDetail;
    import org.quartz.JobKey;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerException;
    import org.quartz.TriggerBuilder;
    import org.quartz.TriggerKey;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.util.Map;

    @Service
    public class QuartzJobServiceImpl implements QuartzJobService {

        private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class);

        @Autowired
        private Scheduler scheduler;

        @Override
        public void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) {
            try {
                //The scheduler is already started during initialization, so scheduler.start() is not needed here
                //Build the job
                Class<? extends Job> jobClass = Class.forName(clazzName).asSubclass(Job.class);
                JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build();
                //Cron schedule builder (when the job runs)
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
                //Build a trigger from the cron expression
                CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build();
                //Write the parameters into the trigger's JobDataMap
                //(with org.quartz.jobStore.useProperties=true the values must be Strings)
                if (param != null) {
                    trigger.getJobDataMap().putAll(param);
                }
                scheduler.scheduleJob(jobDetail, trigger);
            } catch (Exception e) {
                log.error("Failed to create job", e);
            }
        }

        @Override
        public void pauseJob(String jobName, String groupName) {
            try {
                scheduler.pauseJob(JobKey.jobKey(jobName, groupName));
            } catch (SchedulerException e) {
                log.error("Failed to pause job", e);
            }
        }

        @Override
        public void resumeJob(String jobName, String groupName) {
            try {
                scheduler.resumeJob(JobKey.jobKey(jobName, groupName));
            } catch (SchedulerException e) {
                log.error("Failed to resume job", e);
            }
        }

        @Override
        public void runOnce(String jobName, String groupName) {
            try {
                scheduler.triggerJob(JobKey.jobKey(jobName, groupName));
            } catch (SchedulerException e) {
                log.error("Failed to run job once", e);
            }
        }

        @Override
        public void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) {
            try {
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);
                CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                if (cronExp != null) {
                    //Cron schedule builder
                    CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
                    //Rebuild the trigger with the new cron expression
                    trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
                }
                //Update the JobDataMap
                if (param != null) {
                    trigger.getJobDataMap().putAll(param);
                }
                //Reschedule the job with the new trigger
                scheduler.rescheduleJob(triggerKey, trigger);
            } catch (Exception e) {
                log.error("Failed to update job", e);
            }
        }

        @Override
        public void deleteJob(String jobName, String groupName) {
            try {
                //Pause the trigger, unschedule it, then delete the job
                scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName));
                scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName));
                scheduler.deleteJob(JobKey.jobKey(jobName, groupName));
            } catch (Exception e) {
                log.error("Failed to delete job", e);
            }
        }

        @Override
        public void startAllJobs() {
            try {
                scheduler.start();
            } catch (Exception e) {
                log.error("Failed to start all jobs", e);
            }
        }

        @Override
        public void pauseAllJobs() {
            try {
                scheduler.pauseAll();
            } catch (Exception e) {
                log.error("Failed to pause all jobs", e);
            }
        }

        @Override
        public void resumeAllJobs() {
            try {
                scheduler.resumeAll();
            } catch (Exception e) {
                log.error("Failed to resume all jobs", e);
            }
        }

        @Override
        public void shutdownAllJobs() {
            try {
                if (!scheduler.isShutdown()) {
                    //Use with care: this ends the scheduler's lifecycle,
                    //and it cannot be restarted with start() afterwards
                    scheduler.shutdown(true);
                }
            } catch (Exception e) {
                log.error("Failed to shut down all jobs", e);
            }
        }
    }

    3.9 Write the controller

    First create an entity class for the request parameters

    public class QuartzConfigDTO implements Serializable {

        private static final long serialVersionUID = 1L;
        /**
         * Job name
         */
        private String jobName;

        /**
         * Job group
         */
        private String groupName;

        /**
         * Job class (fully qualified name)
         */
        private String jobClass;

        /**
         * Cron expression for the schedule
         */
        private String cronExpression;

        /**
         * Extra parameters
         */
        private Map<String, Object> param;
     
     
        public String getJobName() {
            return jobName;
        }
     
        public QuartzConfigDTO setJobName(String jobName) {
            this.jobName = jobName;
            return this;
        }
     
        public String getGroupName() {
            return groupName;
        }
     
        public QuartzConfigDTO setGroupName(String groupName) {
            this.groupName = groupName;
            return this;
        }
     
        public String getJobClass() {
            return jobClass;
        }
     
        public QuartzConfigDTO setJobClass(String jobClass) {
            this.jobClass = jobClass;
            return this;
        }
     
        public String getCronExpression() {
            return cronExpression;
        }
     
        public QuartzConfigDTO setCronExpression(String cronExpression) {
            this.cronExpression = cronExpression;
            return this;
        }
     
        public Map<String, Object> getParam() {
            return param;
        }
     
        public QuartzConfigDTO setParam(Map<String, Object> param) {
            this.param = param;
            return this;
        }
    }

    Write the web service endpoints

    @RestController
    @RequestMapping("/test")
    public class TestController {
     
        private static final Logger log = LoggerFactory.getLogger(TestController.class);
     
        @Autowired
        private QuartzJobService quartzJobService;
     
        /**
         * Adds a new job.
         */
        @RequestMapping("/addJob")
        public Object addJob(@RequestBody QuartzConfigDTO configDTO) {
            quartzJobService.addJob(configDTO.getJobClass(), configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());
            return HttpStatus.OK;
        }
     
        /**
         * Pauses a job.
         */
        @RequestMapping("/pauseJob")
        public Object pauseJob(@RequestBody QuartzConfigDTO configDTO) {
            quartzJobService.pauseJob(configDTO.getJobName(), configDTO.getGroupName());
            return HttpStatus.OK;
        }
     
        /**
         * Resumes a job.
         */
        @RequestMapping("/resumeJob")
        public Object resumeJob(@RequestBody QuartzConfigDTO configDTO) {
            quartzJobService.resumeJob(configDTO.getJobName(), configDTO.getGroupName());
            return HttpStatus.OK;
        }
     
        /**
         * Runs a scheduled job once, immediately.
         */
        @RequestMapping("/runOnce")
        public Object runOnce(@RequestBody QuartzConfigDTO configDTO) {
            quartzJobService.runOnce(configDTO.getJobName(), configDTO.getGroupName());
            return HttpStatus.OK;
        }
     
        /**
         * Updates a job.
         */
        @RequestMapping("/updateJob")
        public Object updateJob(@RequestBody QuartzConfigDTO configDTO) {
            quartzJobService.updateJob(configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());
            return HttpStatus.OK;
        }
     
        /**
         * Deletes a job.
         */
        @RequestMapping("/deleteJob")
        public Object deleteJob(@RequestBody QuartzConfigDTO configDTO) {
            quartzJobService.deleteJob(configDTO.getJobName(), configDTO.getGroupName());
            return HttpStatus.OK;
        }
     
        /**
         * Starts all jobs.
         */
        @RequestMapping("/startAllJobs")
        public Object startAllJobs() {
            quartzJobService.startAllJobs();
            return HttpStatus.OK;
        }
     
        /**
         * Pauses all jobs.
         */
        @RequestMapping("/pauseAllJobs")
        public Object pauseAllJobs() {
            quartzJobService.pauseAllJobs();
            return HttpStatus.OK;
        }
     
        /**
         * Resumes all jobs.
         */
        @RequestMapping("/resumeAllJobs")
        public Object resumeAllJobs() {
            quartzJobService.resumeAllJobs();
            return HttpStatus.OK;
        }
     
        /**
         * Shuts down all jobs.
         */
        @RequestMapping("/shutdownAllJobs")
        public Object shutdownAllJobs() {
            quartzJobService.shutdownAllJobs();
            return HttpStatus.OK;
        }
     
    }

    3.10 Testing the service endpoints

    Run the Spring Boot Application class to start the service.

    Create a scheduled job that runs every 5 seconds.

    The service runs as expected.
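
    A job that runs every 5 seconds is commonly expressed in Quartz with the cron expression 0/5 * * * * ?, which fires at seconds 0, 5, 10, ... of every minute. The sketch below is a plain-Java illustration of that schedule, not Quartz's own cron parser. The class and method names are hypothetical, and the expression is an assumption about the value passed in cronExpression, since the original request was only shown as a screenshot.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models the fire times of the Quartz cron expression "0/5 * * * * ?".
public class EveryFiveSeconds {

    // Assumed value of QuartzConfigDTO.cronExpression for an every-5-seconds job.
    public static final String CRON = "0/5 * * * * ?";

    // Returns the first whole second strictly after 'after'
    // whose second-of-minute is a multiple of 5.
    public static LocalDateTime nextFireTime(LocalDateTime after) {
        LocalDateTime candidate = after.truncatedTo(ChronoUnit.SECONDS).plusSeconds(1);
        while (candidate.getSecond() % 5 != 0) {
            candidate = candidate.plusSeconds(1);
        }
        return candidate;
    }

    // Returns the next 'count' fire times after the given point in time.
    public static List<LocalDateTime> nextFireTimes(LocalDateTime after, int count) {
        List<LocalDateTime> result = new ArrayList<>();
        LocalDateTime cursor = after;
        for (int i = 0; i < count; i++) {
            cursor = nextFireTime(cursor);
            result.add(cursor);
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2024, 1, 1, 12, 0, 3);
        for (LocalDateTime fireTime : nextFireTimes(start, 3)) {
            System.out.println(CRON + " fires at " + fireTime);
        }
    }
}
```

    Consecutive fire times are always 5 seconds apart and aligned to the minute, so a job created at second 3 first runs at second 5, not at second 8.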

    3.11 Registering listeners (optional)

    Integrating Quartz listeners into Spring Boot is also straightforward.

    Create a scheduler listener:

    import org.quartz.JobDetail;
    import org.quartz.JobKey;
    import org.quartz.SchedulerException;
    import org.quartz.Trigger;
    import org.quartz.TriggerKey;
    import org.quartz.listeners.SchedulerListenerSupport;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleSchedulerListener extends SchedulerListenerSupport {

        @Override
        public void jobScheduled(Trigger trigger) {
            System.out.println("Called when a job is scheduled");
        }

        @Override
        public void jobUnscheduled(TriggerKey triggerKey) {
            System.out.println("Called when a job is unscheduled");
        }

        @Override
        public void triggerFinalized(Trigger trigger) {
            System.out.println("Called when a trigger will never fire again");
        }

        @Override
        public void triggerPaused(TriggerKey triggerKey) {
            System.out.println(triggerKey + " (a single trigger) was paused");
        }

        @Override
        public void triggersPaused(String triggerGroup) {
            System.out.println("All triggers in group " + triggerGroup + " were paused");
        }

        @Override
        public void triggerResumed(TriggerKey triggerKey) {
            System.out.println(triggerKey + " (a single trigger) was resumed");
        }

        @Override
        public void triggersResumed(String triggerGroup) {
            System.out.println("All triggers in group " + triggerGroup + " were resumed");
        }

        @Override
        public void jobAdded(JobDetail jobDetail) {
            System.out.println("A JobDetail was added dynamically");
        }

        @Override
        public void jobDeleted(JobKey jobKey) {
            System.out.println(jobKey + " was deleted");
        }

        @Override
        public void jobPaused(JobKey jobKey) {
            System.out.println(jobKey + " was paused");
        }

        @Override
        public void jobsPaused(String jobGroup) {
            System.out.println(jobGroup + " (a group of jobs) was paused");
        }

        @Override
        public void jobResumed(JobKey jobKey) {
            System.out.println(jobKey + " was resumed");
        }

        @Override
        public void jobsResumed(String jobGroup) {
            System.out.println(jobGroup + " (a group of jobs) was resumed");
        }

        @Override
        public void schedulerError(String msg, SchedulerException cause) {
            System.out.println("Scheduler error: " + msg);
            cause.printStackTrace();
        }

        @Override
        public void schedulerInStandbyMode() {
            System.out.println("Scheduler was put into standby mode");
        }

        @Override
        public void schedulerStarted() {
            System.out.println("Scheduler has started");
        }

        @Override
        public void schedulerStarting() {
            System.out.println("Scheduler is starting");
        }

        @Override
        public void schedulerShutdown() {
            System.out.println("Scheduler has shut down");
        }

        @Override
        public void schedulerShuttingdown() {
            System.out.println("Scheduler is shutting down");
        }

        @Override
        public void schedulingDataCleared() {
            System.out.println("All scheduling data (jobs, triggers, and calendars) was cleared");
        }
    }

    Create a trigger listener:

    import org.quartz.JobExecutionContext;
    import org.quartz.Trigger;
    import org.quartz.listeners.TriggerListenerSupport;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleTriggerListener extends TriggerListenerSupport {

        /**
         * Name of this trigger listener
         * @return
         */
        @Override
        public String getName() {
            return "mySimpleTriggerListener";
        }

        /**
         * Called when the trigger has fired and its associated job is about to run
         * @param trigger
         * @param context
         */
        @Override
        public void triggerFired(Trigger trigger, JobExecutionContext context) {
            System.out.println("myTriggerListener.triggerFired()");
        }

        /**
         * Called after the trigger has fired and before the job runs. The listener may veto
         * the execution: if this method returns true, the job is not executed for this firing.
         * @param trigger
         * @param context
         * @return
         */
        @Override
        public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
            System.out.println("myTriggerListener.vetoJobExecution()");
            return false;
        }

        /**
         * Called when a trigger misfires. For example, many triggers are due at the same moment
         * but every thread in the pool is busy, so some triggers time out and miss this round.
         * @param trigger
         */
        @Override
        public void triggerMisfired(Trigger trigger) {
            System.out.println("myTriggerListener.triggerMisfired()");
        }

        /**
         * Called when the trigger has fired and its job has finished executing
         * @param trigger
         * @param context
         * @param triggerInstructionCode
         */
        @Override
        public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigger.CompletedExecutionInstruction triggerInstructionCode) {
            System.out.println("myTriggerListener.triggerComplete()");
        }
    }
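
    The listener above always returns false from vetoJobExecution, so no firing is ever blocked. As a minimal sketch of when a veto might be useful, the plain-Java helper below decides whether the current time falls inside a maintenance window. The class name, the window, and the idea of a maintenance window are all hypothetical and not part of the original project.

```java
import java.time.LocalTime;

// Hypothetical helper: decides whether a firing should be vetoed because
// it falls inside a maintenance window.
public class MaintenanceWindowVeto {

    private final LocalTime start; // inclusive
    private final LocalTime end;   // exclusive

    public MaintenanceWindowVeto(LocalTime start, LocalTime end) {
        this.start = start;
        this.end = end;
    }

    // Returns true when 'now' lies in [start, end).
    // Windows that wrap past midnight (for example 23:00 to 01:00) are supported.
    public boolean shouldVeto(LocalTime now) {
        if (start.equals(end)) {
            return false; // empty window: never veto
        }
        if (start.isBefore(end)) {
            return !now.isBefore(start) && now.isBefore(end);
        }
        // The window wraps around midnight.
        return !now.isBefore(start) || now.isBefore(end);
    }

    public static void main(String[] args) {
        MaintenanceWindowVeto window =
                new MaintenanceWindowVeto(LocalTime.of(23, 0), LocalTime.of(1, 0));
        System.out.println("23:30 vetoed? " + window.shouldVeto(LocalTime.of(23, 30)));
        System.out.println("12:00 vetoed? " + window.shouldVeto(LocalTime.of(12, 0)));
    }
}
```

    Inside SimpleTriggerListener, vetoJobExecution could then return window.shouldVeto(LocalTime.now()). When it returns true, Quartz skips the job for that firing and calls jobExecutionVetoed on the registered job listeners instead of jobToBeExecuted.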

    Create a job listener:

    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.listeners.JobListenerSupport;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleJobListener extends JobListenerSupport {

        /**
         * Name of this job listener
         * @return
         */
        @Override
        public String getName() {
            return "mySimpleJobListener";
        }

        /**
         * Called just before the job is executed
         * @param context
         */
        @Override
        public void jobToBeExecuted(JobExecutionContext context) {
            System.out.println("simpleJobListener: about to execute " + context.getJobDetail().getKey());
        }

        /**
         * Called when a trigger listener has vetoed the execution
         * @param context
         */
        @Override
        public void jobExecutionVetoed(JobExecutionContext context) {
            System.out.println("simpleJobListener: execution vetoed for " + context.getJobDetail().getKey());
        }

        /**
         * Called after the job has been executed
         * @param context
         * @param jobException
         */
        @Override
        public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
            System.out.println("simpleJobListener: finished executing " + context.getJobDetail().getKey());
        }
    }

    Finally, register the listeners with the Scheduler:

    @Autowired
    private SimpleSchedulerListener simpleSchedulerListener;
     
    @Autowired
    private SimpleJobListener simpleJobListener;
     
    @Autowired
    private SimpleTriggerListener simpleTriggerListener;
     
    @Bean(name = "scheduler")
    public Scheduler scheduler() throws IOException, SchedulerException {
        Scheduler scheduler = schedulerFactoryBean().getScheduler();
        // Register the listeners
        // Add the SchedulerListener
        scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);
     
        // Add the JobListener; the matcher restricts it to one specific job
        scheduler.getListenerManager().addJobListener(simpleJobListener, KeyMatcher.keyEquals(JobKey.jobKey("myJob", "myGroup")));
     
        // Add the TriggerListener and apply it to all triggers
        scheduler.getListenerManager().addTriggerListener(simpleTriggerListener, EverythingMatcher.allTriggers());
        return scheduler;
    }

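    3.12 Using the project's data source (optional)

    In the Quartz data source configuration above we used a dedicated data source, in order to decouple it from the project's own data source. If you would rather not create a separate database and want Quartz to share the project's data source, the configuration is just as simple.

    In the quartz.properties file, remove the org.quartz.jobStore.dataSource setting:

    # Comment out the Quartz data source setting
    #org.quartz.jobStore.dataSource=qzDS

    In the QuartzConfig class, inject the project's dataSource and pass it to Quartz: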

    @Autowired
    private DataSource dataSource;
     
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        //...
     
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(propertiesFactoryBean.getObject());
        // Use the project's data source instead of the Quartz-managed one
        factory.setDataSource(dataSource);
        
        //...
        return factory;
    }

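    4. Testing job scheduling

    In real deployments a project usually runs as a cluster. To match the production environment, we create two more copies of the same project and check whether Quartz schedules jobs in a distributed way, guaranteeing that any given job runs on only one machine at a time.

    In principle, we only need to copy the project we just created and change the port number to test this locally.

    Only one instance needs to expose the CRUD service, so the copies do not need QuartzJobService and the other add, delete, and update services. It is enough that QuartzConfig, DruidConnectionProvider, QuartzJobFactory, TfCommandJob, and quartz.properties are identical in every instance.

    Start the services quartz-001, quartz-002, and quartz-003 in turn and observe the result:

    quartz-001, the first service to start, loads the scheduled jobs already configured in the database and runs them. The other two services, quartz-002 and quartz-003, do not schedule anything.

    When we shut down quartz-001, quartz-002 takes over job scheduling.

    When we then shut down quartz-002, quartz-003 takes over in the same way.

    The final result matches what we expected.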
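
    The takeover behaviour seen above follows from every live node competing for each firing through the shared database, with exactly one node winning. The sketch below is a toy model of that idea only: an in-memory ConcurrentHashMap stands in for the shared database, and putIfAbsent stands in for the database lock. It is not Quartz's actual locking code, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: several nodes race to claim one firing; only one may win.
public class SingleOwnerFiringDemo {

    // Stand-in for the shared database: key = trigger + fire time, value = owning node.
    private final Map<String, String> claims = new ConcurrentHashMap<>();

    // Atomically claims a firing; returns true only for the first node that asks.
    public boolean tryClaim(String node, String triggerKey, long fireTime) {
        return claims.putIfAbsent(triggerKey + "@" + fireTime, node) == null;
    }

    // Lets all given nodes race for the same firing and counts how many succeed.
    public static int countWinners(List<String> nodes, String triggerKey, long fireTime) {
        SingleOwnerFiringDemo store = new SingleOwnerFiringDemo();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch startGate = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (String node : nodes) {
            Thread thread = new Thread(() -> {
                try {
                    startGate.await(); // start all nodes at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.tryClaim(node, triggerKey, fireTime)) {
                    winners.incrementAndGet();
                }
            });
            threads.add(thread);
            thread.start();
        }
        startGate.countDown();
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("quartz-001", "quartz-002", "quartz-003");
        System.out.println("winners with three nodes: " + countWinners(all, "myGroup.myJob", 1000L));
        // quartz-001 has been shut down; the remaining nodes still produce one winner.
        List<String> remaining = Arrays.asList("quartz-002", "quartz-003");
        System.out.println("winners after shutdown: " + countWinners(remaining, "myGroup.myJob", 6000L));
    }
}
```

    Because each firing is claimed independently, no node has to be promoted when another one stops: the remaining nodes simply win the next claim.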

    5. Summary

    This article covered persistent, distributed job scheduling with Spring Boot, Quartz, and MySQL. All of the code shown has been tested.

  • Original article: https://www.cnblogs.com/xiejn/p/15686444.html