zoukankan      html  css  js  c++  java
  • xxljob学习4:任务调度器

    在xxl-job中,有三个角色,一个是执行器,一个人服务端(注册中心),另一个就是调度中心了。而任务调度器可以说是调度中心的最核心,我们发往执行器的任务,都是从任务调度器发出来的(除手动执行的)。

    调度中心在服务启动初始化Bean时,执行了com.xxl.job.admin.core.conf.XxlJobAdminConfig#afterPropertiesSet方法

    其中XxlJobScheduler的init方法

    public void init() throws Exception {
            // init i18n
            initI18n();
    
            // admin trigger pool start
            JobTriggerPoolHelper.toStart();
    
            // admin registry monitor run
            JobRegistryHelper.getInstance().start();
    
            // admin fail-monitor run
            JobFailMonitorHelper.getInstance().start();
    
            // admin lose-monitor run ( depend on JobTriggerPoolHelper )
            JobCompleteHelper.getInstance().start();
    
            // admin log report start
            JobLogReportHelper.getInstance().start();
    
            // 启动任务调度
            JobScheduleHelper.getInstance().start();
    
            logger.info(">>>>>>>>> init xxl-job admin success.");
        }

    在最下一行,启动了任务调度com.xxl.job.admin.core.thread.JobScheduleHelper#start,该方法分了两部分:scheduleThread和ringThread

    第一部分:调度线程scheduleThread

    scheduleThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
    
                    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
    
                    while (!scheduleThreadToStop) {
                        // Scan Job 
                        long start = System.currentTimeMillis();
                        
                        Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    
                        boolean preReadSuc = true;
                        try {
    
                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                            connAutoCommit = conn.getAutoCommit();
                            conn.setAutoCommit(false);
    
                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                            preparedStatement.execute();
    
                            // tx start
    
                            // 1、pre read
                            long nowTime = System.currentTimeMillis();
                            //<1>:扫描任务
                            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                            if (scheduleList!=null && scheduleList.size()>0) {
                                // 2、push time-ring
                                for (XxlJobInfo jobInfo: scheduleList) {
    
                                    // <2>: 调度时间过了5s
                                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    
                                        // 1、misfire match
                                        //过期策略
                                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                            // FIRE_ONCE_NOW 》 trigger
                                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                        }
    
                                        // 2、fresh next
                                        refreshNextValidTime(jobInfo, new Date());
                                    //<3>: 调度时间在5s内
                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    
                                        // 1、trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    
                                        // 2、fresh next
                                        refreshNextValidTime(jobInfo, new Date());
    
                                        // next-trigger-time in 5s, pre-read again
                                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    
                                            // 1、make ring second
                                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                            // 2、push time ring
                                            // 添加到时间刻度轮
                                            pushTimeRing(ringSecond, jobInfo.getId());
    
                                            // 3、fresh next
                                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                        }
    
                                    } else {  // <4>: 还差点时间才能执行
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
    
                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                    }
    
                                }
    
                                // 3、update trigger info
                                for (XxlJobInfo jobInfo: scheduleList) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                                }
    
                            } else {
                                preReadSuc = false;
                            }
    
                            // tx stop
                        } catch (Exception e) {
                           
                        } finally {    
                        }
                        long cost = System.currentTimeMillis()-start;
                        // <5>:控制速度
                        if (cost < 1000) { 
    TimeUnit.MILLISECONDS.sleep(); } } } }); scheduleThread.setDaemon(
    true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start();

    <1>:获取5s内要执行的任务配置

    <2>:任务超过了当前执行时间5s(前面耽误超时了):

            按照配置的过期策略(立刻执行一次 or 丢弃)来判断是否继续执行任务,并计算好下一次执行时间更新到数据库

    public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
            ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
            if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
                Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
                return nextValidTime;
            } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
                return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
            }
            return null;
        }

     解析corn表达式

    <3>:任务调度时间在当前时间5s内:先执行一次,刷新下次执行时间。如果下次执行时间还在5s内,则加到上

     // 1、make ring second
      int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
     // 2、push time ring
     pushTimeRing(ringSecond, jobInfo.getId());

    时间刻度轮根据下次执行时间,按时钟满60个刻度 ,取任务id放到  volatile修饰的Map<Integer, List<Integer>>中,供ringThread线程来调度。

    private void pushTimeRing(int ringSecond, int jobId){
            // push async ring
            List<Integer> ringItemData = ringData.get(ringSecond);
            if (ringItemData == null) {
                ringItemData = new ArrayList<Integer>();
                ringData.put(ringSecond, ringItemData);
            }
            ringItemData.add(jobId);
    
        }

    <4>:还没到任务执行时间,也给塞到时间刻度轮上

    <5>:控制调度速度

    第二部分:扫描ringData刻度环上的数据  ringThread

    ringThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    while (!ringThreadToStop) {
    
                        // 1:align second
                        try {
                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!ringThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                        try {
                            // 2:second data
                            List<Integer> ringItemData = new ArrayList<>();
                            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                            for (int i = 0; i < 2; i++) {
                                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                                if (tmpData != null) {
                                    ringItemData.addAll(tmpData);
                                }
                            }
      
                            if (ringItemData.size() > 0) {
                                // do trigger
                                for (int jobId: ringItemData) {
                                    // do trigger
                                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                                }
                                // clear
                                ringItemData.clear();
                            }
                        } catch (Exception e) {                    
                    }
                }
            });
            ringThread.setDaemon(true);
            ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
            ringThread.start();
        }

    该线程扫描调度线程扫描调度线上扔到ringData上的数据

    1:一秒内扫描一次

    2:按照时钟刻度取数,并取上一刻度的任务,避免因卡壳导致的取不到数据

  • 相关阅读:
    sphinx 源码阅读之分词,压缩索引,倒排——单词对应的文档ID列表本质和lucene无异 也是外部排序再压缩 解压的时候需要全部扫描doc_ids列表偏移量相加获得最终的文档ID
    详细说明XML分解(两)—DOM4J
    JSP简单的练习-用户登记表
    设计师给了px显着的单位,Android要设置多少开发商dip、dp、sp?
    左右xcode的重构选项的一些理解
    unicode下一个,读取数据库乱码问题
    java中间==、equals和hashCode差额
    MIPS台OpenWrt在系统内的路由器Rust应用程序开发
    Android采取async框架文件上传
    ios-上拉电阻负载许多其他接口
  • 原文地址:https://www.cnblogs.com/at20191018/p/15787769.html
Copyright © 2011-2022 走看看