zoukankan      html  css  js  c++  java
  • 分布式调度平台XXL-JOB源码分析-调度中心

    架构图

    上图是我们要进行源码分析的2.1版本的整体架构图。其分为两大块,调度中心和执行器,本文先分析调度中心,也就是xxl-job-admin这个包的代码。

    关键bean

    在application.properties配置正确的数据库连接信息后,直接启动XxlJobAdminApplication即可。

    配置类XxlJobAdminConfig,里面维护了一些调度中心端的配置数据。

    XxlJobScheduler这个组件实现了InitializingBean接口,所以spring容器在初始化的时候会调用afterPropertiesSet方法,此方法如下:

    第一步国际化相关。

    第二步监控相关。

    第三步失败重试相关。

    第四步启动admin端服务,接收注册请求等。

    第五步JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的

    JobScheduleHelper

    这个类就是死循环从xxl_job_info表中取出未来5秒内要执行的任务,进行调度分发。

     启动了两个守护线程,先来看scheduleThread。

    死循环内的代码如上图,首先利用for update语句进行获取任务的资格锁定,再去获取未来5秒内即将要执行的任务。

    展开遍历任务的逻辑代码,有三个分支

     

     第一个分支当前任务的触发时间已经超时5秒以上了,不在执行,直接计算下一次触发时间。

     第二个分支为触发时间已满足,利用JobTriggerPoolHelper这个类进行任务调度,之后判断下一次执行时间如果在5秒内,进行此任务数据的缓存,处理逻辑与第三个分支一样。

    对触发时间秒数进行60取模,跟进pushTimeRing方法

    ringData是以0到59的整数为key,以jobId集合为value的Map集合。这个集合数据的处理逻辑,就在我们第二个守护线程ringThread中。

     1 while (!ringThreadToStop) {
     2     try {
     3         // second data
     4         List<Integer> ringItemData = new ArrayList<>();
     5         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
     6         for (int i = 0; i < 2; i++) {
     7             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
     8             if (tmpData != null) {
     9                 ringItemData.addAll(tmpData);
    10             }
    11         }
    12         // ring trigger
    13         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
    14         if (ringItemData!=null && ringItemData.size()>0) {
    15             // do trigger
    16             for (int jobId: ringItemData) {
    17                 // do trigger
    18                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
    19             }
    20             // clear
    21             ringItemData.clear();
    22         }
    23     } catch (Exception e) {
    24         if (!ringThreadToStop) {
    25             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
    26         }
    27     }
    28     // next second, align second
    29     try {
    30         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
    31     } catch (InterruptedException e) {
    32         if (!ringThreadToStop) {
    33             logger.error(e.getMessage(), e);
    34         }
    35     }
    36 }

    根据当前秒数刻度和前一个刻度进行时间轮的任务获取,之后和上文一样,利用JobTriggerPoolHelper进行任务调度。

    时序图

     

    JobTriggerPoolHelper

    如前文所述,不管是scheduleThread还是ringThread,最后完成任务调度的都是JobTriggerPoolHelper.trigger方法,这个类有两个线程池fastTriggerPool和slowTriggerPool,顾名思义,分别是执行较快任务和较慢任务的,后查官方文档,如下:

      

    minTim属性,作用待明确

    jobTimeoutCountMap属性,计数,key为jobId,value使用AtomicInteger计数。

    helper静态变量指向自己本身,提供外部静态方法调用。

    重要方法,向两种线程池其中之一提交调度任务,进行调度,引出XxlJobTrigger这个类,一路跟进去

    继续跟进

    至此,完成执行器的任务调度。

    时序图

    接收注册和心跳请求

  • 相关阅读:
    linux 文件系统解析及相关命令
    2015暑假佛山移动实习个人总结——大三学生
    java 自动装箱自动拆箱
    java HashMap那点事
    STL学习笔记— —无序容器(Unordered Container)
    GNU C库「glibc」getaddrinfo 发现重大漏洞
    分布式缓存Memcache和Redis
    让你的动画不再生硬 Android插值器Interpolator使用秘籍
    安卓数据解析之 fastjson 的解析以及Gson解析
    (转)Unity3d使用心得(2):Unity3d 动态下载动画资源——AnimationClip 的使用
  • 原文地址:https://www.cnblogs.com/jiangyang/p/11576931.html
Copyright © 2011-2022 走看看