简单了解下xxl-job-admin 任务调度中心原理。
1. 服务启动
服务启动之后之后,查看开启的线程如下:
可以看到启动的后台线程,接下来查看启动的后台线程所做的事情。
2. 线程开启
1. 服务启动配置类
com.xxl.job.admin.core.conf.XxlJobAdminConfig
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.conf; import com.xxl.job.admin.core.alarm.JobAlarmer; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.dao.*; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; import java.util.Arrays; /** * xxl-job config * * @author xuxueli 2017-04-28 */ @Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private static XxlJobAdminConfig adminConfig = null; public static XxlJobAdminConfig getAdminConfig() { return adminConfig; } // ---------------------- XxlJobScheduler ---------------------- private XxlJobScheduler xxlJobScheduler; @Override public void afterPropertiesSet() throws Exception { adminConfig = this; xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); } @Override public void destroy() throws Exception { xxlJobScheduler.destroy(); } // ---------------------- XxlJobScheduler ---------------------- // conf @Value("${xxl.job.i18n}") private String i18n; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${spring.mail.from}") private String emailFrom; @Value("${xxl.job.triggerpool.fast.max}") private int triggerPoolFastMax; @Value("${xxl.job.triggerpool.slow.max}") private int triggerPoolSlowMax; @Value("${xxl.job.logretentiondays}") private int logretentiondays; // dao, service @Resource private XxlJobLogDao xxlJobLogDao; @Resource private XxlJobInfoDao xxlJobInfoDao; @Resource private XxlJobRegistryDao xxlJobRegistryDao; @Resource private XxlJobGroupDao xxlJobGroupDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; @Resource private JavaMailSender mailSender; @Resource private DataSource dataSource; @Resource private JobAlarmer jobAlarmer; public String getI18n() { if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) { return "zh_CN"; } return i18n; } public String getAccessToken() { return accessToken; } public String getEmailFrom() { return emailFrom; } public int getTriggerPoolFastMax() { if (triggerPoolFastMax < 200) { return 200; } return triggerPoolFastMax; } public int getTriggerPoolSlowMax() { if (triggerPoolSlowMax < 100) { return 100; } return triggerPoolSlowMax; } public int getLogretentiondays() { if (logretentiondays < 7) { return -1; // Limit greater than or equal to 7, otherwise close } return logretentiondays; } public XxlJobLogDao getXxlJobLogDao() { return xxlJobLogDao; } public XxlJobInfoDao getXxlJobInfoDao() { return xxlJobInfoDao; } public XxlJobRegistryDao getXxlJobRegistryDao() { return xxlJobRegistryDao; } public XxlJobGroupDao getXxlJobGroupDao() { return xxlJobGroupDao; } public XxlJobLogReportDao getXxlJobLogReportDao() { return xxlJobLogReportDao; } public JavaMailSender getMailSender() { return mailSender; } public DataSource getDataSource() { return dataSource; } public JobAlarmer getJobAlarmer() { return jobAlarmer; } }
如下配置类可以看到,请求转发到了:com.xxl.job.admin.core.scheduler.XxlJobScheduler
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.scheduler; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.thread.*; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.client.ExecutorBizClient; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @author xuxueli 2018-10-28 00:18:17 */ public class XxlJobScheduler { private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class); public void init() throws Exception { // init i18n initI18n(); // admin trigger pool start JobTriggerPoolHelper.toStart(); // admin registry monitor run JobRegistryHelper.getInstance().start(); // admin fail-monitor run JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) JobCompleteHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); } public void destroy() throws Exception { // stop-schedule JobScheduleHelper.getInstance().toStop(); // admin log report stop JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop JobCompleteHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); // admin registry stop JobRegistryHelper.getInstance().toStop(); // admin trigger pool stop JobTriggerPoolHelper.toStop(); } // ---------------------- I18n ---------------------- private void initI18n(){ for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) { item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name()))); } } // ---------------------- executor-client ---------------------- private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>(); public static ExecutorBiz getExecutorBiz(String address) throws Exception { // valid if (address==null || address.trim().length()==0) { return null; } // load-cache address = address.trim(); ExecutorBiz executorBiz = executorBizRepository.get(address); if (executorBiz != null) { return executorBiz; } // set-cache executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken()); executorBizRepository.put(address, executorBiz); return executorBiz; } }
从init 方法可以看到启动了一系列的线程。下面分析几个线程的作用。
2. com.xxl.job.admin.core.thread.JobTriggerPoolHelper
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import com.xxl.job.admin.core.trigger.XxlJobTrigger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * job trigger thread pool helper * * @author xuxueli 2018-07-03 21:08:07 */ public class JobTriggerPoolHelper { private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); // ---------------------- trigger pool ---------------------- // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start(){ fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); } public void stop() { //triggerPool.shutdown(); fastTriggerPool.shutdownNow(); slowTriggerPool.shutdownNow(); logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); } // job timeout count private volatile long minTim = System.currentTimeMillis()/60000; // ms > min private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>(); /** * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } // trigger triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // do trigger XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // incr timeout-count-map long cost = System.currentTimeMillis()-start; if (cost > 500) { // ob-timeout threshold 500ms AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); } // ---------------------- helper ---------------------- private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); public static void toStart() { helper.start(); } public static void toStop() { helper.stop(); } /** * @param jobId * @param triggerType * @param failRetryCount * >=0: use this param * <0: use param from job info config * @param executorShardingParam * @param executorParam * null: use job param * not null: cover job param */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } }
这个线程的核心作用是触发定时任务的线程。这里快慢线程池,用于执行不同频率的线程。
3. com.xxl.job.admin.core.thread.JobRegistryHelper
注册线程源码如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.*; /** * job registry instance * @author xuxueli 2016-10-02 19:10:24 */ public class JobRegistryHelper { private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class); private static JobRegistryHelper instance = new JobRegistryHelper(); public static JobRegistryHelper getInstance(){ return instance; } private ThreadPoolExecutor registryOrRemoveThreadPool = null; private Thread registryMonitorThread; private volatile boolean toStop = false; public void start(){ // for registry or remove registryOrRemoveThreadPool = new ThreadPoolExecutor( 2, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now)."); } }); // for monitor registryMonitorThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // auto registry group List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); } // fresh online address (admin/executor) HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List<String> registryList = appAddressMap.get(appname); if (registryList == null) { registryList = new ArrayList<String>(); } if (!registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } appAddressMap.put(appname, registryList); } } } // fresh group address for (XxlJobGroup group: groupList) { List<String> registryList = appAddressMap.get(group.getAppname()); String addressListStr = null; if (registryList!=null && !registryList.isEmpty()) { Collections.sort(registryList); StringBuilder addressListSB = new StringBuilder(); for (String item:registryList) { addressListSB.append(item).append(","); } addressListStr = addressListSB.toString(); addressListStr = addressListStr.substring(0, addressListStr.length()-1); } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); } }); registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); } public void toStop(){ toStop = true; // stop registryOrRemoveThreadPool registryOrRemoveThreadPool.shutdownNow(); // stop monitir (interrupt and wait) registryMonitorThread.interrupt(); try { registryMonitorThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // ---------------------- helper ---------------------- public ReturnT<String> registry(RegistryParam registryParam) { // valid if (!StringUtils.hasText(registryParam.getRegistryGroup()) || !StringUtils.hasText(registryParam.getRegistryKey()) || !StringUtils.hasText(registryParam.getRegistryValue())) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument."); } // async execute registryOrRemoveThreadPool.execute(new Runnable() { @Override public void run() { int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); if (ret < 1) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); // fresh freshGroupRegistryInfo(registryParam); } } }); return ReturnT.SUCCESS; } public ReturnT<String> registryRemove(RegistryParam registryParam) { // valid if (!StringUtils.hasText(registryParam.getRegistryGroup()) || !StringUtils.hasText(registryParam.getRegistryKey()) || !StringUtils.hasText(registryParam.getRegistryValue())) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument."); } // async execute registryOrRemoveThreadPool.execute(new Runnable() { @Override public void run() { int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); if (ret > 0) { // fresh freshGroupRegistryInfo(registryParam); } } }); return ReturnT.SUCCESS; } private void freshGroupRegistryInfo(RegistryParam registryParam){ // Under consideration, prevent affecting core tables } }
这个类开启了一个线程池registryOrRemoveThreadPool , 一个定时任务 registryMonitorThread 。
(1) registryOrRemoveThreadPool 线程池主要用于处理执行客户端的注册和删除任务。对于服务端生效的主要是com.xxl.job.admin.core.thread.JobRegistryHelper#registryRemove 方法,也就是执行com.xxl.job.admin.dao.XxlJobRegistryDao#registryDelete 删除方法。其SQL如下:
<delete id="registryDelete" > DELETE FROM xxl_job_registry WHERE registry_group = #{registryGroup} AND registry_key = #{registryKey} AND registry_value = #{registryValue} </delete>
(2) registryMonitorThread 相当于个定时任务, 以30 s 为周期。 每次获取过去90 s 没有收到心跳的注册信息, 然后删掉之后更新XxlJobGroup 的地址信息。
1》com.xxl.job.admin.dao.XxlJobRegistryDao#findDead 找90 s 没有发送心跳的注册信息
<select id="findDead" parameterType="java.util.HashMap" resultType="java.lang.Integer" > SELECT t.id FROM xxl_job_registry AS t WHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND) </select>
2》 com.xxl.job.admin.dao.XxlJobRegistryDao#removeDead 删除过期注册信息
<delete id="removeDead" parameterType="java.lang.Integer" > DELETE FROM xxl_job_registry WHERE id in <foreach collection="ids" item="item" open="(" close=")" separator="," > #{item} </foreach> </delete>
3》 com.xxl.job.admin.dao.XxlJobGroupDao#update 更新任务组的地址信息,这个就不贴sql 了
public int update(XxlJobGroup xxlJobGroup);
4. com.xxl.job.admin.core.thread.JobFailMonitorHelper 处理失败任务
源码如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import com.xxl.job.admin.core.util.I18nUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.TimeUnit; /** * job monitor instance * * @author xuxueli 2015-9-1 18:05:56 */ public class JobFailMonitorHelper { private static Logger logger = LoggerFactory.getLogger(JobFailMonitorHelper.class); private static JobFailMonitorHelper instance = new JobFailMonitorHelper(); public static JobFailMonitorHelper getInstance(){ return instance; } // ---------------------- monitor ---------------------- private Thread monitorThread; private volatile boolean toStop = false; public void start(){ monitorThread = new Thread(new Runnable() { @Override public void run() { // monitor while (!toStop) { try { List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000); if (failLogIds!=null && !failLogIds.isEmpty()) { for (long failLogId: failLogIds) { // lock log int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、fail retry monitor if (log.getExecutorFailRetryCount() > 0) { JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); } // 2、fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log); newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); } } try { TimeUnit.SECONDS.sleep(10); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop"); } }); monitorThread.setDaemon(true); monitorThread.setName("xxl-job, admin JobFailMonitorHelper"); monitorThread.start(); } public void toStop(){ toStop = true; // interrupt and wait monitorThread.interrupt(); try { monitorThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } }
可以看出也是以10 s 为周期执行定时任务。
1》 com.xxl.job.admin.dao.XxlJobLogDao#findFailJobLogIds 查找失败日志的ID
2》 遍历上面的ID集合, 执行com.xxl.job.admin.dao.XxlJobLogDao#updateAlarmStatus 修改alarm 状态为 -1, 如果修改行数大于1 就执行下面, 否则跳过该ID
3》 com.xxl.job.admin.dao.XxlJobLogDao#load 找失败的日志
4》 com.xxl.job.admin.dao.XxlJobInfoDao#loadById 找到XxlJobInfo 任务信息
5》 com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger 尝试再次执行
6》 com.xxl.job.admin.core.alarm.JobAlarm#doAlarm 发送失败日志, 配置了 email 就发送日志
7》 com.xxl.job.admin.dao.XxlJobLogDao#updateAlarmStatus 更新alarm 状态
5. com.xxl.job.admin.core.thread.JobCompleteHelper
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * job lose-monitor instance * * @author xuxueli 2015-9-1 18:05:56 */ public class JobCompleteHelper { private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); private static JobCompleteHelper instance = new JobCompleteHelper(); public static JobCompleteHelper getInstance(){ return instance; } // ---------------------- monitor ---------------------- private ThreadPoolExecutor callbackThreadPool = null; private Thread monitorThread; private volatile boolean toStop = false; public void start(){ // for callback callbackThreadPool = new ThreadPoolExecutor( 2, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); } }); // for monitor monitorThread = new Thread(new Runnable() { @Override public void run() { // wait for JobTriggerPoolHelper-init try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } // monitor while (!toStop) { try { // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; Date losedTime = DateUtil.addMinutes(new Date(), -10); List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); if (losedJobIds!=null && losedJobIds.size()>0) { for (Long logId: losedJobIds) { XxlJobLog jobLog = new XxlJobLog(); jobLog.setId(logId); jobLog.setHandleTime(new Date()); jobLog.setHandleCode(ReturnT.FAIL_CODE); jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); XxlJobCompleter.updateHandleInfoAndFinish(jobLog); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); } } try { TimeUnit.SECONDS.sleep(60); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); } }); monitorThread.setDaemon(true); monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); monitorThread.start(); } public void toStop(){ toStop = true; // stop registryOrRemoveThreadPool callbackThreadPool.shutdownNow(); // stop monitorThread (interrupt and wait) monitorThread.interrupt(); try { monitorThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // ---------------------- helper ---------------------- public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { callbackThreadPool.execute(new Runnable() { @Override public void run() { for (HandleCallbackParam handleCallbackParam: callbackParamList) { ReturnT<String> callbackResult = callback(handleCallbackParam); logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", (callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult); } } }); return ReturnT.SUCCESS; } private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { // valid log item XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); if (log == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found."); } if (log.getHandleCode() > 0) { return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc } // handle msg StringBuffer handleMsg = new StringBuffer(); if (log.getHandleMsg()!=null) { handleMsg.append(log.getHandleMsg()).append("<br>"); } if (handleCallbackParam.getHandleMsg() != null) { handleMsg.append(handleCallbackParam.getHandleMsg()); } // success, save log log.setHandleTime(new Date()); log.setHandleCode(handleCallbackParam.getHandleCode()); log.setHandleMsg(handleMsg.toString()); XxlJobCompleter.updateHandleInfoAndFinish(log); return ReturnT.SUCCESS; } }
callbackThreadPool 线程池执行callbackThreadPool 回调, 用于记录日志、触发子任务等操作。
monitorThread 线程池任务结果丢失处理, 相当于不执行任务,直接结束。
6. com.xxl.job.admin.core.thread.JobLogReportHelper
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobLogReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * job log report helper * * @author xuxueli 2019-11-22 */ public class JobLogReportHelper { private static Logger logger = LoggerFactory.getLogger(JobLogReportHelper.class); private static JobLogReportHelper instance = new JobLogReportHelper(); public static JobLogReportHelper getInstance(){ return instance; } private Thread logrThread; private volatile boolean toStop = false; public void start(){ logrThread = new Thread(new Runnable() { @Override public void run() { // last clean log time long lastCleanLogTime = 0; while (!toStop) { // 1、log-report refresh: refresh log report in 3 days try { for (int i = 0; i < 3; i++) { // today Calendar itemDay = Calendar.getInstance(); itemDay.add(Calendar.DAY_OF_MONTH, -i); itemDay.set(Calendar.HOUR_OF_DAY, 0); itemDay.set(Calendar.MINUTE, 0); itemDay.set(Calendar.SECOND, 0); itemDay.set(Calendar.MILLISECOND, 0); Date todayFrom = itemDay.getTime(); itemDay.set(Calendar.HOUR_OF_DAY, 23); itemDay.set(Calendar.MINUTE, 59); itemDay.set(Calendar.SECOND, 59); itemDay.set(Calendar.MILLISECOND, 999); Date todayTo = itemDay.getTime(); // refresh log-report every minute XxlJobLogReport xxlJobLogReport = new XxlJobLogReport(); xxlJobLogReport.setTriggerDay(todayFrom); xxlJobLogReport.setRunningCount(0); xxlJobLogReport.setSucCount(0); xxlJobLogReport.setFailCount(0); Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo); if (triggerCountMap!=null && triggerCountMap.size()>0) { int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0; int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0; int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0; int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc; xxlJobLogReport.setRunningCount(triggerDayCountRunning); xxlJobLogReport.setSucCount(triggerDayCountSuc); xxlJobLogReport.setFailCount(triggerDayCountFail); } // do refresh int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport); if (ret < 1) { XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e); } } // 2、log-clean: switch open & once each day if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0 && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) { // expire-time Calendar expiredDay = Calendar.getInstance(); expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays()); expiredDay.set(Calendar.HOUR_OF_DAY, 0); expiredDay.set(Calendar.MINUTE, 0); expiredDay.set(Calendar.SECOND, 0); expiredDay.set(Calendar.MILLISECOND, 0); Date clearBeforeTime = expiredDay.getTime(); // clean expired log List<Long> logIds = null; do { logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000); if (logIds!=null && logIds.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds); } } while (logIds!=null && logIds.size()>0); // update clean time lastCleanLogTime = System.currentTimeMillis(); } try { TimeUnit.MINUTES.sleep(1); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, job log report thread stop"); } }); logrThread.setDaemon(true); logrThread.setName("xxl-job, admin JobLogReportHelper"); logrThread.start(); } public void toStop(){ toStop = true; // interrupt and wait logrThread.interrupt(); try { logrThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } }
com.xxl.job.admin.core.thread.JobLogReportHelper#logrThread 线程用于生成调度报告,然后清除过期日志。
7. com.xxl.job.admin.core.thread.JobScheduleHelper
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum; import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author xuxueli 2019-05-21 */ public class JobScheduleHelper { private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class); private static JobScheduleHelper instance = new JobScheduleHelper(); public static JobScheduleHelper getInstance(){ return instance; } public static final long PRE_READ_MS = 5000; // pre read private Thread scheduleThread; private Thread ringThread; private volatile boolean scheduleThreadToStop = false; private volatile boolean ringThreadToStop = false; private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>(); public void start(){ // schedule thread scheduleThread = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; while (!scheduleThreadToStop) { // Scan Job long start = System.currentTimeMillis(); Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null; boolean preReadSuc = true; try { conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // tx start // 1、pre read long nowTime = System.currentTimeMillis(); List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); } // 2、fresh next refreshNextValidTime(jobInfo, new Date()); } else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { // commit if (conn != null) { try { conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.setAutoCommit(connAutoCommit); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } // close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } long cost = System.currentTimeMillis()-start; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start(); // ring thread ringThread = new Thread(new Runnable() { @Override public void run() { while (!ringThreadToStop) { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } try { // second data List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); } private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { Date nextValidTime = generateNextValidTime(jobInfo, fromTime); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); } else { jobInfo.setTriggerStatus(0); jobInfo.setTriggerLastTime(0); jobInfo.setTriggerNextTime(0); logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}", jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf()); } } private void pushTimeRing(int ringSecond, int jobId){ // push async ring List<Integer> ringItemData = ringData.get(ringSecond); if (ringItemData == null) { ringItemData = new ArrayList<Integer>(); ringData.put(ringSecond, ringItemData); } ringItemData.add(jobId); logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); } public void toStop(){ // 1、stop schedule scheduleThreadToStop = true; try { TimeUnit.SECONDS.sleep(1); // wait } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (scheduleThread.getState() != Thread.State.TERMINATED){ // interrupt and wait scheduleThread.interrupt(); try { scheduleThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // if has ring data boolean hasRingData = false; if (!ringData.isEmpty()) { for (int second : ringData.keySet()) { List<Integer> tmpData = ringData.get(second); if (tmpData!=null && tmpData.size()>0) { hasRingData = true; break; } } } if (hasRingData) { try { TimeUnit.SECONDS.sleep(8); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // stop ring (wait job-in-memory stop) ringThreadToStop = true; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (ringThread.getState() != Thread.State.TERMINATED){ // interrupt and wait ringThread.interrupt(); try { ringThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop"); } // ---------------------- tools ---------------------- public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null); if (ScheduleTypeEnum.CRON == scheduleTypeEnum) { Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime); return nextValidTime; } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) { return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 ); } return null; } }
触发任务调度。cron 表达式以及获取数据库开启运行的任务也是在这里运行的。这里也是Cron 表达式计算的核心逻辑。
1》线程 scheduleThread 相当于以固定周期从数据库提前读取快要加载的job的id, 然后判断添加到ringData 中, 最后根据cron 表达式计算出下次触发时间,然后同步到数据库中。
2》ringThread 从上面的ringData 中获取到数据执行,触发任务的执行。
3. 一次任务执行流程
1. com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger 添加到任务队列
2. com.xxl.job.admin.core.thread.JobTriggerPoolHelper#addTrigger 线程池中跑任务
3. com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger 触发任务
继续调用 com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger 获取到参数之后构造TriggerParam 发送http 请求调用(调用到com.xxl.job.core.biz.client.ExecutorBizClient#run 发送http 请求), 同时构造XxlJobLog 保存相关日志信息。
4. 客戶端收到请求,会调用到 com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process 执行请求。 最后交给异步线程池: com.xxl.job.core.thread.JobThread。
线程池, 最后会com.xxl.job.core.thread.TriggerCallbackThread#pushCallBack 生成回调信息。 然后调用com.xxl.job.core.biz.client.AdminBizClient#callback 向admin 调度中心发送 /callback 信息 (走http 发送信息到admin 调度信息)。
5. admin 调度中心收到callback 回调后, 调用到: com.xxl.job.admin.controller.JobApiController#api 。 然后调用到: com.xxl.job.admin.core.thread.JobCompleteHelper#callback(java.util.List<com.xxl.job.core.biz.model.HandleCallbackParam>) -》 com.xxl.job.admin.core.thread.JobCompleteHelper#callback(com.xxl.job.core.biz.model.HandleCallbackParam)
-》 com.xxl.job.admin.core.complete.XxlJobCompleter#finishJob 结束任务, 如果有子任务, 继续执行子任务
补充:对于分片任务的执行
1. 测试jiob
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception { // 分片参数 int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); System.out.println(Thread.currentThread().getName()); // 业务逻辑 for (int i = 0; i < shardTotal; i++) { if (i == shardIndex) { System.out.println("第 " + i+ " 片, 命中分片开始处理"); XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); } else { System.out.println("第 " + i+ " 片, 忽略"); XxlJobHelper.log("第 {} 片, 忽略", i); } } }
2. 配置
3. 查看触发机制
com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { // load data XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } if (executorParam != null) { jobInfo.setExecutorParam(executorParam); } int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // cover addressList if (addressList!=null && addressList.trim().length()>0) { group.setAddressType(1); group.setAddressList(addressList.trim()); } // sharding param int[] shardingParam = null; if (executorShardingParam!=null){ String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } }
这里判断如果是分片策略。获取到数量,给每个主机一个分片顺序,然后遍历主机进行调用。相当于在线的主机都会调用一次,然后在线主机可以根据自己的顺序执行不同的业务逻辑。最后发送的请求参数个数如下: