zoukankan      html  css  js  c++  java
  • SpringBoot+Quartz+MySQL实现分布式定时任务

    第一步:引入依赖

         <!--quartz相关依赖-->
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz-jobs</artifactId>
                <version>2.3.0</version>
            </dependency>
            <!--定时任务需要依赖context模块-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>

    第二步:创建MySQL表,Quartz是基于表来感知其他定时任务节点的,节点间不会直接通信。建表语句在jar包中自带了。

    orgquartz-schedulerquartz2.3.0quartz-2.3.0.jar!orgquartzimpljdbcjobstore ables_mysql_innodb.sql

     第三步:配置线程池,我这里是因为项目的其他地方有用到线程池,你也可以选择在Quartz的配置类中注入。

    (我在其他位置使用了线程池,占用了一个线程,所以当我将核心线程数量设置为1时,定时任务不会执行;需确保有足够的线程来执行)

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @Author 1
     * @Description 配置线程池交给Spring容器管理
     * @Date 2020/8/26 18:23
     **/
    @Configuration
    public class ExecturConfig {
        @Bean("taskExector")
        public Executor taskExector() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池数量
            executor.setCorePoolSize(2);
            //最大线程数量
            executor.setMaxPoolSize(5);
            //线程池的队列容量
            executor.setQueueCapacity(10);
            //线程名称的前缀
            executor.setThreadNamePrefix("expireOrderHandle-");
            //配置拒绝策略
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            executor.initialize();
            return executor;
        }
    
    }

    第四步:因为定时任务业务中需要使用到注入Spring容器的类,所以配置注入,否则报空指针异常。

    参考了一位大佬的博客:https://blog.csdn.net/qq_39513430/article/details/104996237

    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.scheduling.quartz.AdaptableJobFactory;
    import org.springframework.stereotype.Component;
    
    @Component("myAdaptableJobFactory")
    public class MyAdaptableJobFactory extends AdaptableJobFactory {
    
        //AutowireCapableBeanFactory 可以将一个对象添加到SpringIOC容器中,并且完成该对象注入
        @Autowired
        private AutowireCapableBeanFactory autowireCapableBeanFactory;
        
        /**
         * 该方法需要将实例化的任务对象手动的添加到springIOC容器中并且完成对象的注入
         */
        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object obj = super.createJobInstance(bundle);
            //将obj对象添加Spring IOC容器中,并完成注入
            this.autowireCapableBeanFactory.autowireBean(obj);
            return obj;
        }
    
    }

    第五步:添加Quartz属性文件

    关于属性配置解释参考https://blog.csdn.net/github_36429631/article/details/63254055

    #============================================================================
    # Configure JobStore
    # Using Spring datasource in SchedulerConfig.java
    # Spring uses LocalDataSourceJobStore extension of JobStoreCMT
    #============================================================================
    #设置为TRUE不会出现序列化非字符串类到 BLOB 时产生的类版本问题
    org.quartz.jobStore.useProperties=true
    #quartz相关数据表前缀名
    org.quartz.jobStore.tablePrefix = QRTZ_
    #开启分布式部署
    org.quartz.jobStore.isClustered = true
    #分布式节点有效性检查时间间隔,单位:毫秒
    org.quartz.jobStore.clusterCheckinInterval = 20000
    #信息保存时间 默认值60秒
    org.quartz.jobStore.misfireThreshold = 60000
    #事务隔离级别为“读已提交”
    org.quartz.jobStore.txIsolationLevelReadCommitted = true
    #配置线程池实现类
    org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
    org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    
    #============================================================================
    # Configure Main Scheduler Properties
    # Needed to manage cluster instances
    #============================================================================
    org.quartz.scheduler.instanceName = ClusterQuartz
    org.quartz.scheduler.instanceId= AUTO
    #如果你想quartz-scheduler出口本身通过RMI作为服务器,然后设置“出口”标志true(默认值为false)。
    org.quartz.scheduler.rmi.export = false
    #true:链接远程服务调度(客户端),这个也要指定registryhost和registryport,默认为false
    # 如果export和proxy同时指定为true,则export的设置将被忽略
    org.quartz.scheduler.rmi.proxy = false
    org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
    
    #============================================================================
    # Configure ThreadPool
    # Can also be configured in spring configuration
    #============================================================================
    #线程池的实现类(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
    #org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
    #org.quartz.threadPool.threadCount = 5
    #org.quartz.threadPool.threadPriority = 5
    #org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

    第六步:写任务类

    package com.website.task;
    
    import com.website.mapper.WebTeacherMapper;
    import com.website.pojo.ao.TeacherSalaryRuleAO;
    import com.website.pojo.bo.TeacherSalaryRuleDetailBO;
    import com.website.pojo.bo.TeacherSalaryRuleRelationBO;
    import com.website.pojo.bo.TeacherSalaryStatTempBO;
    import com.website.pojo.bo.TeacherSalaryStatisticBO;
    import io.jsonwebtoken.lang.Collections;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.joda.time.DateTime;
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.PersistJobDataAfterExecution;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.quartz.QuartzJobBean;
    
    import java.math.BigDecimal;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    @PersistJobDataAfterExecution
    @DisallowConcurrentExecution
    @Slf4j
    public class QuartzJob extends QuartzJobBean {
        @Autowired
        private WebTeacherMapper webTeacherMapper;
    
        @Override
        protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
            log.info("统计老师月薪资定时任务开始执行。");
            System.out.println("任务编写位置");
            log.info("统计老师月薪资定时任务执行完毕。");
        }
    
    }

    第七步:配置定时器

    import com.website.task.QuartzJob;
    import org.quartz.Scheduler;
    import org.quartz.TriggerKey;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.PropertiesFactoryBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
    import org.springframework.scheduling.quartz.JobDetailFactoryBean;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;
    
    import javax.sql.DataSource;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.Executor;
    
    @Configuration
    public class SchedulerConfig {
    
        @Autowired
        private DataSource dataSource;
    
        @Autowired
        private Executor taskExector;
    
        @Autowired
        private MyAdaptableJobFactory myAdaptableJobFactory;
    
    
    
        @Bean
        public Scheduler scheduler() throws Exception {
            Scheduler scheduler = schedulerFactoryBean().getScheduler();
            TriggerKey triggerKey1 = TriggerKey.triggerKey("trigger1", "TriggerTest111");
            /*========如果有必要可以配置删除任务,开始====================*/
            //停止触发器
    //        scheduler.pauseTrigger(triggerKey1);
            //移除触发器
    //        scheduler.unscheduleJob(triggerKey1);
    //        JobKey jobKey1 = JobKey.jobKey("job1111------", "quartzTest--------");
            //删除任务
    //        boolean b = scheduler.deleteJob(jobKey1);
    //        System.out.println(b);
            /*=========结束====================*/
            scheduler.start();
            return scheduler;
        }
    
        @Bean
        public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
            //开启更新job
            factory.setOverwriteExistingJobs(true);
            //如果不配置就会使用quartz.properties中的instanceName
            //factory.setSchedulerName("Cluster_Scheduler");
            //配置数据源,这是quartz使用的表的数据库存放位置
            factory.setDataSource(dataSource);
            //设置实例在spring容器中的key
            factory.setApplicationContextSchedulerContextKey("applicationContext");
            //配置线程池
            factory.setTaskExecutor(taskExector);
            //配置配置文件
            factory.setQuartzProperties(quartzProperties());
            //设置调度器自动运行
            factory.setAutoStartup(true);
            //配置任务执行规则,参数是一个可变数组
            factory.setTriggers(trigger1().getObject());
            // 解决mapper无法注入问题,此处配合第四步的配置。
            factory.setJobFactory(myAdaptableJobFactory);
            return factory;
        }
    
    
        @Bean
        public Properties quartzProperties() throws IOException {
            PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
    
            // 在quartz.properties中的属性被读取并注入后再初始化对象
            propertiesFactoryBean.afterPropertiesSet();
            return propertiesFactoryBean.getObject();
        }
    
        @Bean
        public JobDetailFactoryBean job1() {
            JobDetailFactoryBean jobDetail = new JobDetailFactoryBean();
            //配置任务的具体实现
            jobDetail.setJobClass(QuartzJob.class);
            //是否持久化
            jobDetail.setDurability(true);
            //出现异常是否重新执行
            jobDetail.setRequestsRecovery(true);
            //配置定时任务信息
            jobDetail.setName("TeacherSalaryJob");
            jobDetail.setGroup("TeacherSalaryJobGroup");
            jobDetail.setDescription("这是每月1号凌晨统计教师薪资任务");
            return jobDetail;
        }
    
        @Bean
        public CronTriggerFactoryBean trigger1() {
            CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
            //定时规则的分组
            cronTrigger.setGroup("TeacherSalaryTriggerGroup");
            cronTrigger.setName("TeacherSalaryTrigger");
            //配置执行的任务jobdetail
            cronTrigger.setJobDetail(job1().getObject());
            //配置执行规则 每月一号0点过1分执行一次
            cronTrigger.setCronExpression("0 1 0 1 * ? ");
            return cronTrigger;
        }
    
    }

    到此完毕,另外发现如果执行任务的代码中报错,会导致定时任务停止循环,重启也不会再执行。建议任务内容用try...catch代码块包裹起来,打印好日志。

    已中断的任务清空Quartz所有表格,再启动项目即可再次触发启动任务。

    如果某一天定时任务突然不执行了,网上很多情况都是远程调用没有加超时中断,从而导致线程阻塞引起的。

    抛异常中断周期执行原因暂未明确,有知道的大佬还请不吝赐教。

  • 相关阅读:
    hdu 5101 Select
    hdu 5100 Chessboard
    cf B. I.O.U.
    cf C. Inna and Dima
    cf B. Inna and Nine
    cf C. Counting Kangaroos is Fun
    Radar Installation 贪心
    spfa模板
    Sequence
    棋盘问题
  • 原文地址:https://www.cnblogs.com/zou-rong/p/13744721.html
Copyright © 2011-2022 走看看