zoukankan      html  css  js  c++  java
  • xxl-job滥用netty导致的问题和解决方案

    netty作为一种高性能的网络编程框架,在很多开源项目中大放异彩,十分亮眼,但是在有些项目中却被滥用,导致使用者使用起来非常的难受。

    本篇文章将会讲解xxl-job作为一款分布式任务调度系统是如何滥用netty的,导致了怎样的后果以及如何修改源码解决这些问题。

    笔者使用的是2.3.0版本的xxl-job,也是当前的最新版本;下面所有的代码修改全部基于2.3.0版本的xxl-job源代码

    https://github.com/xuxueli/xxl-job/tree/2.3.0

    其中,xxl-job-admin对应着项目:https://github.com/xuxueli/xxl-job/tree/2.3.0/xxl-job-admin

    spring-boot项目对应着示例项目:https://github.com/xuxueli/xxl-job/tree/master/xxl-job-executor-samples/xxl-job-executor-sample-springboot

    一、xxl-job存在的多端口问题

    关于xxl-job如何使用的问题,可以参考我的另外一篇文章:分布式任务调度系统:xxl-job

    现在java开发基本上已经离不开spring boot了吧,我在spring boot中集成了xxl-job-core组件并且已经能够正常使用,但是一旦部署到测试环境就不行了,这是因为测试环境使用了docker,spring boot集成xxl-job-core组件之后会额外开启9999端口号给xxl-job-admin调用使用,如果docker不开启宿主机到docker的端口映射,xxl-job-admin自然就会调用失败。这导致了以下问题:

    • 每个spring boot程序都要开两个端口号,意味着同时运行着两个服务进行端口监听,浪费计算和内存资源
    • 如果使用docker部署,需要再额外做宿主机和容器的9999端口号的映射,否则外部的xxl-job-admin将无法访问。

    那如果两个不同的服务都集成了xxl-job,但是部署在同一台机器上,又会发生什么呢?答案是如果不指定特定端口号,两个服务肯定都要使用9999端口号,势必会端口冲突,但是xxl-job已经想到了9999端口号被占用的情况,如果9999端口号被占用,则会端口号加一再重试。

    xxl-job-core组件额外开启9999端口号到底合不合理?

    举个例子:spring boot程序集成swagger-ui是很常见的操作吧,也没见swagger-ui再额外开启端口号啊,我认为是不合理的。但是,我认为作者这样做也有他的考虑---并非所有程序都是spring-boot的程序,也有使用其它框架的程序,使用独立的netty server作为客户端能够保证在使用java的任意xxl-job客户端都能稳定的向xxl-job-admin提供服务。然而java开发者们绝大多数情况下都是使用spirng-boot构建程序,在这种情况下,作者偷懒没有构建专门在spirng boot框架下使用的xxl-job-core,而是想了个类似万金油的蠢招解决问题,让所有在spring-boot框架下的开发者都一起难受,实在是令人费解。

    二、源码追踪

    一切的起点要从spring-boot程序集成xxl-job-core说起,集成方式很简单,只需要成功创建一个XxlJobSpringExecutor Bean对象即可。

        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    

    XxlJobSpringExecutor对象创建完成之后会做一些xxl-job初始化的操作,包含连接xxl-job-admin以及启动netty server。

    展开XxlJobSpringExecutor源码,可以看到它实现了SmartInitializingSingleton接口,这就意味着Bean对象创建完成之后会回调afterSingletonsInstantiated接口

    // start
        @Override
        public void afterSingletonsInstantiated() {
    
            // init JobHandler Repository
            /*initJobHandlerRepository(applicationContext);*/
    
            // init JobHandler Repository (for method)
            initJobHandlerMethodRepository(applicationContext);
    
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
    
            // super start
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    

    super.start();这行代码中,会调用父类XxlJobExecutor的start方法做初始化

    public void start() throws Exception {
    
            // init logpath
            XxlJobFileAppender.initLogPath(logPath);
    
            // init invoker, admin-client
            initAdminBizList(adminAddresses, accessToken);
    
    
            // init JobLogFileCleanThread
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
    
            // init TriggerCallbackThread
            TriggerCallbackThread.getInstance().start();
    
            // init executor-server
            initEmbedServer(address, ip, port, appname, accessToken);
        }
    

    initEmbedServer(address, ip, port, appname, accessToken);这行代码做开启netty-server的操作

        private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    
            // fill ip port
            port = port>0?port: NetUtil.findAvailablePort(9999);
            ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    
            // generate address
            if (address==null || address.trim().length()==0) {
                String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
                address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
            }
    
            // accessToken
            if (accessToken==null || accessToken.trim().length()==0) {
                logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
            }
    
            // start
            embedServer = new EmbedServer();
            embedServer.start(address, port, appname, accessToken);
        }
    

    可以看到这里会创建EmbedServer对象,并且使用start方法开启netty-server,在这里就能看到熟悉的一大坨了

    image-20210507161810178

    除了开启读写空闲检测之外,就只做了一件事:开启http服务,也就是说,xxl-job-admin是通过http请求调用客户端的接口触发客户端的任务调度的。最终处理方法在EmbedHttpServerHandler类中,顺着EmbedHttpServerHandler类的方法找,可以最终找到处理的方法com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process

    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // valid
        if (HttpMethod.POST != httpMethod) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri==null || uri.trim().length()==0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
    
        // services mapping
        try {
            if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }
    

    从这段代码的逻辑可以看到

    • 只接受POST请求
    • 如果有token,则会校验token
    • 只提供/beat、/idelBeat、/run、/kill、/log 五个接口,所有请求的处理都会委托给executorBiz处理。

    最后,netty将executorBiz处理结果写回xxl-job-admin,然后请求就结束了。这里netty扮演的角色非常简单,我认为可以使用spring-mvc非常容易的替换掉它的功能。

    三、使用spring-mvc替换netty的功能

    1.新增spring-mvc代码

    这里要修改xxl-job-core的源代码,首先,加入spring-mvc的依赖

    		<!-- spring-web -->
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-web</artifactId>
    			<version>${spring.version}</version>
    			<scope>provided</scope>
    		</dependency>
    

    然后新增Controller文件

    package com.xxl.job.core.controller;
    
    import com.xxl.job.core.biz.impl.ExecutorBizImpl;
    import com.xxl.job.core.biz.model.*;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author kdyzm
     * @date 2021/5/7
     */
    @RestController
    public class XxlJobController {
    
        @PostMapping("/beat")
        public ReturnT<String> beat() {
            return new ExecutorBizImpl().beat();
        }
    
        @PostMapping("/idleBeat")
        public ReturnT<String> idleBeat(@RequestBody IdleBeatParam param) {
            return new ExecutorBizImpl().idleBeat(param);
        }
    
        @PostMapping("/run")
        public ReturnT<String> run(@RequestBody TriggerParam param) {
            return new ExecutorBizImpl().run(param);
        }
    
        @PostMapping("/kill")
        public ReturnT<String> kill(@RequestBody KillParam param) {
            return new ExecutorBizImpl().kill(param);
        }
    
        @PostMapping("/log")
        public ReturnT<LogResult> log(@RequestBody LogParam param) {
            return new ExecutorBizImpl().log(param);
        }
    }
    
    

    2.删除老代码&移除netty依赖

    之后,就要删除老的代码了,修改com.xxl.job.core.server.EmbedServer#start方法,清空所有代码,新增

    // start registry
    startRegistry(appname, address);
    

    然后删除EmbedServer类中的以下两个变量及相关的引用

        private ExecutorBiz executorBiz;
        private Thread thread;
    

    之后删除netty的依赖

    		<!-- ********************** embed server: netty + gson ********************** -->
    		<dependency>
    			<groupId>io.netty</groupId>
    			<artifactId>netty-all</artifactId>
    			<version>${netty-all.version}</version>
    		</dependency>
    

    将报错的代码全部删除,之后就可以编译成功了,当然这还不行。

    3.修改注册到xxl-job-admin的端口号

    注册的ip地址可以不用改,但是端口号要取spring-boot程序的端口号。

    因为要复用springk-boot容器的端口号,所以这里注册的端口号要和它保持一致,修改com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer方法,注释掉

    port = port > 0 ? port : NetUtil.findAvailablePort(9999);
    

    然后修改spring-boot的配置文件,xxl-job的端口号配置改成server.port

    server.port=8081
    xxl.job.executor.port=${server.port}
    

    在创建XxlJobSpringExecutor Bean对象的时候将改值传递给它。

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
    

    4.将xxl-job-core改造成spring-boot-starter

    上面改造完了之后已经将逻辑变更为使用spring-mvc,但是spring-boot程序还没有办法扫描到xxl-job-core中的controller,可以手动扫描包,这里推荐使用spring-boot-starter,这样只需要将xxl-job-core加入classpath,就可以自动生效。

    在 com.xxl.job.core.config包下新建Config类

    package com.xxl.job.core.config;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author kdyzm
     * @date 2021/5/7
     */
    @Configuration
    @ComponentScan(basePackages = {"com.xxl.job.core.controller"})
    public class Config {
    }
    

    src/main/resources/META-INF文件夹下新建spring.factories文件,文件内容如下

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=
    com.xxl.job.core.config.Config
    

    5.增加特殊前缀匹配

    上面修改之后将使用spring mvc接口替代原netty功能提供的http接口,但是暴露出的接口是/run、/beat、/kill这种有可能和宿主服务路径冲突的接口,为了防止出现路径冲突,做出以下修改

    修改com.xxl.job.core.controller.XxlJobController类,添加@RequestMapping("/xxl-job")

    @RestController
    @RequestMapping("/xxl-job")
    public class XxlJobController {
    	...
    }
    

    修改com.xxl.job.core.biz.client.ExecutorBizClient类,为每个请求添加/xxl-job前缀

    package com.xxl.job.core.biz.client;
    
    import com.xxl.job.core.biz.ExecutorBiz;
    import com.xxl.job.core.biz.model.*;
    import com.xxl.job.core.util.XxlJobRemotingUtil;
    
    /**
     * admin api test
     *
     * @author xuxueli 2017-07-28 22:14:52
     */
    public class ExecutorBizClient implements ExecutorBiz {
    
        public ExecutorBizClient() {
        }
        public ExecutorBizClient(String addressUrl, String accessToken) {
            this.addressUrl = addressUrl;
            this.accessToken = accessToken;
    
            // valid
            if (!this.addressUrl.endsWith("/")) {
                this.addressUrl = this.addressUrl + "/";
            }
        }
    
        private String addressUrl ;
        private String accessToken;
        private int timeout = 3;
    
    
        @Override
        public ReturnT<String> beat() {
            return XxlJobRemotingUtil.postBody(addressUrl+"xxl-job/beat", accessToken, timeout, "", String.class);
        }
    
        @Override
        public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
            return XxlJobRemotingUtil.postBody(addressUrl+"xxl-job/idleBeat", accessToken, timeout, idleBeatParam, String.class);
        }
    
        @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/run", accessToken, timeout, triggerParam, String.class);
        }
    
        @Override
        public ReturnT<String> kill(KillParam killParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/kill", accessToken, timeout, killParam, String.class);
        }
    
        @Override
        public ReturnT<LogResult> log(LogParam logParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/log", accessToken, timeout, logParam, LogResult.class);
        }
    
    }
    

    这样,就全部修改完了。

    四、测试

    重启xxl-job-executor-sample-springboot项目,查看注册到xxl-job-admin上的信息

    image-20210507173458329

    可以看到端口号已经不是默认的9999,而是和spring-boot程序保持一致的端口号,然后执行默认的job

    image-20210507173617917

    可以看到已经执行成功,在查看日志详情

    image-20210507173651632

    日志也一切正常,表示一切都改造成功了。

    完整的代码修改:https://github.com/kdyzm/xxl-job/commit/449ee5c7bbb659356af25b164c251f960b9a6891

    五、实际使用

    由于原作者基本上不理睬人,我克隆了项目2.3.0版本并且新增了2.4.1版本:https://github.com/kdyzm/xxl-job/releases/tag/2.4.1

    有需要的可以下载源代码自己打包xxl-job-core项目上传私服后就可以使用了

  • 相关阅读:
    抽象工厂模式
    工厂方法模式
    简单工厂模式
    多例模式
    深入分析 Java 中的中文编码问题
    PipedInputStream和PipedOutputStream详解
    单例模式
    Java IO和NIO文章目录
    wsdlLocation可以写成项目的相对路劲吗
    ssh框架配置事务管理器
  • 原文地址:https://www.cnblogs.com/kuangdaoyizhimei/p/14742067.html
Copyright © 2011-2022 走看看