zoukankan      html  css  js  c++  java
  • xxljob源码(三)服务端源码

      简单了解下xxl-job-admin 任务调度中心原理。

    1. 服务启动

      服务启动之后之后,查看开启的线程如下:

       可以看到启动的后台线程,接下来查看启动的后台线程所做的事情。

    2. 线程开启

    1. 服务启动配置类

    com.xxl.job.admin.core.conf.XxlJobAdminConfig

    package com.xxl.job.admin.core.conf;
    
    import com.xxl.job.admin.core.alarm.JobAlarmer;
    import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
    import com.xxl.job.admin.dao.*;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.mail.javamail.JavaMailSender;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.util.Arrays;
    
    /**
     * xxl-job config
     *
     * @author xuxueli 2017-04-28
     */
    
    @Component
    public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    
        private static XxlJobAdminConfig adminConfig = null;
        public static XxlJobAdminConfig getAdminConfig() {
            return adminConfig;
        }
    
    
        // ---------------------- XxlJobScheduler ----------------------
    
        private XxlJobScheduler xxlJobScheduler;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            adminConfig = this;
    
            xxlJobScheduler = new XxlJobScheduler();
            xxlJobScheduler.init();
        }
    
        @Override
        public void destroy() throws Exception {
            xxlJobScheduler.destroy();
        }
    
    
        // ---------------------- XxlJobScheduler ----------------------
    
        // conf
        @Value("${xxl.job.i18n}")
        private String i18n;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${spring.mail.from}")
        private String emailFrom;
    
        @Value("${xxl.job.triggerpool.fast.max}")
        private int triggerPoolFastMax;
    
        @Value("${xxl.job.triggerpool.slow.max}")
        private int triggerPoolSlowMax;
    
        @Value("${xxl.job.logretentiondays}")
        private int logretentiondays;
    
        // dao, service
    
        @Resource
        private XxlJobLogDao xxlJobLogDao;
        @Resource
        private XxlJobInfoDao xxlJobInfoDao;
        @Resource
        private XxlJobRegistryDao xxlJobRegistryDao;
        @Resource
        private XxlJobGroupDao xxlJobGroupDao;
        @Resource
        private XxlJobLogReportDao xxlJobLogReportDao;
        @Resource
        private JavaMailSender mailSender;
        @Resource
        private DataSource dataSource;
        @Resource
        private JobAlarmer jobAlarmer;
    
    
        public String getI18n() {
            if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) {
                return "zh_CN";
            }
            return i18n;
        }
    
        public String getAccessToken() {
            return accessToken;
        }
    
        public String getEmailFrom() {
            return emailFrom;
        }
    
        public int getTriggerPoolFastMax() {
            if (triggerPoolFastMax < 200) {
                return 200;
            }
            return triggerPoolFastMax;
        }
    
        public int getTriggerPoolSlowMax() {
            if (triggerPoolSlowMax < 100) {
                return 100;
            }
            return triggerPoolSlowMax;
        }
    
        public int getLogretentiondays() {
            if (logretentiondays < 7) {
                return -1;  // Limit greater than or equal to 7, otherwise close
            }
            return logretentiondays;
        }
    
        public XxlJobLogDao getXxlJobLogDao() {
            return xxlJobLogDao;
        }
    
        public XxlJobInfoDao getXxlJobInfoDao() {
            return xxlJobInfoDao;
        }
    
        public XxlJobRegistryDao getXxlJobRegistryDao() {
            return xxlJobRegistryDao;
        }
    
        public XxlJobGroupDao getXxlJobGroupDao() {
            return xxlJobGroupDao;
        }
    
        public XxlJobLogReportDao getXxlJobLogReportDao() {
            return xxlJobLogReportDao;
        }
    
        public JavaMailSender getMailSender() {
            return mailSender;
        }
    
        public DataSource getDataSource() {
            return dataSource;
        }
    
        public JobAlarmer getJobAlarmer() {
            return jobAlarmer;
        }
    
    }
    View Code

    如下配置类可以看到,请求转发到了:com.xxl.job.admin.core.scheduler.XxlJobScheduler

    package com.xxl.job.admin.core.scheduler;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.thread.*;
    import com.xxl.job.admin.core.util.I18nUtil;
    import com.xxl.job.core.biz.ExecutorBiz;
    import com.xxl.job.core.biz.client.ExecutorBizClient;
    import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    
    /**
     * @author xuxueli 2018-10-28 00:18:17
     */
    
    public class XxlJobScheduler  {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
    
    
        public void init() throws Exception {
            // init i18n
            initI18n();
    
            // admin trigger pool start
            JobTriggerPoolHelper.toStart();
    
            // admin registry monitor run
            JobRegistryHelper.getInstance().start();
    
            // admin fail-monitor run
            JobFailMonitorHelper.getInstance().start();
    
            // admin lose-monitor run ( depend on JobTriggerPoolHelper )
            JobCompleteHelper.getInstance().start();
    
            // admin log report start
            JobLogReportHelper.getInstance().start();
    
            // start-schedule  ( depend on JobTriggerPoolHelper )
            JobScheduleHelper.getInstance().start();
    
            logger.info(">>>>>>>>> init xxl-job admin success.");
        }
    
        
        public void destroy() throws Exception {
    
            // stop-schedule
            JobScheduleHelper.getInstance().toStop();
    
            // admin log report stop
            JobLogReportHelper.getInstance().toStop();
    
            // admin lose-monitor stop
            JobCompleteHelper.getInstance().toStop();
    
            // admin fail-monitor stop
            JobFailMonitorHelper.getInstance().toStop();
    
            // admin registry stop
            JobRegistryHelper.getInstance().toStop();
    
            // admin trigger pool stop
            JobTriggerPoolHelper.toStop();
    
        }
    
        // ---------------------- I18n ----------------------
    
        private void initI18n(){
            for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
                item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
            }
        }
    
        // ---------------------- executor-client ----------------------
        private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
        public static ExecutorBiz getExecutorBiz(String address) throws Exception {
            // valid
            if (address==null || address.trim().length()==0) {
                return null;
            }
    
            // load-cache
            address = address.trim();
            ExecutorBiz executorBiz = executorBizRepository.get(address);
            if (executorBiz != null) {
                return executorBiz;
            }
    
            // set-cache
            executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
    
            executorBizRepository.put(address, executorBiz);
            return executorBiz;
        }
    
    }
    View Code

      从init 方法可以看到启动了一系列的线程。下面分析几个线程的作用。

    2. com.xxl.job.admin.core.thread.JobTriggerPoolHelper 

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
    import com.xxl.job.admin.core.trigger.XxlJobTrigger;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * job trigger thread pool helper
     *
     * @author xuxueli 2018-07-03 21:08:07
     */
    public class JobTriggerPoolHelper {
        private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
    
    
        // ---------------------- trigger pool ----------------------
    
        // fast/slow thread pool
        private ThreadPoolExecutor fastTriggerPool = null;
        private ThreadPoolExecutor slowTriggerPool = null;
    
        public void start(){
            fastTriggerPool = new ThreadPoolExecutor(
                    10,
                    XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(1000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                        }
                    });
    
            slowTriggerPool = new ThreadPoolExecutor(
                    10,
                    XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                        }
                    });
        }
    
    
        public void stop() {
            //triggerPool.shutdown();
            fastTriggerPool.shutdownNow();
            slowTriggerPool.shutdownNow();
            logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
        }
    
    
        // job timeout count
        private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
        private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
    
    
        /**
         * add trigger
         */
        public void addTrigger(final int jobId,
                               final TriggerTypeEnum triggerType,
                               final int failRetryCount,
                               final String executorShardingParam,
                               final String executorParam,
                               final String addressList) {
    
            // choose thread pool
            ThreadPoolExecutor triggerPool_ = fastTriggerPool;
            AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
            if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
                triggerPool_ = slowTriggerPool;
            }
    
            // trigger
            triggerPool_.execute(new Runnable() {
                @Override
                public void run() {
    
                    long start = System.currentTimeMillis();
    
                    try {
                        // do trigger
                        XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    } finally {
    
                        // check timeout-count-map
                        long minTim_now = System.currentTimeMillis()/60000;
                        if (minTim != minTim_now) {
                            minTim = minTim_now;
                            jobTimeoutCountMap.clear();
                        }
    
                        // incr timeout-count-map
                        long cost = System.currentTimeMillis()-start;
                        if (cost > 500) {       // ob-timeout threshold 500ms
                            AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                            if (timeoutCount != null) {
                                timeoutCount.incrementAndGet();
                            }
                        }
    
                    }
    
                }
            });
        }
    
    
    
        // ---------------------- helper ----------------------
    
        private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
    
        public static void toStart() {
            helper.start();
        }
        public static void toStop() {
            helper.stop();
        }
    
        /**
         * @param jobId
         * @param triggerType
         * @param failRetryCount
         *             >=0: use this param
         *             <0: use param from job info config
         * @param executorShardingParam
         * @param executorParam
         *          null: use job param
         *          not null: cover job param
         */
        public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
            helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
        }
    
    }
    View Code

      这个线程的核心作用是触发定时任务的线程。这里快慢线程池,用于执行不同频率的线程。

    3. com.xxl.job.admin.core.thread.JobRegistryHelper

    注册线程源码如下:

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.model.XxlJobGroup;
    import com.xxl.job.admin.core.model.XxlJobRegistry;
    import com.xxl.job.core.biz.model.RegistryParam;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.enums.RegistryConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;
    
    import java.util.*;
    import java.util.concurrent.*;
    
    /**
     * job registry instance
     * @author xuxueli 2016-10-02 19:10:24
     */
    public class JobRegistryHelper {
        private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
    
        private static JobRegistryHelper instance = new JobRegistryHelper();
        public static JobRegistryHelper getInstance(){
            return instance;
        }
    
        private ThreadPoolExecutor registryOrRemoveThreadPool = null;
        private Thread registryMonitorThread;
        private volatile boolean toStop = false;
    
        public void start(){
    
            // for registry or remove
            registryOrRemoveThreadPool = new ThreadPoolExecutor(
                    2,
                    10,
                    30L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            r.run();
                            logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                        }
                    });
    
            // for monitor
            registryMonitorThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!toStop) {
                        try {
                            // auto registry group
                            List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                            if (groupList!=null && !groupList.isEmpty()) {
    
                                // remove dead address (admin/executor)
                                List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                                if (ids!=null && ids.size()>0) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                                }
    
                                // fresh online address (admin/executor)
                                HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                                List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                                if (list != null) {
                                    for (XxlJobRegistry item: list) {
                                        if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                            String appname = item.getRegistryKey();
                                            List<String> registryList = appAddressMap.get(appname);
                                            if (registryList == null) {
                                                registryList = new ArrayList<String>();
                                            }
    
                                            if (!registryList.contains(item.getRegistryValue())) {
                                                registryList.add(item.getRegistryValue());
                                            }
                                            appAddressMap.put(appname, registryList);
                                        }
                                    }
                                }
    
                                // fresh group address
                                for (XxlJobGroup group: groupList) {
                                    List<String> registryList = appAddressMap.get(group.getAppname());
                                    String addressListStr = null;
                                    if (registryList!=null && !registryList.isEmpty()) {
                                        Collections.sort(registryList);
                                        StringBuilder addressListSB = new StringBuilder();
                                        for (String item:registryList) {
                                            addressListSB.append(item).append(",");
                                        }
                                        addressListStr = addressListSB.toString();
                                        addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                                    }
                                    group.setAddressList(addressListStr);
                                    group.setUpdateTime(new Date());
    
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                                }
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                            }
                        }
                        try {
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        } catch (InterruptedException e) {
                            if (!toStop) {
                                logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
                }
            });
            registryMonitorThread.setDaemon(true);
            registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
            registryMonitorThread.start();
        }
    
        public void toStop(){
            toStop = true;
    
            // stop registryOrRemoveThreadPool
            registryOrRemoveThreadPool.shutdownNow();
    
            // stop monitir (interrupt and wait)
            registryMonitorThread.interrupt();
            try {
                registryMonitorThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
    
        // ---------------------- helper ----------------------
    
        public ReturnT<String> registry(RegistryParam registryParam) {
    
            // valid
            if (!StringUtils.hasText(registryParam.getRegistryGroup())
                    || !StringUtils.hasText(registryParam.getRegistryKey())
                    || !StringUtils.hasText(registryParam.getRegistryValue())) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
            }
    
            // async execute
            registryOrRemoveThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
                    if (ret < 1) {
                        XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    
                        // fresh
                        freshGroupRegistryInfo(registryParam);
                    }
                }
            });
    
            return ReturnT.SUCCESS;
        }
    
        public ReturnT<String> registryRemove(RegistryParam registryParam) {
    
            // valid
            if (!StringUtils.hasText(registryParam.getRegistryGroup())
                    || !StringUtils.hasText(registryParam.getRegistryKey())
                    || !StringUtils.hasText(registryParam.getRegistryValue())) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
            }
    
            // async execute
            registryOrRemoveThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
                    if (ret > 0) {
                        // fresh
                        freshGroupRegistryInfo(registryParam);
                    }
                }
            });
    
            return ReturnT.SUCCESS;
        }
    
        private void freshGroupRegistryInfo(RegistryParam registryParam){
            // Under consideration, prevent affecting core tables
        }
    
    
    }
    View Code

    这个类开启了一个线程池registryOrRemoveThreadPool , 一个定时任务 registryMonitorThread 。

    (1) registryOrRemoveThreadPool  线程池主要用于处理执行客户端的注册和删除任务。对于服务端生效的主要是com.xxl.job.admin.core.thread.JobRegistryHelper#registryRemove 方法,也就是执行com.xxl.job.admin.dao.XxlJobRegistryDao#registryDelete 删除方法。其SQL如下:

        <delete id="registryDelete" >
            DELETE FROM xxl_job_registry
            WHERE registry_group = #{registryGroup}
                AND registry_key = #{registryKey}
                AND registry_value = #{registryValue}
        </delete>

    (2) registryMonitorThread 相当于个定时任务, 以30 s 为周期。 每次获取过去90 s 没有收到心跳的注册信息, 然后删掉之后更新XxlJobGroup 的地址信息。

    1》com.xxl.job.admin.dao.XxlJobRegistryDao#findDead 找90 s 没有发送心跳的注册信息

        <select id="findDead" parameterType="java.util.HashMap" resultType="java.lang.Integer" >
            SELECT t.id
            FROM xxl_job_registry AS t
            WHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)
        </select>

    2》 com.xxl.job.admin.dao.XxlJobRegistryDao#removeDead 删除过期注册信息

        <delete id="removeDead" parameterType="java.lang.Integer" >
            DELETE FROM xxl_job_registry
            WHERE id in
            <foreach collection="ids" item="item" open="(" close=")" separator="," >
                #{item}
            </foreach>
        </delete>

    3》 com.xxl.job.admin.dao.XxlJobGroupDao#update 更新任务组的地址信息,这个就不贴sql 了

        public int update(XxlJobGroup xxlJobGroup);

    4. com.xxl.job.admin.core.thread.JobFailMonitorHelper 处理失败任务

    源码如下:

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.model.XxlJobInfo;
    import com.xxl.job.admin.core.model.XxlJobLog;
    import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
    import com.xxl.job.admin.core.util.I18nUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * job monitor instance
     *
     * @author xuxueli 2015-9-1 18:05:56
     */
    public class JobFailMonitorHelper {
        private static Logger logger = LoggerFactory.getLogger(JobFailMonitorHelper.class);
        
        private static JobFailMonitorHelper instance = new JobFailMonitorHelper();
        public static JobFailMonitorHelper getInstance(){
            return instance;
        }
    
        // ---------------------- monitor ----------------------
    
        private Thread monitorThread;
        private volatile boolean toStop = false;
        public void start(){
            monitorThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // monitor
                    while (!toStop) {
                        try {
    
                            List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                            if (failLogIds!=null && !failLogIds.isEmpty()) {
                                for (long failLogId: failLogIds) {
    
                                    // lock log
                                    int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                                    if (lockRet < 1) {
                                        continue;
                                    }
                                    XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                                    XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
    
                                    // 1、fail retry monitor
                                    if (log.getExecutorFailRetryCount() > 0) {
                                        JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                        String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                                        log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                                    }
    
                                    // 2、fail alarm monitor
                                    int newAlarmStatus = 0;        // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
                                    if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
                                        boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                        newAlarmStatus = alarmResult?2:3;
                                    } else {
                                        newAlarmStatus = 1;
                                    }
    
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                                }
                            }
    
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                            }
                        }
    
                        try {
                            TimeUnit.SECONDS.sleep(10);
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
    
                }
            });
            monitorThread.setDaemon(true);
            monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
            monitorThread.start();
        }
    
        public void toStop(){
            toStop = true;
            // interrupt and wait
            monitorThread.interrupt();
            try {
                monitorThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
    }
    View Code

      可以看出也是以10 s 为周期执行定时任务。

    1》 com.xxl.job.admin.dao.XxlJobLogDao#findFailJobLogIds 查找失败日志的ID

    2》 遍历上面的ID集合, 执行com.xxl.job.admin.dao.XxlJobLogDao#updateAlarmStatus 修改alarm 状态为 -1, 如果修改行数大于1 就执行下面, 否则跳过该ID

    3》 com.xxl.job.admin.dao.XxlJobLogDao#load 找失败的日志

    4》 com.xxl.job.admin.dao.XxlJobInfoDao#loadById 找到XxlJobInfo 任务信息

    5》 com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger 尝试再次执行

    6》 com.xxl.job.admin.core.alarm.JobAlarm#doAlarm 发送失败日志, 配置了 email  就发送日志

    7》 com.xxl.job.admin.dao.XxlJobLogDao#updateAlarmStatus 更新alarm 状态

    5. com.xxl.job.admin.core.thread.JobCompleteHelper 

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.complete.XxlJobCompleter;
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.model.XxlJobLog;
    import com.xxl.job.admin.core.util.I18nUtil;
    import com.xxl.job.core.biz.model.HandleCallbackParam;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.util.DateUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * job lose-monitor instance
     *
     * @author xuxueli 2015-9-1 18:05:56
     */
    public class JobCompleteHelper {
        private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class);
        
        private static JobCompleteHelper instance = new JobCompleteHelper();
        public static JobCompleteHelper getInstance(){
            return instance;
        }
    
        // ---------------------- monitor ----------------------
    
        private ThreadPoolExecutor callbackThreadPool = null;
        private Thread monitorThread;
        private volatile boolean toStop = false;
        public void start(){
    
            // for callback
            callbackThreadPool = new ThreadPoolExecutor(
                    2,
                    20,
                    30L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(3000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            r.run();
                            logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
                        }
                    });
    
    
            // for monitor
            monitorThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // wait for JobTriggerPoolHelper-init
                    try {
                        TimeUnit.MILLISECONDS.sleep(50);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                    // monitor
                    while (!toStop) {
                        try {
                            // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                            Date losedTime = DateUtil.addMinutes(new Date(), -10);
                            List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
    
                            if (losedJobIds!=null && losedJobIds.size()>0) {
                                for (Long logId: losedJobIds) {
    
                                    XxlJobLog jobLog = new XxlJobLog();
                                    jobLog.setId(logId);
    
                                    jobLog.setHandleTime(new Date());
                                    jobLog.setHandleCode(ReturnT.FAIL_CODE);
                                    jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
    
                                    XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                                }
    
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                            }
                        }
    
                        try {
                            TimeUnit.SECONDS.sleep(60);
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
    
                }
            });
            monitorThread.setDaemon(true);
            monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
            monitorThread.start();
        }
    
        public void toStop(){
            toStop = true;
    
            // stop registryOrRemoveThreadPool
            callbackThreadPool.shutdownNow();
    
            // stop monitorThread (interrupt and wait)
            monitorThread.interrupt();
            try {
                monitorThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
    
        // ---------------------- helper ----------------------
    
        public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    
            callbackThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    for (HandleCallbackParam handleCallbackParam: callbackParamList) {
                        ReturnT<String> callbackResult = callback(handleCallbackParam);
                        logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
                                (callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
                    }
                }
            });
    
            return ReturnT.SUCCESS;
        }
    
        private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
            // valid log item
            XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
            if (log == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
            }
            if (log.getHandleCode() > 0) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
            }
    
            // handle msg
            StringBuffer handleMsg = new StringBuffer();
            if (log.getHandleMsg()!=null) {
                handleMsg.append(log.getHandleMsg()).append("<br>");
            }
            if (handleCallbackParam.getHandleMsg() != null) {
                handleMsg.append(handleCallbackParam.getHandleMsg());
            }
    
            // success, save log
            log.setHandleTime(new Date());
            log.setHandleCode(handleCallbackParam.getHandleCode());
            log.setHandleMsg(handleMsg.toString());
            XxlJobCompleter.updateHandleInfoAndFinish(log);
    
            return ReturnT.SUCCESS;
        }
    
    
    
    }
    View Code

      callbackThreadPool 线程池执行callbackThreadPool 回调, 用于记录日志、触发子任务等操作。

      monitorThread 线程池任务结果丢失处理, 相当于不执行任务,直接结束。

    6. com.xxl.job.admin.core.thread.JobLogReportHelper

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.model.XxlJobLogReport;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Calendar;
    import java.util.Date;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    /**
     * job log report helper
     *
     * @author xuxueli 2019-11-22
     */
    public class JobLogReportHelper {
        private static Logger logger = LoggerFactory.getLogger(JobLogReportHelper.class);
    
        private static JobLogReportHelper instance = new JobLogReportHelper();
        public static JobLogReportHelper getInstance(){
            return instance;
        }
    
    
        private Thread logrThread;
        private volatile boolean toStop = false;
        public void start(){
            logrThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // last clean log time
                    long lastCleanLogTime = 0;
    
    
                    while (!toStop) {
    
                        // 1、log-report refresh: refresh log report in 3 days
                        try {
    
                            for (int i = 0; i < 3; i++) {
    
                                // today
                                Calendar itemDay = Calendar.getInstance();
                                itemDay.add(Calendar.DAY_OF_MONTH, -i);
                                itemDay.set(Calendar.HOUR_OF_DAY, 0);
                                itemDay.set(Calendar.MINUTE, 0);
                                itemDay.set(Calendar.SECOND, 0);
                                itemDay.set(Calendar.MILLISECOND, 0);
    
                                Date todayFrom = itemDay.getTime();
    
                                itemDay.set(Calendar.HOUR_OF_DAY, 23);
                                itemDay.set(Calendar.MINUTE, 59);
                                itemDay.set(Calendar.SECOND, 59);
                                itemDay.set(Calendar.MILLISECOND, 999);
    
                                Date todayTo = itemDay.getTime();
    
                                // refresh log-report every minute
                                XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                                xxlJobLogReport.setTriggerDay(todayFrom);
                                xxlJobLogReport.setRunningCount(0);
                                xxlJobLogReport.setSucCount(0);
                                xxlJobLogReport.setFailCount(0);
    
                                Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                                if (triggerCountMap!=null && triggerCountMap.size()>0) {
                                    int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                                    int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                                    int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                                    int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
    
                                    xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                                    xxlJobLogReport.setSucCount(triggerDayCountSuc);
                                    xxlJobLogReport.setFailCount(triggerDayCountFail);
                                }
    
                                // do refresh
                                int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                                if (ret < 1) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                                }
                            }
    
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                            }
                        }
    
                        // 2、log-clean: switch open & once each day
                        if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                                && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
    
                            // expire-time
                            Calendar expiredDay = Calendar.getInstance();
                            expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                            expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                            expiredDay.set(Calendar.MINUTE, 0);
                            expiredDay.set(Calendar.SECOND, 0);
                            expiredDay.set(Calendar.MILLISECOND, 0);
                            Date clearBeforeTime = expiredDay.getTime();
    
                            // clean expired log
                            List<Long> logIds = null;
                            do {
                                logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                                if (logIds!=null && logIds.size()>0) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                                }
                            } while (logIds!=null && logIds.size()>0);
    
                            // update clean time
                            lastCleanLogTime = System.currentTimeMillis();
                        }
    
                        try {
                            TimeUnit.MINUTES.sleep(1);
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
    
                }
            });
            logrThread.setDaemon(true);
            logrThread.setName("xxl-job, admin JobLogReportHelper");
            logrThread.start();
        }
    
        public void toStop(){
            toStop = true;
            // interrupt and wait
            logrThread.interrupt();
            try {
                logrThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
    }
    View Code

    com.xxl.job.admin.core.thread.JobLogReportHelper#logrThread 线程用于生成调度报告,然后清除过期日志。

    7. com.xxl.job.admin.core.thread.JobScheduleHelper

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.cron.CronExpression;
    import com.xxl.job.admin.core.model.XxlJobInfo;
    import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
    import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
    import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author xuxueli 2019-05-21
     */
    public class JobScheduleHelper {
        private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
    
        private static JobScheduleHelper instance = new JobScheduleHelper();
        public static JobScheduleHelper getInstance(){
            return instance;
        }
    
        public static final long PRE_READ_MS = 5000;    // pre read
    
        private Thread scheduleThread;
        private Thread ringThread;
        private volatile boolean scheduleThreadToStop = false;
        private volatile boolean ringThreadToStop = false;
        private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
    
        public void start(){
    
            // schedule thread
            scheduleThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
    
                    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
    
                    while (!scheduleThreadToStop) {
    
                        // Scan Job
                        long start = System.currentTimeMillis();
    
                        Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    
                        boolean preReadSuc = true;
                        try {
    
                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                            connAutoCommit = conn.getAutoCommit();
                            conn.setAutoCommit(false);
    
                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                            preparedStatement.execute();
    
                            // tx start
    
                            // 1、pre read
                            long nowTime = System.currentTimeMillis();
                            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                            if (scheduleList!=null && scheduleList.size()>0) {
                                // 2、push time-ring
                                for (XxlJobInfo jobInfo: scheduleList) {
    
                                    // time-ring jump
                                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    
                                        // 1、misfire match
                                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                            // FIRE_ONCE_NOW 》 trigger
                                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                        }
    
                                        // 2、fresh next
                                        refreshNextValidTime(jobInfo, new Date());
    
                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    
                                        // 1、trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    
                                        // 2、fresh next
                                        refreshNextValidTime(jobInfo, new Date());
    
                                        // next-trigger-time in 5s, pre-read again
                                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    
                                            // 1、make ring second
                                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                            // 2、push time ring
                                            pushTimeRing(ringSecond, jobInfo.getId());
    
                                            // 3、fresh next
                                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                        }
    
                                    } else {
                                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
    
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
    
                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                    }
    
                                }
    
                                // 3、update trigger info
                                for (XxlJobInfo jobInfo: scheduleList) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                                }
    
                            } else {
                                preReadSuc = false;
                            }
    
                            // tx stop
    
    
                        } catch (Exception e) {
                            if (!scheduleThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                            }
                        } finally {
    
                            // commit
                            if (conn != null) {
                                try {
                                    conn.commit();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.setAutoCommit(connAutoCommit);
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
    
                            // close PreparedStatement
                            if (null != preparedStatement) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
                        }
                        long cost = System.currentTimeMillis()-start;
    
    
                        // Wait seconds, align second
                        if (cost < 1000) {  // scan-overtime, not wait
                            try {
                                // pre-read period: success > scan each second; fail > skip this period;
                                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                            } catch (InterruptedException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
                }
            });
            scheduleThread.setDaemon(true);
            scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
            scheduleThread.start();
    
    
            // ring thread
            ringThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    while (!ringThreadToStop) {
    
                        // align second
                        try {
                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!ringThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                        try {
                            // second data
                            List<Integer> ringItemData = new ArrayList<>();
                            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                            for (int i = 0; i < 2; i++) {
                                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                                if (tmpData != null) {
                                    ringItemData.addAll(tmpData);
                                }
                            }
    
                            // ring trigger
                            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                            if (ringItemData.size() > 0) {
                                // do trigger
                                for (int jobId: ringItemData) {
                                    // do trigger
                                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                                }
                                // clear
                                ringItemData.clear();
                            }
                        } catch (Exception e) {
                            if (!ringThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
                }
            });
            ringThread.setDaemon(true);
            ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
            ringThread.start();
        }
    
        private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
            Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
            if (nextValidTime != null) {
                jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                jobInfo.setTriggerNextTime(nextValidTime.getTime());
            } else {
                jobInfo.setTriggerStatus(0);
                jobInfo.setTriggerLastTime(0);
                jobInfo.setTriggerNextTime(0);
                logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",
                        jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
            }
        }
    
        private void pushTimeRing(int ringSecond, int jobId){
            // push async ring
            List<Integer> ringItemData = ringData.get(ringSecond);
            if (ringItemData == null) {
                ringItemData = new ArrayList<Integer>();
                ringData.put(ringSecond, ringItemData);
            }
            ringItemData.add(jobId);
    
            logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
        }
    
        public void toStop(){
    
            // 1、stop schedule
            scheduleThreadToStop = true;
            try {
                TimeUnit.SECONDS.sleep(1);  // wait
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            if (scheduleThread.getState() != Thread.State.TERMINATED){
                // interrupt and wait
                scheduleThread.interrupt();
                try {
                    scheduleThread.join();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            // if has ring data
            boolean hasRingData = false;
            if (!ringData.isEmpty()) {
                for (int second : ringData.keySet()) {
                    List<Integer> tmpData = ringData.get(second);
                    if (tmpData!=null && tmpData.size()>0) {
                        hasRingData = true;
                        break;
                    }
                }
            }
            if (hasRingData) {
                try {
                    TimeUnit.SECONDS.sleep(8);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            // stop ring (wait job-in-memory stop)
            ringThreadToStop = true;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            if (ringThread.getState() != Thread.State.TERMINATED){
                // interrupt and wait
                ringThread.interrupt();
                try {
                    ringThread.join();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
        }
    
    
        // ---------------------- tools ----------------------
        public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
            ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
            if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
                Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
                return nextValidTime;
            } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
                return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
            }
            return null;
        }
    
    }
    View Code

      触发任务调度。cron 表达式以及获取数据库开启运行的任务也是在这里运行的。这里也是Cron 表达式计算的核心逻辑。

    1》线程 scheduleThread  相当于以固定周期从数据库提前读取快要加载的job的id, 然后判断添加到ringData 中, 最后根据cron 表达式计算出下次触发时间,然后同步到数据库中。

    2》ringThread 从上面的ringData 中获取到数据执行,触发任务的执行。 

     3. 一次任务执行流程

    1. com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger 添加到任务队列

    2. com.xxl.job.admin.core.thread.JobTriggerPoolHelper#addTrigger 线程池中跑任务

    3. com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger 触发任务

      继续调用 com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger 获取到参数之后构造TriggerParam 发送http 请求调用(调用到com.xxl.job.core.biz.client.ExecutorBizClient#run 发送http 请求), 同时构造XxlJobLog 保存相关日志信息。

    4. 客戶端收到请求,会调用到 com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process 执行请求。 最后交给异步线程池: com.xxl.job.core.thread.JobThread。 

    线程池, 最后会com.xxl.job.core.thread.TriggerCallbackThread#pushCallBack 生成回调信息。 然后调用com.xxl.job.core.biz.client.AdminBizClient#callback 向admin 调度中心发送 /callback 信息 (走http 发送信息到admin 调度信息)。

    5. admin 调度中心收到callback 回调后, 调用到: com.xxl.job.admin.controller.JobApiController#api 。 然后调用到: com.xxl.job.admin.core.thread.JobCompleteHelper#callback(java.util.List<com.xxl.job.core.biz.model.HandleCallbackParam>) -》 com.xxl.job.admin.core.thread.JobCompleteHelper#callback(com.xxl.job.core.biz.model.HandleCallbackParam)

    -》 com.xxl.job.admin.core.complete.XxlJobCompleter#finishJob 结束任务, 如果有子任务, 继续执行子任务

    补充:对于分片任务的执行

    1. 测试jiob

        @XxlJob("shardingJobHandler")
        public void shardingJobHandler() throws Exception {
    
            // 分片参数
            int shardIndex = XxlJobHelper.getShardIndex();
            int shardTotal = XxlJobHelper.getShardTotal();
    
            XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
            System.out.println(Thread.currentThread().getName());
    
            // 业务逻辑
            for (int i = 0; i < shardTotal; i++) {
                if (i == shardIndex) {
                    System.out.println("第 " +  i+ " 片, 命中分片开始处理");
                    XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
                } else {
                    System.out.println("第 " +  i+ " 片, 忽略");
                    XxlJobHelper.log("第 {} 片, 忽略", i);
                }
            }
    
        }

    2. 配置

     3. 查看触发机制

    com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger

        public static void trigger(int jobId,
                                   TriggerTypeEnum triggerType,
                                   int failRetryCount,
                                   String executorShardingParam,
                                   String executorParam,
                                   String addressList) {
    
            // load data
            XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
            if (jobInfo == null) {
                logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
                return;
            }
            if (executorParam != null) {
                jobInfo.setExecutorParam(executorParam);
            }
            int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
            XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    
            // cover addressList
            if (addressList!=null && addressList.trim().length()>0) {
                group.setAddressType(1);
                group.setAddressList(addressList.trim());
            }
    
            // sharding param
            int[] shardingParam = null;
            if (executorShardingParam!=null){
                String[] shardingArr = executorShardingParam.split("/");
                if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                    shardingParam = new int[2];
                    shardingParam[0] = Integer.valueOf(shardingArr[0]);
                    shardingParam[1] = Integer.valueOf(shardingArr[1]);
                }
            }
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                    && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                    && shardingParam==null) {
                for (int i = 0; i < group.getRegistryList().size(); i++) {
                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
                }
            } else {
                if (shardingParam == null) {
                    shardingParam = new int[]{0, 1};
                }
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
            }
    
        }

      这里判断如果是分片策略。获取到数量,给每个主机一个分片顺序,然后遍历主机进行调用。相当于在线的主机都会调用一次,然后在线主机可以根据自己的顺序执行不同的业务逻辑。最后发送的请求参数个数如下:

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    关于js的语句类型运算符等
    关于flex的布局理解
    三天来都在写项目;今天开始学习了js
    12.13的学习内容
    Css多列语法笔记
    Css3关键帧动画
    codevs1085数字游戏(环形DP+划分DP )
    codevs1040统计单词个数(区间+划分型dp)
    POJ1062昂贵的聘礼
    POJ3687Labeling Balls
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15510945.html
Copyright © 2011-2022 走看看