zoukankan      html  css  js  c++  java
  • xxl-job源码分析

    1 调度中心API服务
    1、任务结果回调服务;
    2、执行器注册服务;
    3、执行器注册摘除服务;
    4、触发任务单次执行服务,支持任务根据业务事件触发;
    API暴露代码:com.xxl.job.admin.controller.JobApiController.java
    API服务位置:com.xxl.job.core.biz.AdminBiz.java
    通过请求参数匹配对应方法反射执行后把结果返回给客户端,见代码com.xxl.rpc.remoting.provider.XxlRpcProviderFactoryinvokeService方法invokeService
     
    2 任务注册/执行器注册
    任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;注册表: 见"XXL_JOB_QRTZ_TRIGGER_REGISTRY"表。"执行器"注册代码见com.xxl.job.core.thread.ExecutorRegistryThread方法start,在"执行器"启动时通过远程调用com.xxl.job.core.biz.AdminBiz方法注册。"执行器" 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表; 执行器注册: 任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间被三倍Beat; 执行器注册摘除:执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性。见代码com.xxl.job.admin.core.thread.JobRegistryMonitorHelper
     
    3 执行器API服务

    执行器提供了API服务,供调度中心选择使用,目前提供的API服务有:

    1、心跳检测:调度中心使用
    2、忙碌检测:调度中心使用
    3、触发任务执行:调度中心使用;本地进行任务开发时,可使用该API服务模拟触发任务;
    4、获取Rolling Log:调度中心使用
    5、终止任务:调度中心使用

    API服务位置:com.xxl.job.core.biz.ExecutorBiz
    API服务请求参考代码:com.xxl.job.executor.ExecutorBizTest

    API暴露代码片段

    com.xxl.job.core.executor.XxlJobExecutor.initRpcProvider()
            // add services
            xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

    API远程调用代码,jetty启动注册一个handler,handler含有API远程调用处理逻辑

    com.xxl.rpc.remoting.net.impl.jetty.server.JettyServer.start()        
    JettyServer.this.server.setConnectors(new Connector[]{connector}); HandlerCollection handlerc = new HandlerCollection(); handlerc.setHandlers(new Handler[]{new JettyServerHandler(xxlRpcProviderFactory)}); JettyServer.this.server.setHandler(handlerc);
    com.xxl.rpc.remoting.net.impl.jetty.server.JettyServerHandler.handle()
          XxlRpcResponse xxlRpcResponse = this.xxlRpcProviderFactory.invokeService(xxlRpcRequest);
          byte[] responseBytes = this.xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
          this.writeResponse(baseRequest, response, responseBytes);
    com.xxl.rpc.remoting.provider.XxlRpcProviderFactory.invokeService()
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = this.serviceData.get(serviceKey);
        if (serviceBean == null) {
          xxlRpcResponse.setErrorMsg("The serviceKey[" + serviceKey + "] not found.");
          return xxlRpcResponse;
        } else if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 180000L) {
          xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
          return xxlRpcResponse;
        } else if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
          xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
          return xxlRpcResponse;
        } else {
          try {
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();
            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);
            xxlRpcResponse.setResult(result);
          } catch (Throwable var11) {
            logger.error("xxl-rpc provider invokeService error.", var11);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var11));
          }
    
          return xxlRpcResponse;
        }


     
     
    参考资料
  • 相关阅读:
    gc buffer busy/gcs log flush sync与log file sync
    给Oracle年轻的初学者的几点建议
    Android 编程下帧动画在 Activity 启动时自动运行的几种方式
    Android 编程下 Touch 事件的分发和消费机制
    Java 编程下 static 关键字
    Java 编程下 final 关键字
    Android 编程下模拟 HOME 键效果
    Why Are Thread.stop, Thread.suspend, Thread.resume and Runtime.runFinalizersOnExit Deprecated ?
    Extjs4 大型项目目录结构重构
    [转]SQLServer 2008 允许远程连接的配置方法
  • 原文地址:https://www.cnblogs.com/birdstudio/p/10246351.html
Copyright © 2011-2022 走看看