zoukankan      html  css  js  c++  java
  • spring + quartz 分布式自定义注解

    相关技术

        本文采用spring + quartz的方案。使用mysql作为任务的持久化,支持分布式。

    自定义注解

      1.启用定时任务

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(QuartzConfig.class)   //引入配置
    @Documented
    public @interface EnableMScheduling {
    
    }
    
    //该注解需要放在application启动类上,标识启用定时任务,它的作用就是配置、解析任务以及启动调

     2.标识调度类 

    @Target(value = ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface MScheduleClass {
    
        /**
         * 任务分组,页面显示作用
         * @return
         */
        String module() default "系统";
        
        
        /**
         * 描述,提示作用
         * @return
         */
        String desc() default "";
        
    }
    //
    该注解放置在类上,标识指定的类是一组定时任务,其中需要设置任务分组,描述
    
    

     3.标识执行的方法

    @Target(value = ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface MSchedule {
    
    
        /**
         * 任务名称
         * @return
         */
        String title() default "";
    
        
        /**
         * 调度触发的corn表达式 : 用作Job的触发器,目前只支持一个触发器表达式。
         */
        String corn();
    
        /**
         * 描述
         */
        String desc() default "";
    
        /**
         * 参数
         */
        String param() default "";
    }
    //
    该注解标识在@MScheduleClass标识的类中的方法中,标识指定的方式是任务执行的方法。
    
    

     配置类

    import javax.sql.DataSource;
    
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;
    import org.springframework.transaction.PlatformTransactionManager;
    
    public class QuartzConfig {
    
        /**
         * 配置任务调度器
         * 使用项目数据源作为quartz数据源
         * @param jobFactory 自定义配置任务工厂
         * @param dataSource 数据源实例
         * @return
         * @throws Exception
         */
        @Bean(destroyMethod = "destroy")
        public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource,
            ObjectProvider<PlatformTransactionManager> transactionManager) throws Exception {
            
            SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
            // 项目启动完成后,等待10秒后开始执行调度器初始化
            //schedulerFactoryBean.setStartupDelay(10);
            // 设置调度器自动运行
            schedulerFactoryBean.setAutoStartup(false);
            // 设置数据源,使用与项目统一数据源
            schedulerFactoryBean.setDataSource(dataSource);
    
            PlatformTransactionManager txManager = transactionManager.getIfUnique();
            if (txManager != null) {
                schedulerFactoryBean.setTransactionManager(txManager);
            }
            // 设置上下文spring bean name
            schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
            // 设置配置文件位置
            schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
            return schedulerFactoryBean;
        }
        
        @Bean
        public MScheduleBeanPostProcessor mScheduleBeanPostProcessor() {
            return new MScheduleBeanPostProcessor();
        }
    }

    其中dataSource我自己用的阿里的druid。  具体配置自行处理

     quartz.properites

    #调度器实例名称
    org.quartz.scheduler.instanceName = quartzScheduler
    
    #调度器实例编号自动生成
    org.quartz.scheduler.instanceId = AUTO
    
    #持久化方式配置
    org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
    
    #持久化方式配置数据驱动,MySQL数据库
    org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    
    #quartz相关数据表前缀名
    org.quartz.jobStore.tablePrefix = QRTZ_
    
    #开启分布式部署
    org.quartz.jobStore.isClustered = true
    #配置是否使用
    org.quartz.jobStore.useProperties = false
    
    #分布式节点有效性检查时间间隔,单位:毫秒
    org.quartz.jobStore.clusterCheckinInterval = 20000
    
    #线程池实现类
    org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
    
    #执行最大并发线程数量
    org.quartz.threadPool.threadCount = 10
    
    #线程优先级
    org.quartz.threadPool.threadPriority = 5
    
    #配置是否启动自动加载数据库内的定时任务,默认true
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

    quartz初始化的数据库表在org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql

    注解解析beanPosrProcessor

    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    
    import org.quartz.CronScheduleBuilder;
    import org.quartz.JobBuilder;
    import org.quartz.JobDataMap;
    import org.quartz.JobDetail;
    import org.quartz.JobKey;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerException;
    import org.quartz.Trigger;
    import org.quartz.TriggerBuilder;
    import org.quartz.TriggerKey;
    import org.quartz.impl.matchers.GroupMatcher;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    
    import com.mustr.cluster.annotation.MSchedule;
    import com.mustr.cluster.annotation.MScheduleClass;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class MScheduleBeanPostProcessor implements BeanPostProcessor, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    
        @Autowired
        private Scheduler scheduler;
        
        private List<MustrTask> tasks = new ArrayList<>();
        
        
        @Override
        public void destroy() throws Exception {
            scheduler.shutdown();
        }
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            log.info("all scheduler tasks total {}", tasks.size());
            
            try {
                //先把原来的都删除
                Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup());
                scheduler.deleteJobs(new ArrayList<>(jobKeys));
            } catch (SchedulerException e1) {
                e1.printStackTrace();
            }
            
            //重新添加新的
            tasks.forEach(task -> {
                try {
                    scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
                } catch (SchedulerException e) {
                    e.printStackTrace();
                }
            });
            
            try {
                scheduler.start(); //启动调度器
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            MScheduleClass msClass = bean.getClass().getAnnotation(MScheduleClass.class);
            if (msClass == null) {
                return bean;
            }
            
            String group = bean.getClass().getSimpleName();
            Method[] methods = bean.getClass().getDeclaredMethods();
            if (methods == null) {
                return bean;
            }
            
            
            for (Method method : methods) {
                MSchedule mSchedule = method.getAnnotation(MSchedule.class);
                if (mSchedule == null) {
                    continue;
                }
                hanlderSchedule(group, mSchedule, method, bean);
            }
            
            return bean;
        }
    
        private void hanlderSchedule(String group, MSchedule mSchedule, Method method, Object bean) {
            String jobName = method.getName();
            
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("targetClass", bean);
            jobDataMap.put("targetMethod", method.getName());
            jobDataMap.put("arguments", mSchedule.param());
            
            JobDetail jobDetail = JobBuilder.newJob(MustrCommonJob.class)
                .setJobData(jobDataMap)
                .withIdentity(new JobKey(jobName, group))
                .withDescription(mSchedule.desc())
                .storeDurably()
                .build();
            
            Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(new TriggerKey(jobName, group))
                .withDescription(mSchedule.desc())
                .withSchedule(CronScheduleBuilder.cronSchedule(mSchedule.corn()).withMisfireHandlingInstructionDoNothing())
                .forJob(jobDetail)
                .build();
            
            tasks.add(new MustrTask(jobDetail, trigger));
        }
    }

    该类就是解析自定义注解的@MScheduleClass@MSchedule标识的任务。封装jobDetailTrigger

    任务job统一使用MustrCommonJob通过反射来执行配置的指定类的指定方法

     

    MustrTask

    import org.quartz.JobDetail;
    import org.quartz.Trigger;
    
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.Setter;
    
    @Setter
    @Getter
    @AllArgsConstructor
    public class MustrTask {
    
        private JobDetail jobDetail;
        private Trigger trigger;
    }

    任务类

    import java.io.Serializable;
    
    import org.quartz.Job;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.springframework.util.MethodInvoker;
    
    public class MustrCommonJob implements Job, Serializable{
        private static final long serialVersionUID = 8651275978441122356L;
    
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            Object targetClass = context.getMergedJobDataMap().get("targetClass");
            String targetMethod = context.getMergedJobDataMap().getString("targetMethod");
            String param = context.getMergedJobDataMap().getString("arguments");
            
            //前置处理
            // do ....
            
            try {
                MethodInvoker methodInvoker = new MethodInvoker();
                methodInvoker.setTargetObject(targetClass);
                methodInvoker.setTargetMethod(targetMethod);
                if (param != null && !"".equals(param)) {
                    String[] params = param.split(",");
                    Object[] temp = new Object[params.length];
                    for (int i = 0; i < params.length; i++) {
                        temp[i] = params[i];
                    }
                    methodInvoker.setArguments(temp);
                }
                methodInvoker.prepare();
                methodInvoker.invoke();
            }  catch (Exception e) {
                e.printStackTrace();
            } finally {
                
                //后置处理
                // do  ... 如记录日志
            }
        }
    
    }
    该类实现了quartz的job接口。通过反射调用指定的方法

    一个简单的demo

    import java.io.Serializable;
    import java.time.LocalDateTime;
    
    import com.mustr.cluster.annotation.MSchedule;
    import com.mustr.cluster.annotation.MScheduleClass;
    
    @MScheduleClass(module = "系统", desc = "测试组")
    public class HelloSchedule implements Serializable{
        private static final long serialVersionUID = 3619058186885794136L;
    
        /*@MSchedule(corn = "0/30 * * * * ?", desc = "打印hello world")
        public void hello() {
            System.out.println("hello mustr..... <<<:::>>>" + LocalDateTime.now());
        }*/
        
        @MSchedule(corn = "0/10 * * * * ?", desc = "打印hello world")
        public void hello1() {
            System.out.println("<<<<hello1 mustr..... <<<:::>>>" + LocalDateTime.now());
        }
    }

    最后一步

    在程序启动类中加入 @EnableMScheduling注解,启动项目即可看到控制台打印

    本文代码:https://github.com/Mustr/mustr-quartz-boot

  • 相关阅读:
    VMware虚拟机下网络配置模式
    2021考研规划(持续更新)
    20210326 名词解释及常用下载地址(持续更新)
    20210326学习笔记1---java及hadoop组件最新版本汇总
    20210326日记
    20210325一天总结--进步最大的一天
    20210325学习感悟--学习是开始几小时烦躁,越学习越上瘾;暴食看剧打游戏打飞机,最开始爽,时间越长越厌倦甚至感觉痛苦。
    20210326继续解决----20210325学习笔记2--运行MapReduce Jar(我为什么这样起标题,因为结构化数据才好搜索)
    20210325学习笔记1--解决了打包不生成jar文件的问题
    20210325日记--加油,相信只要基础够扎实熟练,就能找到转职成功。
  • 原文地址:https://www.cnblogs.com/Mustr/p/14029456.html
Copyright © 2011-2022 走看看