zoukankan      html  css  js  c++  java
  • xxljob学习2:用户端注册

    用户端注册主要是执行器和任务注册:对应接入的业务系统

    在服务启动时时,XxlJobSpringExecutor继承了SmartInitializingSingleton,在bean初始化完成之后,调用afterSingletonsInstantiated方法

    @Override
    public void afterSingletonsInstantiated() {
    //1:初始化job任务
    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
    //2:注册执行器
    super.start();
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    }

    1):初始化job任务

     1 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
     2         if (applicationContext == null) {
     3             return;
     4         }
     5         // init job handler from method
     6         String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
     7         for (String beanDefinitionName : beanDefinitionNames) {
     8             Object bean = applicationContext.getBean(beanDefinitionName);
     9 
    10             Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
    11             try {
    12                 annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
    13                         new MethodIntrospector.MetadataLookup<XxlJob>() {
    14                             @Override
    15                             public XxlJob inspect(Method method) {
    16                                 return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
    17                             }
    18                         });
    19             } catch (Throwable ex) {
    20                 logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
    21             }
    22             if (annotatedMethods==null || annotatedMethods.isEmpty()) {
    23                 continue;
    24             }
    25 
    26             for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
    27                 Method executeMethod = methodXxlJobEntry.getKey();
    28                 XxlJob xxlJob = methodXxlJobEntry.getValue();
    29                 // regist 注册
    30                 registJobHandler(xxlJob, bean, executeMethod);
    31             }
    32         }
    33     }

    通过上下文applicationContext获取打了XxlJob注解的方法,并进行注册

    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
            logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
            return jobHandlerRepository.put(name, jobHandler);
        }

    最终保存到ConcurrentMap<String, IJobHandler>,key即为注解的value值 @XxlJob("demoJobHandler")。在执行任务的时候,根据填写的JobHandler找到对应的方法执行

    2):注册执行器

    在初始化调度任务之后,调用XxlJobExecutor的start()方法进行执行器的注册
    public void start() throws Exception {
    
            // init logpath
            XxlJobFileAppender.initLogPath(logPath);
    
    
            // 1:admin地址
            initAdminBizList(adminAddresses, accessToken);
    
    
            // init JobLogFileCleanThread
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
    
            // init TriggerCallbackThread
            TriggerCallbackThread.getInstance().start();
    
            // 2:通过netty,注册执行器监听端口
            initEmbedServer(address, ip, port, appname, accessToken);
        }

    第一步:登记admin的部署地址,以便接收注册请求,塞到AdminBizCilent对象供注册执行器使用;如下address:

    配置文件为application.properties,可在服务中自由配置,其中

    xxl.job.admin.addresses:服务端地址,注册任务对应执行器并持久化
    xxl.job.executor.ip:执行器地址
    xxl.job.executor.port=9999 执行器监听端口
    # web port
    server.port=8081
    # no web
    #spring.main.web-environment=false
    
    # log config
    logging.config=classpath:logback.xml
    
    
    ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    
    ### xxl-job, access token
    xxl.job.accessToken=
    
    ### xxl-job executor appname
    xxl.job.executor.appname=xxl-job-executor-sample
    ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
    xxl.job.executor.address=
    ### xxl-job executor server-info
    xxl.job.executor.ip=
    xxl.job.executor.port=9999
    ### xxl-job executor log-path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job executor log-retention-days
    xxl.job.executor.logretentiondays=30

     第二部:注册执行器

    调用com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer方法,获取服务器ip后传入com.xxl.job.core.server.EmbedServer#start方法

    public void start(final String address, final int port, final String appname, final String accessToken) {
            executorBiz = new ExecutorBizImpl();
            thread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // param
                    EventLoopGroup bossGroup = new NioEventLoopGroup();
                    EventLoopGroup workerGroup = new NioEventLoopGroup();
                    ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                            0,
                            200,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(2000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                                }
                            });
    
    
                    try {
                        // start server
                        ServerBootstrap bootstrap = new ServerBootstrap();
                        bootstrap.group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel channel) throws Exception {
                                        channel.pipeline()
                                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                                .addLast(new HttpServerCodec())
                                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                                 //执行的业务handler
                                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                    }
                                })
                                .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                        // bind
                        ChannelFuture future = bootstrap.bind(port).sync();
    
                        // start registry
                        startRegistry(appname, address);

    通过netty进行http业务交互,并通过EmbedHttpServerHandler.channelRead0方法接收服务端调度任务请求,执行器IP(取服务器IP)和端口(默认9999)。

    调用到admin的部署地址(http://127.0.0.1:8080/xxl-job-admin),名称为配置文件appName,如下:

    发送一个注册的请求到admin

    @Override
        public ReturnT<String> registry(RegistryParam registryParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
        }

    最终请求打到admin模块的com.xxl.job.admin.controller.JobApiController#api方法,收到注册信息:

     保存一条执行器注册信息到mysql。

    至此调度任务和执行器注册完成

  • 相关阅读:
    软件工程导论P53,习题2.4
    视图和数据表的区别
    无法从“object”转换为“string”
    Oracle 密码重置
    Struts2 上传下载
    Spring 事务管理
    JSP 指令和动作
    JS 禁用回车、后退事件、form 表单不可编辑
    关于 in 和 exist 的理解
    Oracle clob 操作函数
  • 原文地址:https://www.cnblogs.com/at20191018/p/15774198.html
Copyright © 2011-2022 走看看