zoukankan      html  css  js  c++  java
  • spring 定时器任务深入理解

    spring配置文件中配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.2.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
           http://www.springframework.org/schema/task 
           http://www.springframework.org/schema/task/spring-task-3.0.xsd">
    
        <bean class="com.wjz.quartz.RepayQuartz"/>
        <task:annotation-driven/>
    </beans>

    定时任务处理类

    package com.wjz.quartz;
    
    import org.springframework.scheduling.annotation.Scheduled;

    public class RepayQuartz { @Scheduled(cron="0/10 * * * * ?") public void repay() { System.out.println("sping 定时任务"); } }

    <task:annotation-driven/>标签使用TaskNamespaceHandler 来处理

    public class TaskNamespaceHandler extends NamespaceHandlerSupport {
    
        @Override
        public void init() {
         // 注册了一些解析器其中AnnotationDrivenBeanDefinitionParser解析@Scheduled注解
    this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser()); this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser()); this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser()); } }

    注册了两个后置处理器

    org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor

    org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor

    我们主要来看ScheduledAnnotationBeanPostProcessor

    我们可以看到他实现了BeanPostProcesser,ApplicationListener接口

    先执行初始化后的后置处理

    @Override
        public Object postProcessAfterInitialization(final Object bean, String beanName) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            if (!this.nonAnnotatedClasses.contains(targetClass)) {
           // 获得bean的所有的方法,获得方法信息和@Scheduled注解信息 Map
    <Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, new MethodIntrospector.MetadataLookup<Set<Scheduled>>() { @Override public Set<Scheduled> inspect(Method method) { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); } }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass()); } } else { for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); for (Scheduled scheduled : entry.getValue()) {
                   // 处理加工方法和注解信息,后文详解 processScheduled(scheduled, method, bean); } }
    if (logger.isDebugEnabled()) { logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
        protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
            try {
                Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
           // 创建了一个Runnable对象,主要是用来反射定时方法的,后文详解#1 Runnable runnable
    = new ScheduledMethodRunnable(bean, invocableMethod); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";         // 初始化一个定时任务集合 Set<ScheduledTask> tasks = this.scheduledTasks.get(bean); if (tasks == null) { tasks = new LinkedHashSet<ScheduledTask>(4); this.scheduledTasks.put(bean, tasks); } // Determine initial delay long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } try { initialDelay = Long.parseLong(initialDelayString); } catch (NumberFormatException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value "" + initialDelayString + "" - cannot parse into integer"); } } // 解析表达式 String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else {
                // 没有指定时区的话使用默认时区 timeZone
    = TimeZone.getDefault(); }
              // 将表达式解析成任务计划表填充到任务集合中,后文详解#2 tasks.add(
    this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } // Check fixed delay long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } try { fixedDelay = Long.parseLong(fixedDelayString); } catch (NumberFormatException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value "" + fixedDelayString + "" - cannot parse into integer"); } tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay))); } // Check fixed rate long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } try { fixedRate = Long.parseLong(fixedRateString); } catch (NumberFormatException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value "" + fixedRateString + "" - cannot parse into integer"); } tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay))); } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }

    书接前文#1

    Runnable的run方法中是定时方法反射

        @Override
        public void run() {
            try {
                ReflectionUtils.makeAccessible(this.method);
                this.method.invoke(this.target);
            }
            catch (InvocationTargetException ex) {
                ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
            }
            catch (IllegalAccessException ex) {
                throw new UndeclaredThrowableException(ex);
            }
        }

    书接前文#2

    创建一个CronTask任务对象,其中有Runnable对象和CronTrigger触发对象(表达式,时区),后文详解触发器的构造#2-1

    public ScheduledTask scheduleCronTask(CronTask task) {
            ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
            boolean newTask = false;
            if (scheduledTask == null) {
                scheduledTask = new ScheduledTask();
                newTask = true;
            }
            if (this.taskScheduler != null) {
            // 执行定时任务 scheduledTask.future
    = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else {
            // 添加定时任务 addCronTask(task);
    this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }

    再看监听处理

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (event.getApplicationContext() == this.applicationContext) {
                // Running in an ApplicationContext -> register tasks this late...
                // giving other ContextRefreshedEvent listeners a chance to perform
                // their work at the same time (e.g. Spring Batch's job registration).
                finishRegistration();
            }
        }
    this.registrar.afterPropertiesSet();
    protected void scheduleTasks() {
            if (this.taskScheduler == null) {
            // 初始化一个线程池调度器
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            // 指定了任务调度器
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
    public ScheduledTask scheduleCronTask(CronTask task) {
            ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
            boolean newTask = false;
            if (scheduledTask == null) {
                scheduledTask = new ScheduledTask();
                newTask = true;
            }
         // 这次指定了ConcurrentTaskScheduler任务调度器,开始调度定时任务
    if (this.taskScheduler != null) { scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }
    @Override
        public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
            try {
                if (this.enterpriseConcurrentScheduler) {
                    return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
                }
                else {
                    ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
              // 这里就是核心的地方了, Runnable的run方法内,定时调用自己的run方法实现功能,巧妙之极
    return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule(); } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); } }
    public ScheduledFuture<?> schedule() {
            synchronized (this.triggerContextMonitor) {
            // 拿到下一次执行任务的时间,后文详解#2-2
    this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; }
            // 固定延迟即延迟多长时间执行
    long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } } @Override public void run() { Date actualExecutionTime = new Date();
         // 这就是上文所提到的定时方法反射执行定时业务方法
    super.run(); Date completionTime = new Date(); synchronized (this.triggerContextMonitor) {
            // 记录上一次任务执行的时间
    this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); if (!this.currentFuture.isCancelled()) {
              // 继续无限调度 schedule(); } } }

     书接前文#2-1

    CronTrigger触发对象的构造,初始化了计划时间表生成器

    public CronTrigger(String expression, TimeZone timeZone) {
            this.sequenceGenerator = new CronSequenceGenerator(expression, timeZone);
        }
    public CronSequenceGenerator(String expression, TimeZone timeZone) {
            this.expression = expression;
            this.timeZone = timeZone;
         // 解析表达式生成计划时间表 parse(expression); }
    private void parse(String expression) throws IllegalArgumentException {
            String[] fields = StringUtils.tokenizeToStringArray(expression, " ");
            if (!areValidCronFields(fields)) {
                throw new IllegalArgumentException(String.format(
                        "Cron expression must consist of 6 fields (found %d in "%s")", fields.length, expression));
            }
         // 主要使用了BitSet来制造计划时间表 setNumberHits(
    this.seconds, fields[0], 0, 60); setNumberHits(this.minutes, fields[1], 0, 60); setNumberHits(this.hours, fields[2], 0, 24); setDaysOfMonth(this.daysOfMonth, fields[3]); setMonths(this.months, fields[4]); setDays(this.daysOfWeek, replaceOrdinals(fields[5], "SUN,MON,TUE,WED,THU,FRI,SAT"), 8); if (this.daysOfWeek.get(7)) { // Sunday can be represented as 0 or 7 this.daysOfWeek.set(0); this.daysOfWeek.clear(7); } }
    private void setNumberHits(BitSet bits, String value, int min, int max) {
            String[] fields = StringUtils.delimitedListToStringArray(value, ",");
            for (String field : fields) {
           // 检查是否包含'/',不包含的话列出最小值到最大值的范围
    if (!field.contains("/")) { // Not an incrementer so it must be a range (possibly empty) int[] range = getRange(field, min, max); bits.set(range[0], range[1] + 1); } else {
    // 包含'/'的话列出每xx秒(或分、时)的时间表如{0,10,20,30,40,50} String[] split
    = StringUtils.delimitedListToStringArray(field, "/"); if (split.length > 2) { throw new IllegalArgumentException("Incrementer has more than two fields: '" + field + "' in expression "" + this.expression + """); } int[] range = getRange(split[0], min, max); if (!split[0].contains("-")) { range[1] = max - 1; } int delta = Integer.valueOf(split[1]); if (delta <= 0) { throw new IllegalArgumentException("Incrementer delta must be 1 or higher: '" + field + "' in expression "" + this.expression + """); }
              // 以一定数值叠加为BitSet赋值
    for (int i = range[0]; i <= range[1]; i += delta) { bits.set(i); } } } }
    private int[] getRange(String field, int min, int max) {
            int[] result = new int[2];
         // 包含*就是最大到最小
    if (field.contains("*")) { result[0] = min; result[1] = max - 1; return result; }
         // 不包含'/'和'-'返回值
    if (!field.contains("-")) { result[0] = result[1] = Integer.valueOf(field); } else { String[] split = StringUtils.delimitedListToStringArray(field, "-"); if (split.length > 2) { throw new IllegalArgumentException("Range has more than two fields: '" + field + "' in expression "" + this.expression + """); } result[0] = Integer.valueOf(split[0]); result[1] = Integer.valueOf(split[1]); } if (result[0] >= max || result[1] >= max) { throw new IllegalArgumentException("Range exceeds maximum (" + max + "): '" + field + "' in expression "" + this.expression + """); } if (result[0] < min || result[1] < min) { throw new IllegalArgumentException("Range less than minimum (" + min + "): '" + field + "' in expression "" + this.expression + """); } if (result[0] > result[1]) { throw new IllegalArgumentException("Invalid inverted range: '" + field + "' in expression "" + this.expression + """); } return result; }

    书接前文#2-2

    拿到下一个执行调度的时间

    @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
         // 拿到上一次执行调度的时间 Date date
    = triggerContext.lastCompletionTime(); if (date != null) { Date scheduled = triggerContext.lastScheduledExecutionTime(); if (scheduled != null && date.before(scheduled)) { date = scheduled; } } else {
            // 没拿到的话就拿当前时间 date
    = new Date(); }
         // 拿到一下一次执行调度的时间
    return this.sequenceGenerator.next(date); }
    public Date next(Date date) {
    Calendar calendar = new GregorianCalendar(); calendar.setTimeZone(this.timeZone); calendar.setTime(date); calendar.set(Calendar.MILLISECOND, 0); long originalTimestamp = calendar.getTimeInMillis();
         // 为日历设置下一次执行调度的时间 doNext(calendar, calendar.get(Calendar.YEAR));
    if (calendar.getTimeInMillis() == originalTimestamp) { calendar.add(Calendar.SECOND, 1); doNext(calendar, calendar.get(Calendar.YEAR)); }      // 返回设置好下一次调度的日历时间 return calendar.getTime(); }
    private void doNext(Calendar calendar, int dot) {
            List<Integer> resets = new ArrayList<Integer>();
         // 拿到当前时间的秒数
            int second = calendar.get(Calendar.SECOND);
            List<Integer> emptyList = Collections.emptyList();
         // 拿到下一次执行调度的秒数
    int updateSecond = findNext(this.seconds, second, calendar, Calendar.SECOND, Calendar.MINUTE, emptyList); if (second == updateSecond) { resets.add(Calendar.SECOND); } int minute = calendar.get(Calendar.MINUTE); int updateMinute = findNext(this.minutes, minute, calendar, Calendar.MINUTE, Calendar.HOUR_OF_DAY, resets); if (minute == updateMinute) { resets.add(Calendar.MINUTE); } else { doNext(calendar, dot); } int hour = calendar.get(Calendar.HOUR_OF_DAY); int updateHour = findNext(this.hours, hour, calendar, Calendar.HOUR_OF_DAY, Calendar.DAY_OF_WEEK, resets); if (hour == updateHour) { resets.add(Calendar.HOUR_OF_DAY); } else { doNext(calendar, dot); } int dayOfWeek = calendar.get(Calendar.DAY_OF_WEEK); int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH); int updateDayOfMonth = findNextDay(calendar, this.daysOfMonth, dayOfMonth, daysOfWeek, dayOfWeek, resets); if (dayOfMonth == updateDayOfMonth) { resets.add(Calendar.DAY_OF_MONTH); } else { doNext(calendar, dot); } int month = calendar.get(Calendar.MONTH); int updateMonth = findNext(this.months, month, calendar, Calendar.MONTH, Calendar.YEAR, resets); if (month != updateMonth) { if (calendar.get(Calendar.YEAR) - dot > 4) { throw new IllegalArgumentException("Invalid cron expression "" + this.expression + "" led to runaway search for next trigger"); } doNext(calendar, dot); } }
    private int findNext(BitSet bits, int value, Calendar calendar, int field, int nextField, List<Integer> lowerOrders) {
         // 拿到下一个执行调度的秒数如当前时间的秒数是23任务是每10秒调度则拿到30,如果是58则返回-1
    int nextValue = bits.nextSetBit(value); if (nextValue == -1) {
            // 加一分(时,日) calendar.add(nextField,
    1);
            // 重置秒(分、时) reset(calendar, Arrays.asList(field)); nextValue
    = bits.nextSetBit(0); } if (nextValue != value) {
            // 为日历设置下一个执行调度的秒(分、时)数 calendar.set(field, nextValue); reset(calendar, lowerOrders); }
    return nextValue; }
  • 相关阅读:
    C#基础(WinForm窗体的单例模式,避免窗体被实例化多次)
    NPOI基础入门(旧版本)
    SQLite数据插入异常
    EClipse开发NDK流程
    git 常用命令
    6.0权限的简单实用
    MVVM模式
    去掉所有字符里面的空格换行符等
    高逼格的实现WiFi共享,不安装第三方wifi共享软件,两种方式实现开启wifi的功能
    常用的正则表达表达式以及简单用法
  • 原文地址:https://www.cnblogs.com/BINGJJFLY/p/7485599.html
Copyright © 2011-2022 走看看