zoukankan      html  css  js  c++  java
  • xxljob学习3:服务端一次调度

    在服务端,手动执行一次调用

    承载请求地址:com.xxl.job.admin.controller.JobInfoController#triggerJob方法,顺着往下

    public static void trigger(int jobId,
                                   TriggerTypeEnum triggerType,
                                   int failRetryCount,
                                   String executorShardingParam,
                                   String executorParam,
                                   String addressList) {
    
            // 1:获取执行任务信息
            XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
            //2:获取执行器
            XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    
            // cover addressList
            if (addressList!=null && addressList.trim().length()>0) {
                group.setAddressType(1);
                group.setAddressList(addressList.trim());
            }
    
            // sharding param
            int[] shardingParam = null;
            if (executorShardingParam!=null){
                String[] shardingArr = executorShardingParam.split("/");
                if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                    shardingParam = new int[2];
                    shardingParam[0] = Integer.valueOf(shardingArr[0]);
                    shardingParam[1] = Integer.valueOf(shardingArr[1]);
                }
            }
    //分片
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } }

    在获取到任务信息和执行器信息之后,通过路由策略寻址,进行远程调度

    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  
           //1:路由策略 route strategy
            ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    
            String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
    
            // save log-id
    
     
            // init trigger-param
            
            
            // 2:寻址
            String address = null;
            ReturnT<String> routeAddressResult = null;
            if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                    if (index < group.getRegistryList().size()) {
                        address = group.getRegistryList().get(index);
                    } else {
                        address = group.getRegistryList().get(0);
                    }
                } else {
                    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                    if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                        address = routeAddressResult.getContent();
                    }
                }
            } else {
                routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
            }
    
            // 3、远程调度
            ReturnT<String> triggerResult = null;
            if (address != null) {
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
            }
    
            // collection trigger info
            
            // save log trigger-info
           XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    
        }

    路由的实现类有多个,“第一个”实现:

    public class ExecutorRouteFirst extends ExecutorRouter {
    
        @Override
        public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
            return new ReturnT<String>(addressList.get(0));
        }
    
    }

    整个debug信息,默认传参如下:

    获取地址后,进行远程调度

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
            ReturnT<String> runResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                runResult = executorBiz.run(triggerParam);
            } catch (Exception e) {
               
            }
            return runResult;
        }
    XxlJobScheduler.getExecutorBiz通过address获取对应的执行器客户端,最终通过com.xxl.job.core.biz.client.ExecutorBizClient#run发送了post请求进行远程调度,通过channel进行通信
    com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    
                // request parse
                //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
                String requestData = msg.content().toString(CharsetUtil.UTF_8);
                String uri = msg.uri();
                HttpMethod httpMethod = msg.method();
                boolean keepAlive = HttpUtil.isKeepAlive(msg);
                String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    
                // invoke
                bizThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // do invoke,业务调用
                        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
    
                        // to json
                        String responseJson = GsonTool.toJson(responseObj);
    
                        // write response
                        writeResponse(ctx, keepAlive, responseJson);
                    }
                });
            }

    在process方法中,通过URI匹配到要执行的方法调用com.xxl.job.core.biz.impl.ExecutorBizImpl#run

        public ReturnT<String> run(TriggerParam triggerParam) {
            // load old:jobHandler + jobThread
            JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
           //1:获取job任务的执行方法,此处实现是MethodJobHandle
            IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
            String removeOldReason = null;
    
            // valid:jobHandler + jobThread
            GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
            if (GlueTypeEnum.BEAN == glueTypeEnum) {
    
                // 2:同1
                IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
    
                // valid old jobThread
                if (jobThread!=null && jobHandler != newJobHandler) {
                    // change handler, need kill old thread
                    removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = newJobHandler;
                    if (jobHandler == null) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                    }
                }
    
            } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) 
          {
    
          } else {}
           // executor block strategy
            if (jobThread != null) {
                //2:获取对应的阻塞策略,"顺序执行"
                ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
                
            }
    
            // 3:把构建好的要执行的job赛道queue中,进行解耦调度
            ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
            return pushResult;
        }

    通过上述1,2,3把最终要执行的job对象放到调度队列

    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
            // avoid repeat
            if (triggerLogIdSet.contains(triggerParam.getLogId())) {
                logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
                return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
            }
    
            triggerLogIdSet.add(triggerParam.getLogId());
            triggerQueue.add(triggerParam);
            return ReturnT.SUCCESS;
        }

    通过com.xxl.job.core.thread.JobThread#run方法,从队列取任务执行

    @Override
        public void run() {
    
            // init
            try {
                handler.init();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            // execute
            while(!toStop){
    
                        try {
                                FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                    @Override
                                    public Boolean call() throws Exception {
    
                                        // init job context
                                        XxlJobContext.setXxlJobContext(xxlJobContext);
                                        //method handler执行最终的结果
                                        handler.execute();
                                        return true;
                                    }
                                });
                                futureThread = new Thread(futureTask);
                                futureThread.start();
    
                                Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                            } catch (TimeoutException e) {
    
                   }

    通过反射,调用到最终的client端对应的方法执行任务

  • 相关阅读:
    I.MX6 Surfaceflinger 机制
    理解 Android Fragment
    RPi 2B DDNS 动态域名
    RPi 2B IPC webcam server
    理解 Android MVP 开发模式
    I.MX6 system.img unpack repack
    can't set android permissions
    VMware Ubuntu 共享文件夹
    解决oracle数据库连接不上的问题
    perfect-scrollbar示例
  • 原文地址:https://www.cnblogs.com/at20191018/p/15783714.html
Copyright © 2011-2022 走看看