  • xxl-job source code (1): a basic look at the server and the client

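    1. Download the source code

    1. Download the source from https://gitee.com/xuxueli0323/xxl-job

    2. Browse the source:

       There are three main projects.

    (1) xxl-job-admin: the scheduling center. Users, executors (an executor can be thought of as a task group), and tasks are added through its UI, and this service writes them to the database. It is an ordinary web project with its own static pages.

    Its project structure is as follows. It has controllers for its own internal use, plus com.xxl.job.admin.controller.JobApiController, which receives client registration and similar events. This controller calls the matching method according to the URI the client requests.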
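
    The URI-based dispatch described above could look roughly like the sketch below. It is a simplified stand-in, not the real JobApiController: the class name ApiDispatchSketch, the returned strings, and the exact set of URIs (callback, registry, registryRemove) are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: pick a handler based on the URI the executor requested.
final class ApiDispatchSketch {
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    ApiDispatchSketch() {
        // URI names are assumed for illustration.
        routes.put("callback", body -> "callback handled: " + body);
        routes.put("registry", body -> "registered: " + body);
        routes.put("registryRemove", body -> "removed: " + body);
    }

    // Look up the handler for the URI and apply it to the request body.
    String dispatch(String uri, String body) {
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "invalid request, uri-mapping(" + uri + ") not found";
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        ApiDispatchSketch api = new ApiDispatchSketch();
        System.out.println(api.dispatch("registry", "xxl-job-executor-test"));
        System.out.println(api.dispatch("unknown", ""));
    }
}
```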

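    (2) xxl-job-core: the shared dependency

    Its structure is as follows:

       This is the core component; both the admin scheduling center and the client depend on it. It contains the APIs that the scheduling center and the client each need. For example, com.xxl.job.core.biz.client.ExecutorBizClient is used on the server side: it implements each method by calling the client over an HTTP interface. com.xxl.job.core.biz.impl.ExecutorBizImpl is used on the client side: it calls the matching handler method to run the business logic. (This is also a design idea: the client and the server implement the same interface, similar to the strategy pattern, which keeps the server-side and client-side mechanisms separate.)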
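
    The "same interface on both sides" idea can be sketched in a few lines. Everything here is hypothetical: BizSketch stands in for the shared interface, LocalBizSketch for the executor-side implementation, and RemoteBizSketch for the admin-side client, with a plain Function playing the role of the HTTP hop.

```java
import java.util.function.Function;

// Hypothetical stand-in for the interface shared by admin and executor.
interface BizSketch {
    String run(String param);
}

// Executor side: does the work locally.
final class LocalBizSketch implements BizSketch {
    @Override
    public String run(String param) {
        return "handled:" + param;
    }
}

// Admin side: same interface, but it only forwards the call over a transport.
final class RemoteBizSketch implements BizSketch {
    private final Function<String, String> transport;

    RemoteBizSketch(Function<String, String> transport) {
        this.transport = transport;
    }

    @Override
    public String run(String param) {
        return transport.apply(param);
    }
}

final class SharedInterfaceDemo {
    public static void main(String[] args) {
        BizSketch local = new LocalBizSketch();
        // The method reference stands in for the network call to the executor.
        BizSketch remote = new RemoteBizSketch(local::run);
        System.out.println(remote.run("job-1")); // prints "handled:job-1"
    }
}
```

    The caller only sees BizSketch, so it does not need to know whether the work happens in-process or on another machine.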

    (3) xxl-job-executor-samples: executor sample projects

    3. After importing the tables, look at the relevant database tables

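     The important tables are:

    xxl_job_group    executors

    xxl_job_info    tasks

    xxl_job_lock    used to take a row lock during task scheduling

    xxl_job_log    log records

    xxl_job_log_report    data for the statistics report

    xxl_job_logglue    change log for GLUE-mode code

    xxl_job_registry    registration info; the rows clients write when they register with the server and keep their heartbeat alive

    xxl_job_user    users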

    4. After startup, enter a few records and look at the data

    (1) Create a user. The endpoint called is /user/add, and its source is:

        @RequestMapping("/add")
        @ResponseBody
        @PermissionLimit(adminuser = true)
        public ReturnT<String> add(XxlJobUser xxlJobUser) {
    
            // valid username
            if (!StringUtils.hasText(xxlJobUser.getUsername())) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("system_please_input")+I18nUtil.getString("user_username") );
            }
            xxlJobUser.setUsername(xxlJobUser.getUsername().trim());
            if (!(xxlJobUser.getUsername().length()>=4 && xxlJobUser.getUsername().length()<=20)) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("system_lengh_limit")+"[4-20]" );
            }
            // valid password
            if (!StringUtils.hasText(xxlJobUser.getPassword())) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("system_please_input")+I18nUtil.getString("user_password") );
            }
            xxlJobUser.setPassword(xxlJobUser.getPassword().trim());
            if (!(xxlJobUser.getPassword().length()>=4 && xxlJobUser.getPassword().length()<=20)) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("system_lengh_limit")+"[4-20]" );
            }
            // md5 password
            xxlJobUser.setPassword(DigestUtils.md5DigestAsHex(xxlJobUser.getPassword().getBytes()));
    
            // check repeat
            XxlJobUser existUser = xxlJobUserDao.loadByUserName(xxlJobUser.getUsername());
            if (existUser != null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("user_username_repeat") );
            }
    
            // write
            xxlJobUserDao.save(xxlJobUser);
            return ReturnT.SUCCESS;
        }

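      This simply inserts a row into the user table. There is also a simple role-based design here: @PermissionLimit(adminuser = true) limits the endpoint to administrators.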
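
    The controller stores the password as an MD5 hex digest via Spring's DigestUtils.md5DigestAsHex. As a rough plain-JDK equivalent, the sketch below computes the same kind of digest. The class name Md5HexSketch is made up, and it encodes the text as UTF-8, whereas the controller calls getBytes() with the platform default charset.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: MD5 digest of a string, rendered as lowercase hex.
final class Md5HexSketch {
    static String md5Hex(String text) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b)); // two hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(md5Hex("123456")); // prints e10adc3949ba59abbe56e057f20f883e
    }
}
```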

    (2) Create an executor (an executor can be thought of as a task group)

    5. Check the database:

    (1) job_group table:

    (2) user table:

     6. Create a new client project:

    (1) The pom is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud</artifactId>
            <groupId>cn.qz.cloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-xxl-job-8081</artifactId>
    
        <dependencies>
            <!-- xxl-job-core -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>2.2.0</version>
            </dependency>
            <!-- the author's own extracted utility package -->
            <dependency>
                <groupId>cn.qz.cloud</groupId>
                <artifactId>cloud-api-commons</artifactId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

    (2) The properties configuration file:

    # web port
    server.port=8081
    # no web
    #spring.main.web-environment=false
    # log config
    logging.config=classpath:logback.xml
    ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    ### xxl-job, access token
    xxl.job.accessToken=
    ### xxl-job executor appname
    xxl.job.executor.appname=xxl-job-executor-test
    ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
    xxl.job.executor.address=
    ### xxl-job executor server-info
    xxl.job.executor.ip=
    xxl.job.executor.port=9999
    ### xxl-job executor log-path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job executor log-retention-days
    xxl.job.executor.logretentiondays=30

    (3) Create two handlers:

    cn.qz.cloud.job.FirstJob:

    package cn.qz.cloud.job;
    
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.handler.annotation.XxlJob;
    import com.xxl.job.core.log.XxlJobLogger;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class FirstJob {
    
        @XxlJob(value = "firstJob", init = "init", destroy = "destroy")
        public ReturnT<String> execute(String param) {
            XxlJobLogger.log("XXL-JOB, firstJob. param: {}", param);
            log.info("XXL-JOB, firstJob. param: {}", param);
            return ReturnT.SUCCESS;
        }
    
        public void init() {
            log.info("init");
        }
    
        public void destroy() {
            log.info("destroy");
        }
    }

    cn.qz.cloud.job.XXLClassJob:

    package cn.qz.cloud.job;
    
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.handler.IJobHandler;
    import com.xxl.job.core.log.XxlJobLogger;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.TimeUnit;
    @Slf4j
    public class XXLClassJob extends IJobHandler {
    
        @Override
        public ReturnT<String> execute(String param) throws Exception {
            log.info("XXLClassJob execute, param: {}", param);
            XxlJobLogger.log("XXLClassJob execute, param: {}", param);
    
            for (int i = 0; i < 5; i++) {
                log.info("XXLClassJob start, i: {} ", i);
                XxlJobLogger.log("XXLClassJob start, i: {} ", i);
                TimeUnit.SECONDS.sleep(2);
            }
            return ReturnT.SUCCESS;
        }
    }

     (4) Add the Configuration class:

    package cn.qz.cloud.config;
    
    import cn.qz.cloud.job.XXLClassJob;
    import com.xxl.job.core.executor.XxlJobExecutor;
    import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        static {
            // Register the handler with the executor container manually, as follows.
            XxlJobExecutor.registJobHandler("XXLClassJob", new XXLClassJob());
        }
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    }
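
    The static block above registers XXLClassJob under a name so that a task whose executor_handler is that name can find it later. A minimal sketch of such a name-to-handler registry is shown below, assuming a concurrent map keyed by handler name. HandlerRegistrySketch and its methods are hypothetical and use a plain Function instead of IJobHandler.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical sketch of a name -> handler registry.
final class HandlerRegistrySketch {
    private static final ConcurrentMap<String, Function<String, String>> HANDLERS =
            new ConcurrentHashMap<>();

    // Returns the handler previously registered under this name, or null.
    static Function<String, String> register(String name, Function<String, String> handler) {
        return HANDLERS.put(name, handler);
    }

    // Run the handler registered under the name, or report that it is missing.
    static String execute(String name, String param) {
        Function<String, String> handler = HANDLERS.get(name);
        if (handler == null) {
            return "FAIL: job handler [" + name + "] not found";
        }
        return handler.apply(param);
    }

    public static void main(String[] args) {
        register("XXLClassJob", param -> "SUCCESS:" + param);
        System.out.println(execute("XXLClassJob", "defaultparam"));
        System.out.println(execute("missingJob", ""));
    }
}
```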

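    7. Start the client created above

    8. In the scheduling center, an online list is now visible:

       The client registers itself with the admin scheduling center automatically after startup.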
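
    One way to picture the online list is a table of heartbeats: each executor reports its address periodically, and an address drops out of the list when its heartbeat gets too old. The sketch below is an in-memory illustration only. RegistrySketch is a made-up class, and the 90-second timeout is an assumed value, not something taken from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory version of a heartbeat-based registry.
final class RegistrySketch {
    // Assumed value: an entry counts as dead after 90 seconds without a heartbeat.
    static final long DEAD_TIMEOUT_MILLIS = 90_000L;

    // key: appname + "|" + address, value: time of the last heartbeat
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    // Called each time an executor reports in.
    void beat(String appname, String address, long nowMillis) {
        lastBeat.put(appname + "|" + address, nowMillis);
    }

    // Addresses of the given app whose last heartbeat is recent enough.
    List<String> onlineAddresses(String appname, long nowMillis) {
        List<String> result = new ArrayList<>();
        String prefix = appname + "|";
        for (Map.Entry<String, Long> e : lastBeat.entrySet()) {
            boolean alive = nowMillis - e.getValue() <= DEAD_TIMEOUT_MILLIS;
            if (alive && e.getKey().startsWith(prefix)) {
                result.add(e.getKey().substring(prefix.length()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.beat("xxl-job-executor-test", "http://127.0.0.1:9999/", 0L);
        System.out.println(registry.onlineAddresses("xxl-job-executor-test", 30_000L));
        System.out.println(registry.onlineAddresses("xxl-job-executor-test", 100_000L));
    }
}
```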

    9. Add a task:

     Check the database again:

    mysql> select * from xxl_job_info where id = 2\G
    *************************** 1. row ***************************
                           id: 2
                    job_group: 2
                     job_desc: 自己的测试任务
                     add_time: 2021-11-03 22:18:15
                  update_time: 2021-11-03 22:18:15
                       author: qz
                  alarm_email: 99999@qq.com
                schedule_type: CRON
                schedule_conf: 59 * * * * ?
             misfire_strategy: DO_NOTHING
      executor_route_strategy: RANDOM
             executor_handler: XXLClassJob
               executor_param: defaultparam
      executor_block_strategy: SERIAL_EXECUTION
             executor_timeout: 0
    executor_fail_retry_count: 0
                    glue_type: BEAN
                  glue_source:
                  glue_remark: GLUE代码初始化
              glue_updatetime: 2021-11-03 22:18:15
                  child_jobid:
               trigger_status: 0
            trigger_last_time: 0
            trigger_next_time: 0
    1 row in set (0.00 sec)
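
    The schedule_conf value in this row, 59 * * * * ?, is a cron expression whose first field is seconds, so it fires at second 59 of every minute. The sketch below computes the next fire time for that one expression only. It is not a cron parser, and CronSecond59Sketch is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for the single expression "59 * * * * ?".
final class CronSecond59Sketch {
    // Next time strictly after 'now' whose seconds field is 59.
    static LocalDateTime nextFire(LocalDateTime now) {
        LocalDateTime candidate = now.truncatedTo(ChronoUnit.MINUTES).withSecond(59);
        // Already at or past second 59 of this minute: move to the next minute.
        return candidate.isAfter(now) ? candidate : candidate.plusMinutes(1);
    }

    public static void main(String[] args) {
        LocalDateTime addTime = LocalDateTime.of(2021, 11, 3, 22, 18, 15);
        System.out.println(nextFire(addTime)); // prints 2021-11-03T22:18:59
    }
}
```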

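     2. Communication between the scheduling center and the client

    1. Manual trigger from the scheduling center

     1. Select the task in the admin UI, choose to execute it once, and enter the parameter: [1 2 3 4]

    2. The request address is:

    /xxl-job-admin/jobinfo/trigger

    The parameters passed are:

    3. The source of the /jobinfo/trigger endpoint is: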

        @RequestMapping("/trigger")
        @ResponseBody
        //@PermissionLimit(limit = false)
        public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
            // force cover job param
            if (executorParam == null) {
                executorParam = "";
            }
    
            JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
            return ReturnT.SUCCESS;
        }
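
    Judging by the method signature, the endpoint takes three request parameters: id, executorParam, and addressList. The sketch below builds a form-encoded body with those names. It is an assumption that the UI sends them as a form post, the class name TriggerFormSketch is made up, and a real call would also need a logged-in session.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the request body for /jobinfo/trigger.
final class TriggerFormSketch {
    static String formBody(int id, String executorParam, String addressList) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", String.valueOf(id));
        fields.put("executorParam", executorParam);
        fields.put("addressList", addressList);

        // Join the fields as key=value pairs separated by '&'.
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(encode(field.getKey())).append('=').append(encode(field.getValue()));
        }
        return body.toString();
    }

    private static String encode(String text) {
        try {
            return URLEncoder.encode(text, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 not supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(formBody(2, "1 2 3 4", "")); // prints id=2&executorParam=1+2+3+4&addressList=
    }
}
```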

    1》Next, com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger is called

        public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
            helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
        }

    2》Which in turn calls com.xxl.job.admin.core.thread.JobTriggerPoolHelper#addTrigger

        /**
         * add trigger
         */
        public void addTrigger(final int jobId,
                               final TriggerTypeEnum triggerType,
                               final int failRetryCount,
                               final String executorShardingParam,
                               final String executorParam,
                               final String addressList) {
    
            // choose thread pool
            ThreadPoolExecutor triggerPool_ = fastTriggerPool;
            AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
            if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
                triggerPool_ = slowTriggerPool;
            }
    
            // trigger
            triggerPool_.execute(new Runnable() {
                @Override
                public void run() {
    
                    long start = System.currentTimeMillis();
    
                    try {
                        // do trigger
                        XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    } finally {
    
                        // check timeout-count-map
                        long minTim_now = System.currentTimeMillis()/60000;
                        if (minTim != minTim_now) {
                            minTim = minTim_now;
                            jobTimeoutCountMap.clear();
                        }
    
                        // incr timeout-count-map
                        long cost = System.currentTimeMillis()-start;
                        if (cost > 500) {       // job-timeout threshold 500ms
                            AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                            if (timeoutCount != null) {
                                timeoutCount.incrementAndGet();
                            }
                        }
    
                    }
    
                }
            });
        }

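      The core logic is that the trigger runs inside a thread pool. A job whose trigger took longer than 500 ms more than 10 times within the current minute is moved to the slow pool.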
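
    The fast/slow pool choice in addTrigger boils down to a per-minute counter of slow triggers. The sketch below isolates that bookkeeping so it can be run without thread pools. SlowJobTrackerSketch is a hypothetical class, and timestamps are passed in as arguments instead of being read from the clock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the slow-trigger bookkeeping behind the pool choice.
final class SlowJobTrackerSketch {
    private static final long SLOW_THRESHOLD_MILLIS = 500;
    private static final int SLOW_COUNT_LIMIT = 10;

    private volatile long currentMinute = -1;
    private final ConcurrentMap<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();

    // Which pool the next trigger of this job would use.
    String choosePool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return (count != null && count.get() > SLOW_COUNT_LIMIT) ? "slow" : "fast";
    }

    // Record one finished trigger.
    void record(int jobId, long finishedAtMillis, long costMillis) {
        long minute = finishedAtMillis / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            slowCounts.clear(); // a new minute started: reset the window
        }
        if (costMillis > SLOW_THRESHOLD_MILLIS) {
            slowCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    public static void main(String[] args) {
        SlowJobTrackerSketch tracker = new SlowJobTrackerSketch();
        for (int i = 0; i < 11; i++) {
            tracker.record(2, 1_000L, 600L);
        }
        System.out.println(tracker.choosePool(2)); // prints slow
    }
}
```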

    3》com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger

        public static void trigger(int jobId,
                                   TriggerTypeEnum triggerType,
                                   int failRetryCount,
                                   String executorShardingParam,
                                   String executorParam,
                                   String addressList) {
    
            // load data
            XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
            if (jobInfo == null) {
                logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
                return;
            }
            if (executorParam != null) {
                jobInfo.setExecutorParam(executorParam);
            }
            int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
            XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    
            // cover addressList
            if (addressList!=null && addressList.trim().length()>0) {
                group.setAddressType(1);
                group.setAddressList(addressList.trim());
            }
    
            // sharding param
            int[] shardingParam = null;
            if (executorShardingParam!=null){
                String[] shardingArr = executorShardingParam.split("/");
                if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                    shardingParam = new int[2];
                    shardingParam[0] = Integer.valueOf(shardingArr[0]);
                    shardingParam[1] = Integer.valueOf(shardingArr[1]);
                }
            }
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                    && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                    && shardingParam==null) {
                for (int i = 0; i < group.getRegistryList().size(); i++) {
                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
                }
            } else {
                if (shardingParam == null) {
                    shardingParam = new int[]{0, 1};
                }
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
            }
    
        }

      Here the XxlJobInfo task record is looked up by task ID, and then the XxlJobGroup (the group the task belongs to) is loaded.
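
    The sharding branch in trigger is easier to follow in isolation. The sketch below mirrors it under simplifying assumptions: ShardingSketch is a made-up class, the route strategy is reduced to a boolean, and each planned call is returned as an {index, total} pair instead of invoking processTrigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the sharding parameter decides the trigger calls.
final class ShardingSketch {
    // Parse "index/total"; returns null when the text is missing or malformed.
    static int[] parse(String text) {
        if (text == null) {
            return null;
        }
        String[] parts = text.split("/");
        if (parts.length != 2) {
            return null;
        }
        try {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Decide which {index, total} pairs get triggered.
    static List<int[]> plan(boolean broadcast, int registrySize, String shardingText) {
        List<int[]> calls = new ArrayList<>();
        int[] sharding = parse(shardingText);
        if (broadcast && registrySize > 0 && sharding == null) {
            // Broadcast: one call per registered executor.
            for (int i = 0; i < registrySize; i++) {
                calls.add(new int[]{i, registrySize});
            }
        } else {
            // Single call; without an explicit shard it defaults to 0/1.
            calls.add(sharding != null ? sharding : new int[]{0, 1});
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(plan(true, 3, null).size());  // prints 3
        System.out.println(plan(false, 3, null).size()); // prints 1
    }
}
```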

    4》Then com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger is called to trigger the task:

        /**
         * @param group                     job group, registry list may be empty
         * @param jobInfo
         * @param finalFailRetryCount
         * @param triggerType
         * @param index                     sharding index
         * @param total                     sharding total
         */
        private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    
            // param
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
            ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
            String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
    
            // 1、save log-id
            XxlJobLog jobLog = new XxlJobLog();
            jobLog.setJobGroup(jobInfo.getJobGroup());
            jobLog.setJobId(jobInfo.getId());
            jobLog.setTriggerTime(new Date());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
            logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
            // 2、init trigger-param
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(index);
            triggerParam.setBroadcastTotal(total);
    
            // 3、init address
            String address = null;
            ReturnT<String> routeAddressResult = null;
            if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                    if (index < group.getRegistryList().size()) {
                        address = group.getRegistryList().get(index);
                    } else {
                        address = group.getRegistryList().get(0);
                    }
                } else {
                    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                    if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                        address = routeAddressResult.getContent();
                    }
                }
            } else {
                routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
            }
    
            // 4、trigger remote executor
            ReturnT<String> triggerResult = null;
            if (address != null) {
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
            }
    
            // 5、collection trigger info
            StringBuffer triggerMsgSb = new StringBuffer();
            triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                    .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
            if (shardingParam != null) {
                triggerMsgSb.append("("+shardingParam+")");
            }
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
    
            triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                    .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
    
            // 6、save log trigger-info
            jobLog.setExecutorAddress(address);
            jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
            jobLog.setExecutorParam(jobInfo.getExecutorParam());
            jobLog.setExecutorShardingParam(shardingParam);
            jobLog.setExecutorFailRetryCount(finalFailRetryCount);
            //jobLog.setTriggerTime();
            jobLog.setTriggerCode(triggerResult.getCode());
            jobLog.setTriggerMsg(triggerMsgSb.toString());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    
            logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
        }

      It builds a TriggerParam object and then continues the call.
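
    Step 3 of processTrigger picks one executor address from the registry list, either by shard index or through a routing strategy. The sketch below shows three simple strategies plus the shard fallback. RouteSketch is a hypothetical enum; the real routers sit behind ExecutorRouteStrategyEnum and are not reproduced here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical routing strategies over a list of executor addresses.
enum RouteSketch {
    FIRST {
        @Override
        String route(List<String> addresses, Random random) {
            return addresses.get(0);
        }
    },
    LAST {
        @Override
        String route(List<String> addresses, Random random) {
            return addresses.get(addresses.size() - 1);
        }
    },
    RANDOM {
        @Override
        String route(List<String> addresses, Random random) {
            return addresses.get(random.nextInt(addresses.size()));
        }
    };

    abstract String route(List<String> addresses, Random random);

    // Broadcast case: shard i goes to address i, falling back to the first address.
    static String forShard(List<String> addresses, int index) {
        return index < addresses.size() ? addresses.get(index) : addresses.get(0);
    }

    public static void main(String[] args) {
        List<String> addresses = Arrays.asList("http://10.0.0.1:9999/", "http://10.0.0.2:9999/");
        System.out.println(FIRST.route(addresses, new Random())); // prints http://10.0.0.1:9999/
        System.out.println(forShard(addresses, 5));               // prints http://10.0.0.1:9999/
    }
}
```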

    5》 If there is an executor address, execution continues with com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor

        public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
            ReturnT<String> runResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                runResult = executorBiz.run(triggerParam);
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
                runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
            }
    
            StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
            runResultSB.append("<br>address:").append(address);
            runResultSB.append("<br>code:").append(runResult.getCode());
            runResultSB.append("<br>msg:").append(runResult.getMsg());
    
            runResult.setMsg(runResultSB.toString());
            return runResult;
        }

    Step one is to obtain the ExecutorBiz, that is, the client executor for the given address, via com.xxl.job.admin.core.scheduler.XxlJobScheduler#getExecutorBiz:

        // ---------------------- executor-client ----------------------
        private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
        public static ExecutorBiz getExecutorBiz(String address) throws Exception {
            // valid
            if (address==null || address.trim().length()==0) {
                return null;
            }
    
            // load-cache
            address = address.trim();
            ExecutorBiz executorBiz = executorBizRepository.get(address);
            if (executorBiz != null) {
                return executorBiz;
            }
    
            // set-cache
            executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
    
            executorBizRepository.put(address, executorBiz);
            return executorBiz;
        }
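
    getExecutorBiz does a get followed by a put, so two threads asking for a new address at the same moment could each build a client, and one would overwrite the other. That is harmless here. As a variation, not what the source does, the sketch below uses computeIfAbsent so that only one client is created per address. ClientCacheSketch and its nested Client class are hypothetical stand-ins.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-address client cache built on computeIfAbsent.
final class ClientCacheSketch {
    // Stand-in for the real client: it only remembers its address.
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    private final ConcurrentMap<String, Client> cache = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Returns the cached client for the trimmed address, creating it at most once.
    Client get(String address) {
        if (address == null || address.trim().isEmpty()) {
            return null;
        }
        return cache.computeIfAbsent(address.trim(), key -> {
            created.incrementAndGet();
            return new Client(key);
        });
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        ClientCacheSketch clients = new ClientCacheSketch();
        clients.get("http://127.0.0.1:9999/");
        clients.get(" http://127.0.0.1:9999/ ");
        System.out.println(clients.createdCount()); // prints 1
    }
}
```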

    Step two: continue into com.xxl.job.core.biz.client.ExecutorBizClient#run

        public ReturnT<String> run(TriggerParam triggerParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
        }

      The remote call goes to the executor address with run appended.

    Finally the remote call is made in com.xxl.job.core.util.XxlJobRemotingUtil#postBody

        public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) {
            HttpURLConnection connection = null;
            BufferedReader bufferedReader = null;
            try {
                // connection
                URL realUrl = new URL(url);
                connection = (HttpURLConnection) realUrl.openConnection();
    
                // trust-https
                boolean useHttps = url.startsWith("https");
                if (useHttps) {
                    HttpsURLConnection https = (HttpsURLConnection) connection;
                    trustAllHosts(https);
                }
    
                // connection setting
                connection.setRequestMethod("POST");
                connection.setDoOutput(true);
                connection.setDoInput(true);
                connection.setUseCaches(false);
                connection.setReadTimeout(timeout * 1000);
                connection.setConnectTimeout(3 * 1000);
                connection.setRequestProperty("connection", "Keep-Alive");
                connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
                connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
    
                if(accessToken!=null && accessToken.trim().length()>0){
                    connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken);
                }
    
                // do connection
                connection.connect();
    
                // write requestBody
                if (requestObj != null) {
                    String requestBody = GsonTool.toJson(requestObj);
    
                    DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                    dataOutputStream.write(requestBody.getBytes("UTF-8"));
                    dataOutputStream.flush();
                    dataOutputStream.close();
                }
    
                /*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
                connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
                OutputStream outwritestream = connection.getOutputStream();
                outwritestream.write(requestBodyBytes);
                outwritestream.flush();
                outwritestream.close();*/
    
                // valid StatusCode
                int statusCode = connection.getResponseCode();
                if (statusCode != 200) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
                }
    
                // result
                bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    result.append(line);
                }
                String resultJson = result.toString();
    
                // parse returnT
                try {
                    ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
                    return returnT;
                } catch (Exception e) {
                    logger.error("xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").", e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").");
                }
    
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url);
            } finally {
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (connection != null) {
                        connection.disconnect();
                    }
                } catch (Exception e2) {
                    logger.error(e2.getMessage(), e2);
                }
            }
        }

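       The call goes through Java's HttpURLConnection, so the data travels over HTTP. The request carries a few headers, and the parameters above are serialized to JSON and sent to the client. At the end, connection.disconnect() closes the connection, so every request sets up a new HTTP connection. (This differs from dubbo, where the client shares one channel and uses the dubbo protocol.)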
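
    For comparison, the same kind of request can be described with the java.net.http API (Java 11+), whose HttpClient can be shared across calls and reuse connections. The sketch below only builds the request and does not send it, so it runs without a live executor. RunRequestSketch is a made-up class, the header name is assumed to match the XXL_JOB_ACCESS_TOKEN constant, and the JSON body in main is a placeholder, not the real TriggerParam layout.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical sketch: describe the POST to <address>/run without sending it.
final class RunRequestSketch {
    // Assumed header name for the access token.
    static final String TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    // Append "run" to the executor address, adding a slash when needed.
    static String runUrl(String address) {
        return address.endsWith("/") ? address + "run" : address + "/run";
    }

    // timeoutSeconds must be positive.
    static HttpRequest build(String address, String accessToken, int timeoutSeconds, String jsonBody) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(runUrl(address)))
                .timeout(Duration.ofSeconds(timeoutSeconds))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        // The token header is only sent when a token is configured.
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            builder.header(TOKEN_HEADER, accessToken);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        // Placeholder body; the field names are illustrative only.
        HttpRequest request = build("http://127.0.0.1:9999", "", 3, "{\"jobId\":2}");
        System.out.println(request.method() + " " + request.uri()); // prints POST http://127.0.0.1:9999/run
    }
}
```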

      XxlJobRemotingUtil can also be reused as a general-purpose HTTP utility class.

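    2. How the client receives the data

       As shown above, when the admin scheduling center triggers a task it makes an HTTP request to http://ip:port/run, where ip is the client's IP and port is the client's xxl.job.executor.port setting, i.e. the values configured in cn.qz.cloud.config.XxlJobConfig.

      The only dependency the client pulls in is xxl-job-core, and it registers an XxlJobSpringExecutor bean as follows: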

    @Configuration
    public class XxlJobConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        static {
            // Register the handler with the executor container manually, as follows.
            XxlJobExecutor.registJobHandler("XXLClassJob", new XXLClassJob());
        }
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    }

    Tracing further, the executor starts an EmbedServer, i.e. a Netty server that accepts HTTP requests. The source is com.xxl.job.core.server.EmbedServer:

    package com.xxl.job.core.server;
    
    import com.xxl.job.core.biz.ExecutorBiz;
    import com.xxl.job.core.biz.impl.ExecutorBizImpl;
    import com.xxl.job.core.biz.model.*;
    import com.xxl.job.core.thread.ExecutorRegistryThread;
    import com.xxl.job.core.util.GsonTool;
    import com.xxl.job.core.util.ThrowableUtil;
    import com.xxl.job.core.util.XxlJobRemotingUtil;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.*;
    
    /**
     * Copy from : https://github.com/xuxueli/xxl-rpc
     *
     * @author xuxueli 2020-04-11 21:25
     */
    public class EmbedServer {
        private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
    
        private ExecutorBiz executorBiz;
        private Thread thread;
    
        public void start(final String address, final int port, final String appname, final String accessToken) {
            executorBiz = new ExecutorBizImpl();
            thread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // param
                    EventLoopGroup bossGroup = new NioEventLoopGroup();
                    EventLoopGroup workerGroup = new NioEventLoopGroup();
                    ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                            0,
                            200,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(2000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                                }
                            });
    
    
                    try {
                        // start server
                        ServerBootstrap bootstrap = new ServerBootstrap();
                        bootstrap.group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel channel) throws Exception {
                                        channel.pipeline()
                                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                                .addLast(new HttpServerCodec())
                                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                    }
                                })
                                .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                        // bind
                        ChannelFuture future = bootstrap.bind(port).sync();
    
                        logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
    
                        // start registry
                        startRegistry(appname, address);
    
                        // wait util stop
                        future.channel().closeFuture().sync();
    
                    } catch (InterruptedException e) {
                        if (e instanceof InterruptedException) {
                            logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                        } else {
                            logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                        }
                    } finally {
                        // stop
                        try {
                            workerGroup.shutdownGracefully();
                            bossGroup.shutdownGracefully();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                }
    
            });
            thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
            thread.start();
        }
    
        public void stop() throws Exception {
            // destroy server thread
            if (thread!=null && thread.isAlive()) {
                thread.interrupt();
            }
    
            // stop registry
            stopRegistry();
            logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");
        }
    
    
        // ---------------------- registry ----------------------
    
        /**
         * netty_http
         *
         * Copy from : https://github.com/xuxueli/xxl-rpc
         *
         * @author xuxueli 2015-11-24 22:25:15
         */
        public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
            private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
    
            private ExecutorBiz executorBiz;
            private String accessToken;
            private ThreadPoolExecutor bizThreadPool;
            public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
                this.executorBiz = executorBiz;
                this.accessToken = accessToken;
                this.bizThreadPool = bizThreadPool;
            }
    
            @Override
            protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    
                // request parse
                //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
                String requestData = msg.content().toString(CharsetUtil.UTF_8);
                String uri = msg.uri();
                HttpMethod httpMethod = msg.method();
                boolean keepAlive = HttpUtil.isKeepAlive(msg);
                String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    
                // invoke
                bizThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // do invoke
                        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
    
                        // to json
                        String responseJson = GsonTool.toJson(responseObj);
    
                        // write response
                        writeResponse(ctx, keepAlive, responseJson);
                    }
                });
            }
    
            private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    
                // valid
                if (HttpMethod.POST != httpMethod) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
                }
                if (uri==null || uri.trim().length()==0) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
                }
                if (accessToken!=null
                        && accessToken.trim().length()>0
                        && !accessToken.equals(accessTokenReq)) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
                }
    
                // services mapping
                try {
                    if ("/beat".equals(uri)) {
                        return executorBiz.beat();
                    } else if ("/idleBeat".equals(uri)) {
                        IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                        return executorBiz.idleBeat(idleBeatParam);
                    } else if ("/run".equals(uri)) {
                        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                        return executorBiz.run(triggerParam);
                    } else if ("/kill".equals(uri)) {
                        KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                        return executorBiz.kill(killParam);
                    } else if ("/log".equals(uri)) {
                        LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                        return executorBiz.log(logParam);
                    } else {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
                }
            }
    
            /**
             * write response
             */
            private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
                // write response
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
                if (keepAlive) {
                    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                }
                ctx.writeAndFlush(response);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.flush();
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
                ctx.close();
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    ctx.channel().close();      // beat 3N, close if idle
                    logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        }
    
        // ---------------------- registry ----------------------
    
        public void startRegistry(final String appname, final String address) {
            // start registry
            ExecutorRegistryThread.getInstance().start(appname, address);
        }
    
        public void stopRegistry() {
            // stop registry
            ExecutorRegistryThread.getInstance().toStop();
        }
    
    
    }

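      As can be seen, the logic uses ServerBootstrap, Netty's bootstrap class, to start the server, and adds four handlers to each channel's pipeline. The IdleStateHandler does little in practice: the scheduling center opens a new connection for every request, so each request gets a new channel with its own IdleStateHandler and the three handlers after it. The most important handler is EmbedHttpServerHandler. com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0 is the core logic for handling data sent by the scheduling center. It hands the work to the bizThreadPool thread pool for asynchronous processing, in roughly three steps: invoke the target method, convert the result to JSON, and write the response back to the scheduling center.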
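One detail of bizThreadPool is easy to miss: it is built with a core size of 0, a maximum of 200 and a bounded queue of 2000. With java.util.concurrent.ThreadPoolExecutor, such a pool starts one worker, then prefers queuing, and only adds further threads once the queue is full. The sketch below shows that behavior on a scaled-down pool (core 0, max 3, queue capacity 2). The class and method names are hypothetical, and the sketch uses the default rejection policy rather than the custom handler in EmbedServer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BizPoolSketch {

    /**
     * Submits blocking tasks to a pool shaped like bizThreadPool but scaled down,
     * and records the pool size after each submit. -1 marks a rejected submit.
     */
    static List<Integer> poolSizesAfterEachSubmit(int submits) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            firstStarted.countDown();
            try {
                release.await(); // keep the worker busy until the experiment is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Integer> sizes = new ArrayList<>();
        try {
            for (int i = 0; i < submits; i++) {
                try {
                    pool.execute(blocker);
                    sizes.add(pool.getPoolSize());
                } catch (RejectedExecutionException e) {
                    sizes.add(-1);
                }
                if (i == 0) {
                    // Wait until the first task has left the queue, so the run is deterministic.
                    firstStarted.await();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(poolSizesAfterEachSubmit(6)); // prints [1, 1, 1, 2, 3, -1]
    }
}
```

Applied to the real numbers, this suggests that the executor handles requests on a single business thread until 2000 requests are waiting, and rejects new ones only when 200 threads are busy and the queue is full.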

      The core run call is delegated to com.xxl.job.core.biz.impl.ExecutorBizImpl#run:

        @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            // load old:jobHandler + jobThread
            JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
            IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
            String removeOldReason = null;
    
            // valid:jobHandler + jobThread
            GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
            if (GlueTypeEnum.BEAN == glueTypeEnum) {
    
                // new jobhandler
                IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
    
                // valid old jobThread
                if (jobThread!=null && jobHandler != newJobHandler) {
                    // change handler, need kill old thread
                    removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = newJobHandler;
                    if (jobHandler == null) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                    }
                }
    
            } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof GlueJobHandler
                            && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change handler or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    try {
                        IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                        jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                    }
                }
            } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof ScriptJobHandler
                                && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change script or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
                }
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
            }
    
            // executor block strategy
            if (jobThread != null) {
                ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
                if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                    // discard when running
                    if (jobThread.isRunningOrHasQueue()) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                    }
                } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                    // kill running jobThread
                    if (jobThread.isRunningOrHasQueue()) {
                        removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
    
                        jobThread = null;
                    }
                } else {
                    // just queue trigger
                }
            }
    
            // replace thread (new or exists invalid)
            if (jobThread == null) {
                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
            }
    
            // push data to queue
            ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
            return pushResult;
        }
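The block-strategy branch near the end of run can be reduced to a small decision function. The sketch below is a simplification under stated assumptions: the enum and method names are hypothetical, and SERIAL_EXECUTION is used here only as a label for the "just queue trigger" branch. A null strategy, which the listing gets when match falls back to its default, also ends up in that branch.

```java
class BlockStrategySketch {

    /** Labels for the three branches in run(); SERIAL_EXECUTION is an assumed name. */
    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    /** What the executor does with an incoming trigger. */
    enum Action { ENQUEUE, REJECT, REPLACE_THREAD_THEN_ENQUEUE }

    /**
     * Decides how to treat a new trigger when a job thread already exists.
     *
     * @param strategy          block strategy sent by the scheduling center, may be null
     * @param runningOrHasQueue whether the existing thread is busy or has queued triggers
     */
    static Action decide(Strategy strategy, boolean runningOrHasQueue) {
        if (strategy == Strategy.DISCARD_LATER && runningOrHasQueue) {
            return Action.REJECT; // the new trigger is dropped with a failure result
        }
        if (strategy == Strategy.COVER_EARLY && runningOrHasQueue) {
            return Action.REPLACE_THREAD_THEN_ENQUEUE; // old thread is stopped, a new one takes over
        }
        return Action.ENQUEUE; // idle thread, serial strategy, or unknown strategy
    }

    public static void main(String[] args) {
        System.out.println(decide(Strategy.DISCARD_LATER, true));
        System.out.println(decide(Strategy.COVER_EARLY, true));
        System.out.println(decide(null, true));
    }
}
```

Note that both non-default strategies only take effect while the existing thread is busy. An idle thread always just receives the trigger in its queue.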

        Step 1: com.xxl.job.core.executor.XxlJobExecutor#loadJobHandler looks up the handler

        private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
        public static IJobHandler loadJobHandler(String name){
            return jobHandlerRepository.get(name);
        }
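The handler repository is just a name-to-handler map, which is why the static block in XxlJobConfig can register XXLClassJob by name and run can later find it. A minimal sketch of that idea follows. The Handler interface is a simplified stand-in for IJobHandler, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class HandlerRegistrySketch {

    /** Simplified stand-in for IJobHandler; the real type lives in xxl-job-core. */
    interface Handler {
        String execute(String param);
    }

    private static final ConcurrentMap<String, Handler> REPOSITORY = new ConcurrentHashMap<>();

    /** Registers a handler under a name and returns the handler it replaced, or null. */
    static Handler register(String name, Handler handler) {
        return REPOSITORY.put(name, handler);
    }

    /** Returns the handler registered under the name, or null when there is none. */
    static Handler load(String name) {
        return REPOSITORY.get(name);
    }

    /** Mirrors the BEAN branch of run(): a missing handler becomes a failure message. */
    static String trigger(String name, String param) {
        Handler handler = load(name);
        if (handler == null) {
            return "job handler [" + name + "] not found.";
        }
        return handler.execute(param);
    }

    public static void main(String[] args) {
        register("XXLClassJob", param -> "ran:" + param);
        System.out.println(trigger("XXLClassJob", "a"));
        System.out.println(trigger("missing", "a"));
    }
}
```

The name the scheduling center sends as the executor handler has to match the registered key exactly; otherwise the lookup returns null and the trigger fails.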

        Step 2: com.xxl.job.core.executor.XxlJobExecutor#registJobThread registers a job thread

        public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
            JobThread newJobThread = new JobThread(jobId, handler);
            newJobThread.start();
            logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
            JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);    // putIfAbsent | oh my god, map's put method return the old value!!!
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
            }
    
            return newJobThread;
        }
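registJobThread relies on the fact that Map.put returns the previous mapping: the new thread is stored first, and whatever was there before is then told to stop. The sketch below isolates that replace-and-stop pattern. Worker is a hypothetical stand-in for JobThread that only keeps the stop flag, and no real thread is started.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ReplaceWorkerSketch {

    /** Hypothetical stand-in for JobThread; only the cooperative stop flag is modeled. */
    static final class Worker {
        final String label;
        volatile boolean stopped;
        volatile String stopReason;

        Worker(String label) {
            this.label = label;
        }

        void toStop(String reason) {
            this.stopped = true;
            this.stopReason = reason;
        }
    }

    private static final ConcurrentMap<Integer, Worker> WORKERS = new ConcurrentHashMap<>();

    /** Stores a new worker for the job and stops the one it displaced, if any. */
    static Worker regist(int jobId, String label, String removeOldReason) {
        Worker fresh = new Worker(label);
        Worker old = WORKERS.put(jobId, fresh); // put returns the previous value or null
        if (old != null) {
            old.toStop(removeOldReason);
        }
        return fresh;
    }

    public static void main(String[] args) {
        Worker first = regist(1, "first", null);
        Worker second = regist(1, "second", "change jobhandler");
        System.out.println(first.stopped + " " + second.stopped);
    }
}
```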

        Step 3: com.xxl.job.core.thread.JobThread runs the job on its own thread (and eventually calls the execute method of the corresponding handler)

    package com.xxl.job.core.thread;
    
    import com.xxl.job.core.biz.model.HandleCallbackParam;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.biz.model.TriggerParam;
    import com.xxl.job.core.context.XxlJobContext;
    import com.xxl.job.core.context.XxlJobHelper;
    import com.xxl.job.core.executor.XxlJobExecutor;
    import com.xxl.job.core.handler.IJobHandler;
    import com.xxl.job.core.log.XxlJobFileAppender;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.Collections;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.*;
    
    
    /**
     * handler thread
     * @author xuxueli 2016-1-16 19:52:47
     */
    public class JobThread extends Thread{
        private static Logger logger = LoggerFactory.getLogger(JobThread.class);
    
        private int jobId;
        private IJobHandler handler;
        private LinkedBlockingQueue<TriggerParam> triggerQueue;
        private Set<Long> triggerLogIdSet;        // avoid repeat trigger for the same TRIGGER_LOG_ID
    
        private volatile boolean toStop = false;
        private String stopReason;
    
        private boolean running = false;    // if running job
        private int idleTimes = 0;            // idel times
    
    
        public JobThread(int jobId, IJobHandler handler) {
            this.jobId = jobId;
            this.handler = handler;
            this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
            this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
        }
        public IJobHandler getHandler() {
            return handler;
        }
    
        /**
         * new trigger to queue
         *
         * @param triggerParam
         * @return
         */
        public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
            // avoid repeat
            if (triggerLogIdSet.contains(triggerParam.getLogId())) {
                logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
                return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
            }
    
            triggerLogIdSet.add(triggerParam.getLogId());
            triggerQueue.add(triggerParam);
            return ReturnT.SUCCESS;
        }
    
        /**
         * kill job thread
         *
         * @param stopReason
         */
        public void toStop(String stopReason) {
            /**
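             * Thread.interrupt only breaks a thread out of a blocking state (wait, join, sleep)
             * by raising InterruptedException at the blocking point; it does not terminate a running thread.
             * To stop this thread for good, a shared flag is therefore required.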
             */
            this.toStop = true;
            this.stopReason = stopReason;
        }
    
        /**
         * is running job
         * @return
         */
        public boolean isRunningOrHasQueue() {
            return running || triggerQueue.size()>0;
        }
    
        @Override
        public void run() {
    
            // init
            try {
                handler.init();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            // execute
            while(!toStop){
                running = false;
                idleTimes++;
    
                TriggerParam triggerParam = null;
                try {
                    // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
                    triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                    if (triggerParam!=null) {
                        running = true;
                        idleTimes = 0;
                        triggerLogIdSet.remove(triggerParam.getLogId());
    
                        // log filename, like "logPath/yyyy-MM-dd/9999.log"
                        String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                        XxlJobContext xxlJobContext = new XxlJobContext(
                                triggerParam.getJobId(),
                                triggerParam.getExecutorParams(),
                                logFileName,
                                triggerParam.getBroadcastIndex(),
                                triggerParam.getBroadcastTotal());
    
                        // init job context
                        XxlJobContext.setXxlJobContext(xxlJobContext);
    
                        // execute
                        XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
    
                        if (triggerParam.getExecutorTimeout() > 0) {
                            // limit timeout
                            Thread futureThread = null;
                            try {
                                FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                    @Override
                                    public Boolean call() throws Exception {
    
                                        // init job context
                                        XxlJobContext.setXxlJobContext(xxlJobContext);
    
                                        handler.execute();
                                        return true;
                                    }
                                });
                                futureThread = new Thread(futureTask);
                                futureThread.start();
    
                                Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                            } catch (TimeoutException e) {
    
                                XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                                XxlJobHelper.log(e);
    
                                // handle result
                                XxlJobHelper.handleTimeout("job execute timeout ");
                            } finally {
                                futureThread.interrupt();
                            }
                        } else {
                            // just execute
                            handler.execute();
                        }
    
                        // valid execute handle data
                        if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                            XxlJobHelper.handleFail("job handle result lost.");
                        } else {
                            String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                            tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                                    ?tempHandleMsg.substring(0, 50000).concat("...")
                                    :tempHandleMsg;
                            XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                        }
                        XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                                + XxlJobContext.getXxlJobContext().getHandleCode()
                                + ", handleMsg = "
                                + XxlJobContext.getXxlJobContext().getHandleMsg()
                        );
    
                    } else {
                        if (idleTimes > 30) {
                            if(triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lost
                                XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                            }
                        }
                    }
                } catch (Throwable e) {
                    if (toStop) {
                        XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                    }
    
                    // handle result
                    StringWriter stringWriter = new StringWriter();
                    e.printStackTrace(new PrintWriter(stringWriter));
                    String errorMsg = stringWriter.toString();
    
                    XxlJobHelper.handleFail(errorMsg);
    
                    XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
                } finally {
                    if(triggerParam != null) {
                        // callback handler info
                        if (!toStop) {
                            // commonm
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                    triggerParam.getLogId(),
                                    triggerParam.getLogDateTime(),
                                    XxlJobContext.getXxlJobContext().getHandleCode(),
                                    XxlJobContext.getXxlJobContext().getHandleMsg() )
                            );
                        } else {
                            // is killed
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                    triggerParam.getLogId(),
                                    triggerParam.getLogDateTime(),
                                    XxlJobContext.HANDLE_COCE_FAIL,
                                    stopReason + " [job running, killed]" )
                            );
                        }
                    }
                }
            }
    
            // callback trigger request in queue
            while(triggerQueue !=null && triggerQueue.size()>0){
                TriggerParam triggerParam = triggerQueue.poll();
                if (triggerParam!=null) {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_COCE_FAIL,
                            stopReason + " [job not executed, in the job queue, killed.]")
                    );
                }
            }
    
            // destroy
            try {
                handler.destroy();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
        }
    }
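The timeout branch in JobThread#run wraps handler.execute() in a FutureTask, runs it on a helper thread and waits with a bounded get. The sketch below isolates that technique. The class and method names are hypothetical, the job is a plain Callable rather than an IJobHandler, and the timeout is in milliseconds instead of the seconds used in the listing.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    /** Runs the job on a helper thread and waits at most timeoutMillis for its result. */
    static String runWithTimeout(Callable<String> job, long timeoutMillis) {
        FutureTask<String> task = new FutureTask<>(job);
        Thread helper = new Thread(task, "timeout-sketch");
        helper.setDaemon(true); // do not keep the JVM alive for an abandoned job
        helper.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            // Only wakes a job blocked in sleep/wait/join; a busy loop would keep running.
            helper.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> "done", 1000));
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(5000);
            return "late";
        }, 100));
    }
}
```

As the comment in toStop points out, interrupt cannot terminate code that never blocks, so a handler that ignores interruption may continue in the background after the timeout has been reported.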

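    Summary:

    (1) After startup, the xxl-job client registers an XxlJobSpringExecutor bean. Once Spring has finished initializing, it starts a Netty server that listens on the configured port over HTTP.
    (2) When the scheduling center dispatches a job, it uses Java's HttpURLConnection to connect to that Netty server and send the data. The connection is closed once the transfer completes.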

  • Original article: https://www.cnblogs.com/qlqwjy/p/15506173.html