相关技术
本文采用spring + quartz的方案。使用mysql作为任务的持久化,支持分布式。
自定义注解
1.启用定时任务
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(QuartzConfig.class) //引入配置 @Documented public @interface EnableMScheduling { } //该注解需要放在application启动类上,标识启用定时任务,它的作用就是配置、解析任务以及启动调
2.标识调度类
/**
 * Marks a class as a group of scheduled tasks.
 *
 * <p>The annotation is itself meta-annotated with {@code @Component}, so an
 * annotated class is also a component-scanning candidate.
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MScheduleClass {

    /**
     * Task group name; intended for display purposes.
     *
     * @return the module (group) name, {@code "系统"} unless overridden
     */
    String module() default "系统";

    /**
     * Free-text description; informational only.
     *
     * @return the description, empty unless overridden
     */
    String desc() default "";
}
//该注解放置在类上,标识指定的类是一组定时任务,其中需要设置任务分组,描述
3.标识执行的方法
/**
 * Marks a method as the body of a scheduled task.
 *
 * <p>Intended for methods of a class annotated with {@code @MScheduleClass}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MSchedule {

    /**
     * Task name.
     *
     * @return the task title, empty unless overridden
     */
    String title() default "";

    /**
     * Cron expression used as the job's trigger. Only a single trigger
     * expression is supported at the moment.
     *
     * <p>The attribute name {@code corn} is a historical misspelling of "cron";
     * it is kept because renaming it would break existing usages.
     *
     * @return the cron expression (required)
     */
    String corn();

    /**
     * Description of the task.
     *
     * @return the description, empty unless overridden
     */
    String desc() default "";

    /**
     * Argument string handed to the task.
     *
     * @return the raw parameter string, empty unless overridden
     */
    String param() default "";
}
//该注解标注在被@MScheduleClass标识的类的方法上,表示该方法是任务执行的方法。
配置类
import javax.sql.DataSource; import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.annotation.Bean; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.transaction.PlatformTransactionManager; public class QuartzConfig { /** * 配置任务调度器 * 使用项目数据源作为quartz数据源 * @param jobFactory 自定义配置任务工厂 * @param dataSource 数据源实例 * @return * @throws Exception */ @Bean(destroyMethod = "destroy") public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, ObjectProvider<PlatformTransactionManager> transactionManager) throws Exception { SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); // 项目启动完成后,等待10秒后开始执行调度器初始化 //schedulerFactoryBean.setStartupDelay(10); // 设置调度器自动运行 schedulerFactoryBean.setAutoStartup(false); // 设置数据源,使用与项目统一数据源 schedulerFactoryBean.setDataSource(dataSource); PlatformTransactionManager txManager = transactionManager.getIfUnique(); if (txManager != null) { schedulerFactoryBean.setTransactionManager(txManager); } // 设置上下文spring bean name schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext"); // 设置配置文件位置 schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties")); return schedulerFactoryBean; } @Bean public MScheduleBeanPostProcessor mScheduleBeanPostProcessor() { return new MScheduleBeanPostProcessor(); } }
其中dataSource我自己用的阿里的druid。 具体配置自行处理
quartz.properties
# Scheduler instance name
org.quartz.scheduler.instanceName = quartzScheduler
# Scheduler instance id is generated automatically
org.quartz.scheduler.instanceId = AUTO
# Job store (persistence) implementation
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
# JDBC driver delegate used by the job store (the article runs against MySQL)
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# Name prefix of the Quartz tables
org.quartz.jobStore.tablePrefix = QRTZ_
# Enable clustered (distributed) deployment
org.quartz.jobStore.isClustered = true
# Whether JobDataMap values are stored as plain string properties.
# NOTE(review): per the Quartz configuration reference, false means the map is stored
# in serialized form, which the non-String "targetClass" entry relies on - confirm
org.quartz.jobStore.useProperties = false
# Interval of the cluster node check-in, in milliseconds
org.quartz.jobStore.clusterCheckinInterval = 20000
# Thread pool implementation
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
# Number of worker threads, i.e. the maximum number of jobs running concurrently
org.quartz.threadPool.threadCount = 10
# Priority of the worker threads
org.quartz.threadPool.threadPriority = 5
# Whether pool threads inherit the context class loader of the initializing thread
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
quartz初始化的数据库表在org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql
注解解析BeanPostProcessor
import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Set; import org.quartz.CronScheduleBuilder; import org.quartz.JobBuilder; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.quartz.impl.matchers.GroupMatcher; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import com.mustr.cluster.annotation.MSchedule; import com.mustr.cluster.annotation.MScheduleClass; import lombok.extern.slf4j.Slf4j; @Slf4j public class MScheduleBeanPostProcessor implements BeanPostProcessor, ApplicationListener<ContextRefreshedEvent>, DisposableBean { @Autowired private Scheduler scheduler; private List<MustrTask> tasks = new ArrayList<>(); @Override public void destroy() throws Exception { scheduler.shutdown(); } @Override public void onApplicationEvent(ContextRefreshedEvent event) { log.info("all scheduler tasks total {}", tasks.size()); try { //先把原来的都删除 Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup()); scheduler.deleteJobs(new ArrayList<>(jobKeys)); } catch (SchedulerException e1) { e1.printStackTrace(); } //重新添加新的 tasks.forEach(task -> { try { scheduler.scheduleJob(task.getJobDetail(), task.getTrigger()); } catch (SchedulerException e) { e.printStackTrace(); } }); try { scheduler.start(); //启动调度器 } catch (SchedulerException e) { e.printStackTrace(); } } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { MScheduleClass msClass = 
bean.getClass().getAnnotation(MScheduleClass.class); if (msClass == null) { return bean; } String group = bean.getClass().getSimpleName(); Method[] methods = bean.getClass().getDeclaredMethods(); if (methods == null) { return bean; } for (Method method : methods) { MSchedule mSchedule = method.getAnnotation(MSchedule.class); if (mSchedule == null) { continue; } hanlderSchedule(group, mSchedule, method, bean); } return bean; } private void hanlderSchedule(String group, MSchedule mSchedule, Method method, Object bean) { String jobName = method.getName(); JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("targetClass", bean); jobDataMap.put("targetMethod", method.getName()); jobDataMap.put("arguments", mSchedule.param()); JobDetail jobDetail = JobBuilder.newJob(MustrCommonJob.class) .setJobData(jobDataMap) .withIdentity(new JobKey(jobName, group)) .withDescription(mSchedule.desc()) .storeDurably() .build(); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(new TriggerKey(jobName, group)) .withDescription(mSchedule.desc()) .withSchedule(CronScheduleBuilder.cronSchedule(mSchedule.corn()).withMisfireHandlingInstructionDoNothing()) .forJob(jobDetail) .build(); tasks.add(new MustrTask(jobDetail, trigger)); } }
该类就是解析自定义注解的@MScheduleClass和@MSchedule标识的任务。封装jobDetail和Trigger。
任务job统一使用MustrCommonJob通过反射来执行配置的指定类的指定方法
MustrTask
import org.quartz.JobDetail;
import org.quartz.Trigger;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
 * Pairs a Quartz job detail with the trigger that fires it.
 *
 * <p>Accessors, mutators and the all-arguments constructor are generated by
 * Lombok.
 */
@AllArgsConstructor
@Getter
@Setter
public class MustrTask {

    /** Definition of the job to run. */
    private JobDetail jobDetail;

    /** Trigger that schedules the job. */
    private Trigger trigger;
}
任务类
import java.io.Serializable; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.util.MethodInvoker; public class MustrCommonJob implements Job, Serializable{ private static final long serialVersionUID = 8651275978441122356L; @Override public void execute(JobExecutionContext context) throws JobExecutionException { Object targetClass = context.getMergedJobDataMap().get("targetClass"); String targetMethod = context.getMergedJobDataMap().getString("targetMethod"); String param = context.getMergedJobDataMap().getString("arguments"); //前置处理 // do .... try { MethodInvoker methodInvoker = new MethodInvoker(); methodInvoker.setTargetObject(targetClass); methodInvoker.setTargetMethod(targetMethod); if (param != null && !"".equals(param)) { String[] params = param.split(","); Object[] temp = new Object[params.length]; for (int i = 0; i < params.length; i++) { temp[i] = params[i]; } methodInvoker.setArguments(temp); } methodInvoker.prepare(); methodInvoker.invoke(); } catch (Exception e) { e.printStackTrace(); } finally { //后置处理 // do ... 如记录日志 } } }
该类实现了quartz的job接口。通过反射调用指定的方法
一个简单的demo
import java.io.Serializable;
import java.time.LocalDateTime;

import com.mustr.cluster.annotation.MSchedule;
import com.mustr.cluster.annotation.MScheduleClass;

/**
 * Minimal demo task group: prints a greeting with the current time every ten
 * seconds.
 *
 * <p>Further tasks can be added as additional {@code @MSchedule} methods; the
 * original sample also contained a disabled variant firing every 30 seconds.
 */
@MScheduleClass(module = "系统", desc = "测试组")
public class HelloSchedule implements Serializable {

    private static final long serialVersionUID = 3619058186885794136L;

    /** Prints the demo greeting followed by the current local date-time. */
    @MSchedule(corn = "0/10 * * * * ?", desc = "打印hello world")
    public void hello1() {
        LocalDateTime now = LocalDateTime.now();
        String line = "<<<<hello1 mustr..... <<<:::>>>" + now;
        System.out.println(line);
    }
}
最后一步
在程序启动类中加入 @EnableMScheduling注解,启动项目即可看到控制台打印
本文代码:https://github.com/Mustr/mustr-quartz-boot