zoukankan      html  css  js  c++  java
  • 分布式调度平台XXL-JOB源码分析-调度中心

    架构图

    上图是我们要进行源码分析的2.1版本的整体架构图。其分为两大块,调度中心和执行器,本文先分析调度中心,也就是xxl-job-admin这个包的代码。

    关键bean

    在application.properties配置正确的数据库连接信息后,直接启动XxlJobAdminApplication即可。

    配置类XxlJobAdminConfig,里面维护了一些调度中心端的配置数据。

    XxlJobScheduler这个组件实现了InitializingBean接口,所以spring容器在初始化的时候会调用afterPropertiesSet方法,此方法如下:

    第一步国际化相关。

    第二步监控相关。

    第三步失败重试相关。

    第四步启动admin端服务,接收注册请求等。

    第五步JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的

    JobScheduleHelper

    这个类就是死循环从xxl_job_info表中取出未来5秒内要执行的任务,进行调度分发。

     启动了两个守护线程,先来看scheduleThread。

    死循环内的代码如上图,首先利用for update语句进行获取任务的资格锁定,再去获取未来5秒内即将要执行的任务。

    展开遍历任务的逻辑代码,有三个分支

     

     第一个分支当前任务的触发时间已经超时5秒以上了,不在执行,直接计算下一次触发时间。

     第二个分支为触发时间已满足,利用JobTriggerPoolHelper这个类进行任务调度,之后判断下一次执行时间如果在5秒内,进行此任务数据的缓存,处理逻辑与第三个分支一样。

    对触发时间秒数进行60取模,跟进pushTimeRing方法

    ringData是以0到59的整数为key,以jobId集合为value的Map集合。这个集合数据的处理逻辑,就在我们第二个守护线程ringThread中。

     1 while (!ringThreadToStop) {
     2     try {
     3         // second data
     4         List<Integer> ringItemData = new ArrayList<>();
     5         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
     6         for (int i = 0; i < 2; i++) {
     7             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
     8             if (tmpData != null) {
     9                 ringItemData.addAll(tmpData);
    10             }
    11         }
    12         // ring trigger
    13         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
    14         if (ringItemData!=null && ringItemData.size()>0) {
    15             // do trigger
    16             for (int jobId: ringItemData) {
    17                 // do trigger
    18                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
    19             }
    20             // clear
    21             ringItemData.clear();
    22         }
    23     } catch (Exception e) {
    24         if (!ringThreadToStop) {
    25             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
    26         }
    27     }
    28     // next second, align second
    29     try {
    30         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
    31     } catch (InterruptedException e) {
    32         if (!ringThreadToStop) {
    33             logger.error(e.getMessage(), e);
    34         }
    35     }
    36 }

    根据当前秒数刻度和前一个刻度进行时间轮的任务获取,之后和上文一样,利用JobTriggerPoolHelper进行任务调度。

    时序图

     

    JobTriggerPoolHelper

    如前文所述,不管是scheduleThread还是ringThread,最后完成任务调度的都是JobTriggerPoolHelper.trigger方法,这个类有两个线程池fastTriggerPool和slowTriggerPool,顾名思义,分别是执行较快任务和较慢任务的,后查官方文档,如下:

      

    minTim属性,作用待明确

    jobTimeoutCountMap属性,计数,key为jobId,value使用AtomicInteger计数。

    helper静态变量指向自己本身,提供外部静态方法调用。

    重要方法,向两种线程池其中之一提交调度任务,进行调度,引出XxlJobTrigger这个类,一路跟进去

    继续跟进

    至此,完成执行器的任务调度。

    时序图

    接收注册和心跳请求

  • 相关阅读:
    webpack基础
    LeetCode232. 用栈实现队列做题笔记
    mysql 时间加减一个月
    leetcode 1381. 设计一个支持增量操作的栈 思路与算法
    LeetCode 141. 环形链表 做题笔记
    leetcode 707. 设计链表 做题笔记
    leetcode 876. 链表的中间结点 做题笔记
    leetcode 143. 重排链表 做题笔记
    leetcode 1365. 有多少小于当前数字的数字 做题笔记
    LeetCode1360. 日期之间隔几天 做题笔记
  • 原文地址:https://www.cnblogs.com/jiangyang/p/11576931.html
Copyright © 2011-2022 走看看