zoukankan      html  css  js  c++  java
  • xxljob源码(二)客户端源码

      简单了解下xxl-job 客户端启动过程相关操作。

     1. 客户端搭建过程

    1. pom 增加

            <!-- xxl-job-core -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>2.2.0</version>
            </dependency>

    2. properties 配置文件增加:

    ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    ### xxl-job, access token
    xxl.job.accessToken=
    ### xxl-job executor appname
    xxl.job.executor.appname=xxl-job-executor-test
    ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
    xxl.job.executor.address=
    ### xxl-job executor server-info
    xxl.job.executor.ip=
    xxl.job.executor.port=9999
    ### xxl-job executor log-path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job executor log-retention-days
    xxl.job.executor.logretentiondays=30

    3. 增加配置类:

    package cn.qz.cloud.config;
    
    import cn.qz.cloud.job.XXLClassJob;
    import com.xxl.job.core.executor.XxlJobExecutor;
    import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        static {
            // 手动通过如下方式注入到执行器容器。
            XxlJobExecutor.registJobHandler("XXLClassJob", new XXLClassJob());
        }
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    }

    4. 增加job

    (1) 第一个:

    package cn.qz.cloud.job;
    
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.handler.annotation.XxlJob;
    import com.xxl.job.core.log.XxlJobLogger;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class FirstJob {
    
        @XxlJob(value = "firstJob", init = "init", destroy = "destroy")
        public ReturnT<String> execute(String param) {
            XxlJobLogger.log("XXL-JOB, firstJob. param: {}", param);
            log.info("XXL-JOB, firstJob. param: {}", param);
            return ReturnT.SUCCESS;
        }
    
        public void init() {
            log.info("init");
        }
    
        public void destroy() {
            log.info("destory");
        }
    }

    (2) 第二个

    package cn.qz.cloud.job;
    
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.handler.IJobHandler;
    import com.xxl.job.core.log.XxlJobLogger;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.TimeUnit;
    @Slf4j
    public class XXLClassJob extends IJobHandler {
    
        @Override
        public ReturnT<String> execute(String param) throws Exception {
            log.info("XXLClassJob execute, param: {}", param);
            XxlJobLogger.log("XXLClassJob execute, param: {}", param);
    
            for (int i = 0; i < 5; i++) {
                log.info("XXLClassJob start, i: {} ", i);
                XxlJobLogger.log("XXLClassJob start, i: {} ", i);
                TimeUnit.SECONDS.sleep(2);
            }
            return ReturnT.SUCCESS;
        }
    }

    5. Springboot 项目启动即可

    2. 启动原理研究

      xxl-job 启动核心是在com.xxl.job.core.executor.impl.XxlJobSpringExecutor 类。所以以这个类入口进行查看

    1. com.xxl.job.core.executor.impl.XxlJobSpringExecutor 源码:

    package com.xxl.job.core.executor.impl;
    
    import com.xxl.job.core.executor.XxlJobExecutor;
    import com.xxl.job.core.glue.GlueFactory;
    import com.xxl.job.core.handler.annotation.XxlJob;
    import com.xxl.job.core.handler.impl.MethodJobHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.SmartInitializingSingleton;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.core.MethodIntrospector;
    import org.springframework.core.annotation.AnnotatedElementUtils;
    
    import java.lang.reflect.Method;
    import java.util.Map;
    
    
    /**
     * xxl-job executor (for spring)
     *
     * @author xuxueli 2018-11-01 09:24:52
     */
    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
    
    
        // start
        @Override
        public void afterSingletonsInstantiated() {
    
            // init JobHandler Repository
            /*initJobHandlerRepository(applicationContext);*/
    
            // init JobHandler Repository (for method)
            initJobHandlerMethodRepository(applicationContext);
    
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
    
            // super start
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        // destroy
        @Override
        public void destroy() {
            super.destroy();
        }
    
    
        /*private void initJobHandlerRepository(ApplicationContext applicationContext) {
            if (applicationContext == null) {
                return;
            }
    
            // init job handler action
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
    
            if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    if (serviceBean instanceof IJobHandler) {
                        String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                        IJobHandler handler = (IJobHandler) serviceBean;
                        if (loadJobHandler(name) != null) {
                            throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                        }
                        registJobHandler(name, handler);
                    }
                }
            }
        }*/
    
        private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
            if (applicationContext == null) {
                return;
            }
            // init job handler from method
            String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
            for (String beanDefinitionName : beanDefinitionNames) {
                Object bean = applicationContext.getBean(beanDefinitionName);
    
                Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
                try {
                    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                            new MethodIntrospector.MetadataLookup<XxlJob>() {
                                @Override
                                public XxlJob inspect(Method method) {
                                    return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                                }
                            });
                } catch (Throwable ex) {
                    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
                }
                if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                    continue;
                }
    
                for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                    Method executeMethod = methodXxlJobEntry.getKey();
                    XxlJob xxlJob = methodXxlJobEntry.getValue();
                    if (xxlJob == null) {
                        continue;
                    }
    
                    String name = xxlJob.value();
                    if (name.trim().length() == 0) {
                        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                    }
    
                    // execute method
                    /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                        throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                    }
                    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                        throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                    }*/
    
                    executeMethod.setAccessible(true);
    
                    // init and destory
                    Method initMethod = null;
                    Method destroyMethod = null;
    
                    if (xxlJob.init().trim().length() > 0) {
                        try {
                            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                            initMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                        }
                    }
                    if (xxlJob.destroy().trim().length() > 0) {
                        try {
                            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                            destroyMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                        }
                    }
    
                    // registry jobhandler
                    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
                }
            }
    
        }
    
        // ---------------------- applicationContext ----------------------
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }
    
    }

    1. 父类查看: com.xxl.job.core.executor.XxlJobExecutor 

    package com.xxl.job.core.executor;
    
    import com.xxl.job.core.biz.AdminBiz;
    import com.xxl.job.core.biz.client.AdminBizClient;
    import com.xxl.job.core.handler.IJobHandler;
    import com.xxl.job.core.log.XxlJobFileAppender;
    import com.xxl.job.core.server.EmbedServer;
    import com.xxl.job.core.thread.JobLogFileCleanThread;
    import com.xxl.job.core.thread.JobThread;
    import com.xxl.job.core.thread.TriggerCallbackThread;
    import com.xxl.job.core.util.IpUtil;
    import com.xxl.job.core.util.NetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    
    /**
     * Created by xuxueli on 2016/3/2 21:14.
     */
    public class XxlJobExecutor  {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
    
        // ---------------------- param ----------------------
        private String adminAddresses;
        private String accessToken;
        private String appname;
        private String address;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;
    
        public void setAdminAddresses(String adminAddresses) {
            this.adminAddresses = adminAddresses;
        }
        public void setAccessToken(String accessToken) {
            this.accessToken = accessToken;
        }
        public void setAppname(String appname) {
            this.appname = appname;
        }
        public void setAddress(String address) {
            this.address = address;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public void setPort(int port) {
            this.port = port;
        }
        public void setLogPath(String logPath) {
            this.logPath = logPath;
        }
        public void setLogRetentionDays(int logRetentionDays) {
            this.logRetentionDays = logRetentionDays;
        }
    
    
        // ---------------------- start + stop ----------------------
        public void start() throws Exception {
    
            // init logpath
            XxlJobFileAppender.initLogPath(logPath);
    
            // init invoker, admin-client
            initAdminBizList(adminAddresses, accessToken);
    
    
            // init JobLogFileCleanThread
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
    
            // init TriggerCallbackThread
            TriggerCallbackThread.getInstance().start();
    
            // init executor-server
            initEmbedServer(address, ip, port, appname, accessToken);
        }
        public void destroy(){
            // destory executor-server
            stopEmbedServer();
    
            // destory jobThreadRepository
            if (jobThreadRepository.size() > 0) {
                for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
                    JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
                    // wait for job thread push result to callback queue
                    if (oldJobThread != null) {
                        try {
                            oldJobThread.join();
                        } catch (InterruptedException e) {
                            logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
                        }
                    }
                }
                jobThreadRepository.clear();
            }
            jobHandlerRepository.clear();
    
    
            // destory JobLogFileCleanThread
            JobLogFileCleanThread.getInstance().toStop();
    
            // destory TriggerCallbackThread
            TriggerCallbackThread.getInstance().toStop();
    
        }
    
    
        // ---------------------- admin-client (rpc invoker) ----------------------
        private static List<AdminBiz> adminBizList;
        private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
            if (adminAddresses!=null && adminAddresses.trim().length()>0) {
                for (String address: adminAddresses.trim().split(",")) {
                    if (address!=null && address.trim().length()>0) {
    
                        AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
    
                        if (adminBizList == null) {
                            adminBizList = new ArrayList<AdminBiz>();
                        }
                        adminBizList.add(adminBiz);
                    }
                }
            }
        }
        public static List<AdminBiz> getAdminBizList(){
            return adminBizList;
        }
    
        // ---------------------- executor-server (rpc provider) ----------------------
        private EmbedServer embedServer = null;
    
        private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    
            // fill ip port
            port = port>0?port: NetUtil.findAvailablePort(9999);
            ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    
            // generate address
            if (address==null || address.trim().length()==0) {
                String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
                address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
            }
    
            // accessToken
            if (accessToken==null || accessToken.trim().length()==0) {
                logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
            }
    
            // start
            embedServer = new EmbedServer();
            embedServer.start(address, port, appname, accessToken);
        }
    
        private void stopEmbedServer() {
            // stop provider factory
            if (embedServer != null) {
                try {
                    embedServer.stop();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    
    
        // ---------------------- job handler repository ----------------------
        private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
        public static IJobHandler loadJobHandler(String name){
            return jobHandlerRepository.get(name);
        }
        public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
            logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
            return jobHandlerRepository.put(name, jobHandler);
        }
    
    
        // ---------------------- job thread repository ----------------------
        private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
        public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
            JobThread newJobThread = new JobThread(jobId, handler);
            newJobThread.start();
            logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
            JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);    // putIfAbsent | oh my god, map's put method return the old value!!!
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
            }
    
            return newJobThread;
        }
        public static JobThread removeJobThread(int jobId, String removeOldReason){
            JobThread oldJobThread = jobThreadRepository.remove(jobId);
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
    
                return oldJobThread;
            }
            return null;
        }
        public static JobThread loadJobThread(int jobId){
            JobThread jobThread = jobThreadRepository.get(jobId);
            return jobThread;
        }
    
    }
    View Code

    这里面核心逻辑如下:

    (1) 记录param

    (2) start 和 stop 方法。 start 方法比较重要。下面着重看start 方法

    1》 XxlJobFileAppender.initLogPath(logPath); 创建以及记录日志路径

    package com.xxl.job.core.log;
    
    import com.xxl.job.core.biz.model.LogResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.*;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * store trigger log in each log-file
     * @author xuxueli 2016-3-12 19:25:12
     */
    public class XxlJobFileAppender {
        private static Logger logger = LoggerFactory.getLogger(XxlJobFileAppender.class);
    
        /**
         * log base path
         *
         * strut like:
         *     ---/
         *     ---/gluesource/
         *     ---/gluesource/10_1514171108000.js
         *     ---/gluesource/10_1514171108000.js
         *     ---/2017-12-25/
         *     ---/2017-12-25/639.log
         *     ---/2017-12-25/821.log
         *
         */
        private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
        private static String glueSrcPath = logBasePath.concat("/gluesource");
        public static void initLogPath(String logPath){
            // init
            if (logPath!=null && logPath.trim().length()>0) {
                logBasePath = logPath;
            }
            // mk base dir
            File logPathDir = new File(logBasePath);
            if (!logPathDir.exists()) {
                logPathDir.mkdirs();
            }
            logBasePath = logPathDir.getPath();
    
            // mk glue dir
            File glueBaseDir = new File(logPathDir, "gluesource");
            if (!glueBaseDir.exists()) {
                glueBaseDir.mkdirs();
            }
            glueSrcPath = glueBaseDir.getPath();
        }
        public static String getLogPath() {
            return logBasePath;
        }
        public static String getGlueSrcPath() {
            return glueSrcPath;
        }
    
        /**
         * log filename, like "logPath/yyyy-MM-dd/9999.log"
         *
         * @param triggerDate
         * @param logId
         * @return
         */
        public static String makeLogFileName(Date triggerDate, long logId) {
    
            // filePath/yyyy-MM-dd
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");    // avoid concurrent problem, can not be static
            File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
            if (!logFilePath.exists()) {
                logFilePath.mkdir();
            }
    
            // filePath/yyyy-MM-dd/9999.log
            String logFileName = logFilePath.getPath()
                    .concat(File.separator)
                    .concat(String.valueOf(logId))
                    .concat(".log");
            return logFileName;
        }
    
        /**
         * append log
         *
         * @param logFileName
         * @param appendLog
         */
        public static void appendLog(String logFileName, String appendLog) {
    
            // log file
            if (logFileName==null || logFileName.trim().length()==0) {
                return;
            }
            File logFile = new File(logFileName);
    
            if (!logFile.exists()) {
                try {
                    logFile.createNewFile();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                    return;
                }
            }
    
            // log
            if (appendLog == null) {
                appendLog = "";
            }
            appendLog += "\r\n";
            
            // append file content
            FileOutputStream fos = null;
            try {
                fos = new FileOutputStream(logFile, true);
                fos.write(appendLog.getBytes("utf-8"));
                fos.flush();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (fos != null) {
                    try {
                        fos.close();
                    } catch (IOException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            
        }
    
        /**
         * support read log-file
         *
         * @param logFileName
         * @return log content
         */
        public static LogResult readLog(String logFileName, int fromLineNum){
    
            // valid log file
            if (logFileName==null || logFileName.trim().length()==0) {
                return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
            }
            File logFile = new File(logFileName);
    
            if (!logFile.exists()) {
                return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);
            }
    
            // read file
            StringBuffer logContentBuffer = new StringBuffer();
            int toLineNum = 0;
            LineNumberReader reader = null;
            try {
                //reader = new LineNumberReader(new FileReader(logFile));
                reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
                String line = null;
    
                while ((line = reader.readLine())!=null) {
                    toLineNum = reader.getLineNumber();        // [from, to], start as 1
                    if (toLineNum >= fromLineNum) {
                        logContentBuffer.append(line).append("\n");
                    }
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
    
            // result
            LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);
            return logResult;
    
            /*
            // it will return the number of characters actually skipped
            reader.skip(Long.MAX_VALUE);
            int maxLineNum = reader.getLineNumber();
            maxLineNum++;    // 最大行号
            */
        }
    
        /**
         * read log data
         * @param logFile
         * @return log line content
         */
        public static String readLines(File logFile){
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
                if (reader != null) {
                    StringBuilder sb = new StringBuilder();
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        sb.append(line).append("\n");
                    }
                    return sb.toString();
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            return null;
        }
    
    }
    View Code

    2》 initAdminBizList(adminAddresses, accessToken); 这里是记录服务器相关的地址和accessToken, 用于向服务器端发送数据,主要是注册本机地址和端口。

    com.xxl.job.core.biz.client.AdminBizClient:

    package com.xxl.job.core.biz.client;
    
    import com.xxl.job.core.biz.AdminBiz;
    import com.xxl.job.core.biz.model.HandleCallbackParam;
    import com.xxl.job.core.biz.model.RegistryParam;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.util.XxlJobRemotingUtil;
    
    import java.util.List;
    
    /**
     * admin api test
     *
     * @author xuxueli 2017-07-28 22:14:52
     */
    public class AdminBizClient implements AdminBiz {
    
        public AdminBizClient() {
        }
        public AdminBizClient(String addressUrl, String accessToken) {
            this.addressUrl = addressUrl;
            this.accessToken = accessToken;
    
            // valid
            if (!this.addressUrl.endsWith("/")) {
                this.addressUrl = this.addressUrl + "/";
            }
        }
    
        private String addressUrl ;
        private String accessToken;
        private int timeout = 3;
    
    
        @Override
        public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
            return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
        }
    
        @Override
        public ReturnT<String> registry(RegistryParam registryParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
        }
    
        @Override
        public ReturnT<String> registryRemove(RegistryParam registryParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
        }
    
    }
    View Code

      可以看到其方法主要也是通过com.xxl.job.core.util.XxlJobRemotingUtil#postBody 和调用中心建立连接后走http 协议发送数据。‘’

        public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) {
            HttpURLConnection connection = null;
            BufferedReader bufferedReader = null;
            try {
                // connection
                URL realUrl = new URL(url);
                connection = (HttpURLConnection) realUrl.openConnection();
    
                // trust-https
                boolean useHttps = url.startsWith("https");
                if (useHttps) {
                    HttpsURLConnection https = (HttpsURLConnection) connection;
                    trustAllHosts(https);
                }
    
                // connection setting
                connection.setRequestMethod("POST");
                connection.setDoOutput(true);
                connection.setDoInput(true);
                connection.setUseCaches(false);
                connection.setReadTimeout(timeout * 1000);
                connection.setConnectTimeout(3 * 1000);
                connection.setRequestProperty("connection", "Keep-Alive");
                connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
                connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
    
                if(accessToken!=null && accessToken.trim().length()>0){
                    connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken);
                }
    
                // do connection
                connection.connect();
    
                // write requestBody
                if (requestObj != null) {
                    String requestBody = GsonTool.toJson(requestObj);
    
                    DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                    dataOutputStream.write(requestBody.getBytes("UTF-8"));
                    dataOutputStream.flush();
                    dataOutputStream.close();
                }
    
                /*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
                connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
                OutputStream outwritestream = connection.getOutputStream();
                outwritestream.write(requestBodyBytes);
                outwritestream.flush();
                outwritestream.close();*/
    
                // valid StatusCode
                int statusCode = connection.getResponseCode();
                if (statusCode != 200) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
                }
    
                // result
                bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    result.append(line);
                }
                String resultJson = result.toString();
    
                // parse returnT
                try {
                    ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
                    return returnT;
                } catch (Exception e) {
                    logger.error("xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").", e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").");
                }
    
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url);
            } finally {
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (connection != null) {
                        connection.disconnect();
                    }
                } catch (Exception e2) {
                    logger.error(e2.getMessage(), e2);
                }
            }
        }
    View Code

    3》 JobLogFileCleanThread.getInstance().start(logRetentionDays);

      这个方法实际上是启动一个单线程去清空指定的日志文件。

    package com.xxl.job.core.thread;
    
    import com.xxl.job.core.log.XxlJobFileAppender;
    import com.xxl.job.core.util.FileUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    /**
     * job file clean thread
     *
     * @author xuxueli 2017-12-29 16:23:43
     */
    public class JobLogFileCleanThread {
        private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);
    
        private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
        public static JobLogFileCleanThread getInstance(){
            return instance;
        }
    
        private Thread localThread;
        private volatile boolean toStop = false;
        public void start(final long logRetentionDays){
    
            // limit min value
            if (logRetentionDays < 3 ) {
                return;
            }
    
            localThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!toStop) {
                        try {
                            // clean log dir, over logRetentionDays
                            File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                            if (childDirs!=null && childDirs.length>0) {
    
                                // today
                                Calendar todayCal = Calendar.getInstance();
                                todayCal.set(Calendar.HOUR_OF_DAY,0);
                                todayCal.set(Calendar.MINUTE,0);
                                todayCal.set(Calendar.SECOND,0);
                                todayCal.set(Calendar.MILLISECOND,0);
    
                                Date todayDate = todayCal.getTime();
    
                                for (File childFile: childDirs) {
    
                                    // valid
                                    if (!childFile.isDirectory()) {
                                        continue;
                                    }
                                    if (childFile.getName().indexOf("-") == -1) {
                                        continue;
                                    }
    
                                    // file create date
                                    Date logFileCreateDate = null;
                                    try {
                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                        logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                                    } catch (ParseException e) {
                                        logger.error(e.getMessage(), e);
                                    }
                                    if (logFileCreateDate == null) {
                                        continue;
                                    }
    
                                    if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                        FileUtil.deleteRecursively(childFile);
                                    }
    
                                }
                            }
    
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
    
                        }
    
                        try {
                            TimeUnit.DAYS.sleep(1);
                        } catch (InterruptedException e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");
    
                }
            });
            localThread.setDaemon(true);
            localThread.setName("xxl-job, executor JobLogFileCleanThread");
            localThread.start();
        }
    
        public void toStop() {
            toStop = true;
    
            if (localThread == null) {
                return;
            }
    
            // interrupt and wait
            localThread.interrupt();
            try {
                localThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
    }
    View Code

      可以看出这里是一个单线程的用法。 这个线程可以理解为一个定时任务,while 中线程休眠一天,也就是以1天为单位进行扫描文件进行删除。(这里也可以看出用while + sleep 实现定时任务的用法, sleep 会释放CPU)

    4》 TriggerCallbackThread.getInstance().start(); 这里启动一个callback 任务执行线程

    5》 initEmbedServer(address, ip, port, appname, accessToken);  这个就是获取到配置信息,然后启动一个内嵌的NettyServer, 接收http 请求的数据

    这里面核心的几部分:

    第一步: NetUtil.findAvailablePort(9999) 发现一个可用的端口(也就是用new ServerSocket(port) 进行监测, 不抛异常证明可用)。

    第二步: new EmbedServer() 创建一个内嵌的server, 并且启动

    com.xxl.job.core.server.EmbedServer 源码如下:

    package com.xxl.job.core.server;
    
    import com.xxl.job.core.biz.ExecutorBiz;
    import com.xxl.job.core.biz.impl.ExecutorBizImpl;
    import com.xxl.job.core.biz.model.*;
    import com.xxl.job.core.thread.ExecutorRegistryThread;
    import com.xxl.job.core.util.GsonTool;
    import com.xxl.job.core.util.ThrowableUtil;
    import com.xxl.job.core.util.XxlJobRemotingUtil;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.*;
    
    /**
     * Copy from : https://github.com/xuxueli/xxl-rpc
     *
     * @author xuxueli 2020-04-11 21:25
     */
    public class EmbedServer {
        private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
    
        private ExecutorBiz executorBiz;
        private Thread thread;
    
        public void start(final String address, final int port, final String appname, final String accessToken) {
            executorBiz = new ExecutorBizImpl();
            thread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // param
                    EventLoopGroup bossGroup = new NioEventLoopGroup();
                    EventLoopGroup workerGroup = new NioEventLoopGroup();
                    ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                            0,
                            200,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(2000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                                }
                            });
    
    
                    try {
                        // start server
                        ServerBootstrap bootstrap = new ServerBootstrap();
                        bootstrap.group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel channel) throws Exception {
                                        channel.pipeline()
                                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                                .addLast(new HttpServerCodec())
                                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                    }
                                })
                                .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                        // bind
                        ChannelFuture future = bootstrap.bind(port).sync();
    
                        logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
    
                        // start registry
                        startRegistry(appname, address);
    
                        // wait util stop
                        future.channel().closeFuture().sync();
    
                    } catch (InterruptedException e) {
                        if (e instanceof InterruptedException) {
                            logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                        } else {
                            logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                        }
                    } finally {
                        // stop
                        try {
                            workerGroup.shutdownGracefully();
                            bossGroup.shutdownGracefully();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                }
    
            });
            thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
            thread.start();
        }
    
        public void stop() throws Exception {
            // destroy server thread
            if (thread!=null && thread.isAlive()) {
                thread.interrupt();
            }
    
            // stop registry
            stopRegistry();
            logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");
        }
    
    
        // ---------------------- registry ----------------------
    
        /**
         * netty_http
         *
         * Copy from : https://github.com/xuxueli/xxl-rpc
         *
         * @author xuxueli 2015-11-24 22:25:15
         */
        public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
            private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
    
            private ExecutorBiz executorBiz;
            private String accessToken;
            private ThreadPoolExecutor bizThreadPool;
            public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
                this.executorBiz = executorBiz;
                this.accessToken = accessToken;
                this.bizThreadPool = bizThreadPool;
            }
    
            @Override
            protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    
                // request parse
                //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
                String requestData = msg.content().toString(CharsetUtil.UTF_8);
                String uri = msg.uri();
                HttpMethod httpMethod = msg.method();
                boolean keepAlive = HttpUtil.isKeepAlive(msg);
                String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    
                // invoke
                bizThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // do invoke
                        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
    
                        // to json
                        String responseJson = GsonTool.toJson(responseObj);
    
                        // write response
                        writeResponse(ctx, keepAlive, responseJson);
                    }
                });
            }
    
            private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    
                // valid
                if (HttpMethod.POST != httpMethod) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
                }
                if (uri==null || uri.trim().length()==0) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
                }
                if (accessToken!=null
                        && accessToken.trim().length()>0
                        && !accessToken.equals(accessTokenReq)) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
                }
    
                // services mapping
                try {
                    if ("/beat".equals(uri)) {
                        return executorBiz.beat();
                    } else if ("/idleBeat".equals(uri)) {
                        IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                        return executorBiz.idleBeat(idleBeatParam);
                    } else if ("/run".equals(uri)) {
                        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                        return executorBiz.run(triggerParam);
                    } else if ("/kill".equals(uri)) {
                        KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                        return executorBiz.kill(killParam);
                    } else if ("/log".equals(uri)) {
                        LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                        return executorBiz.log(logParam);
                    } else {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
                }
            }
    
            /**
             * write response
             */
            private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
                // write response
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
                if (keepAlive) {
                    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                }
                ctx.writeAndFlush(response);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.flush();
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
                ctx.close();
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    ctx.channel().close();      // beat 3N, close if idle
                    logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        }
    
        // ---------------------- registry ----------------------
    
        public void startRegistry(final String appname, final String address) {
            // start registry
            ExecutorRegistryThread.getInstance().start(appname, address);
        }
    
        public void stopRegistry() {
            // stop registry
            ExecutorRegistryThread.getInstance().toStop();
        }
    
    
    }
    View Code

      这里启动NettyServer 是在start() 方法中。方法中异步开启线程,然后线程中去启动NettyServer。并且核心的一个Handler 是 com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#EmbedHttpServerHandler。 这个Handler 内部核心的逻辑是在一个线程池中处理的。处理逻辑就是先拿到 请求的uri, 然后根据uri 选择不同的方法进行调用,调用完之后转为JSON,并写回到调用方。

    第三步: com.xxl.job.core.server.EmbedServer#startRegistry 开启注册服务的线程, 最终交给线程: com.xxl.job.core.thread.ExecutorRegistryThread#start

    package com.xxl.job.core.thread;
    
    import com.xxl.job.core.biz.AdminBiz;
    import com.xxl.job.core.biz.model.RegistryParam;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.enums.RegistryConfig;
    import com.xxl.job.core.executor.XxlJobExecutor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by xuxueli on 17/3/2.
     */
    public class ExecutorRegistryThread {
        private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);
    
        private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
        public static ExecutorRegistryThread getInstance(){
            return instance;
        }
    
        private Thread registryThread;
        private volatile boolean toStop = false;
        public void start(final String appname, final String address){
    
            // valid
            if (appname==null || appname.trim().length()==0) {
                logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
                return;
            }
            if (XxlJobExecutor.getAdminBizList() == null) {
                logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
                return;
            }
    
            registryThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    // registry
                    while (!toStop) {
                        try {
                            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                                try {
                                    ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                        registryResult = ReturnT.SUCCESS;
                                        logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                        break;
                                    } else {
                                        logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    }
                                } catch (Exception e) {
                                    logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                                }
    
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
    
                        }
    
                        try {
                            if (!toStop) {
                                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                            }
                        } catch (InterruptedException e) {
                            if (!toStop) {
                                logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                            }
                        }
                    }
    
                    // registry remove
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                if (!toStop) {
                                    logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                                }
    
                            }
    
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
    
                }
            });
            registryThread.setDaemon(true);
            registryThread.setName("xxl-job, executor ExecutorRegistryThread");
            registryThread.start();
        }
    
        public void toStop() {
            toStop = true;
    
            // interrupt and wait
            if (registryThread != null) {
                registryThread.interrupt();
                try {
                    registryThread.join();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
        }
    
    }

       可以看到注册和删除都是这个线程进行的。 注册的时候是在一个while 循环中, 以30s 为周期进行休眠后发送注册信息(也就是发送appname和address)。发送注册信息实际也是获取com.xxl.job.core.executor.XxlJobExecutor#getAdminBizList 的列表,然后进行注册,注册的时候实际就是以http 接口的形式调用api/registry 接口(实际就是会调用到调度中心的controller 接口), 也可以看出这里通信走的是http 协议。

    2. com.xxl.job.core.executor.impl.XxlJobSpringExecutor 类查看

      从名字也可以看出是为了整合Spring 进行包装的一个类。实现了ApplicationContextAware, SmartInitializingSingleton, DisposableBean 接口。 ApplicationContextAware 接口是为了获取到ApplicationContext对象,DisposableBean 是为了在容器销毁的时候调用 destroy 方法,SmartInitializingSingleton 是容器创建完所有单例对象之后调用afterSingletonsInstantiated 方法。

    1. afterSingletonsInstantiated 方法调用时机:

     org.springframework.context.support.AbstractApplicationContext#refresh:

        public void refresh() throws BeansException, IllegalStateException {
            synchronized(this.startupShutdownMonitor) {
                StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
                this.prepareRefresh();
                ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
                this.prepareBeanFactory(beanFactory);
    
                try {
                    this.postProcessBeanFactory(beanFactory);
                    StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
                    this.invokeBeanFactoryPostProcessors(beanFactory);
                    this.registerBeanPostProcessors(beanFactory);
                    beanPostProcess.end();
                    this.initMessageSource();
                    this.initApplicationEventMulticaster();
                    this.onRefresh();
                    this.registerListeners();
                    this.finishBeanFactoryInitialization(beanFactory);
                    this.finishRefresh();
                } catch (BeansException var10) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var10);
                    }
    
                    this.destroyBeans();
                    this.cancelRefresh(var10);
                    throw var10;
                } finally {
                    this.resetCommonCaches();
                    contextRefresh.end();
                }
    
            }
        }
    View Code

    this.finishBeanFactoryInitialization(beanFactory); 构造所有单例对象

        protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) {
            if (beanFactory.containsBean("conversionService") && beanFactory.isTypeMatch("conversionService", ConversionService.class)) {
                beanFactory.setConversionService((ConversionService)beanFactory.getBean("conversionService", ConversionService.class));
            }
    
            if (!beanFactory.hasEmbeddedValueResolver()) {
                beanFactory.addEmbeddedValueResolver((strVal) -> {
                    return this.getEnvironment().resolvePlaceholders(strVal);
                });
            }
    
            String[] weaverAwareNames = beanFactory.getBeanNamesForType(LoadTimeWeaverAware.class, false, false);
            String[] var3 = weaverAwareNames;
            int var4 = weaverAwareNames.length;
    
            for(int var5 = 0; var5 < var4; ++var5) {
                String weaverAwareName = var3[var5];
                this.getBean(weaverAwareName);
            }
    
            beanFactory.setTempClassLoader((ClassLoader)null);
            beanFactory.freezeConfiguration();
            beanFactory.preInstantiateSingletons();
        }

    org.springframework.beans.factory.support.DefaultListableBeanFactory#preInstantiateSingletons:

        public void preInstantiateSingletons() throws BeansException {
            if (logger.isTraceEnabled()) {
                logger.trace("Pre-instantiating singletons in " + this);
            }
    
            // Iterate over a copy to allow for init methods which in turn register new bean definitions.
            // While this may not be part of the regular factory bootstrap, it does otherwise work fine.
            List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);
    
            // Trigger initialization of all non-lazy singleton beans...
            for (String beanName : beanNames) {
                RootBeanDefinition bd = getMergedLocalBeanDefinition(beanName);
                if (!bd.isAbstract() && bd.isSingleton() && !bd.isLazyInit()) {
                    if (isFactoryBean(beanName)) {
                        Object bean = getBean(FACTORY_BEAN_PREFIX + beanName);
                        if (bean instanceof FactoryBean) {
                            FactoryBean<?> factory = (FactoryBean<?>) bean;
                            boolean isEagerInit;
                            if (System.getSecurityManager() != null && factory instanceof SmartFactoryBean) {
                                isEagerInit = AccessController.doPrivileged(
                                        (PrivilegedAction<Boolean>) ((SmartFactoryBean<?>) factory)::isEagerInit,
                                        getAccessControlContext());
                            }
                            else {
                                isEagerInit = (factory instanceof SmartFactoryBean &&
                                        ((SmartFactoryBean<?>) factory).isEagerInit());
                            }
                            if (isEagerInit) {
                                getBean(beanName);
                            }
                        }
                    }
                    else {
                        getBean(beanName);
                    }
                }
            }
    
            // Trigger post-initialization callback for all applicable beans...
            for (String beanName : beanNames) {
                Object singletonInstance = getSingleton(beanName);
                if (singletonInstance instanceof SmartInitializingSingleton) {
                    StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
                            .tag("beanName", beanName);
                    SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
                    if (System.getSecurityManager() != null) {
                        AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
                            smartSingleton.afterSingletonsInstantiated();
                            return null;
                        }, getAccessControlContext());
                    }
                    else {
                        smartSingleton.afterSingletonsInstantiated();
                    }
                    smartInitialize.end();
                }
            }
        }

      这里先获取bean 走bean 的生命流程,然后走org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated 执行单例对象实例化完方法。(也可以理解为这个是spring 走完生命流程再走这个接口的方法)

    2. com.xxl.job.core.executor.impl.XxlJobSpringExecutor#initJobHandlerMethodRepository

      这个方法就是扫描Spring 容器中的对象,然后拿到对象带 XxlJob 注解的方法,最终生成 com.xxl.job.core.handler.impl.MethodJobHandler 对象,也就是可以反射调用的对象, 缓存到com.xxl.job.core.executor.XxlJobExecutor#jobHandlerRepository 静态map 中。

    IJobHandler 默认结构如下: (这里实际就是一种模板模式的应用)

         还有另一种方式就是直接手动注册:(缓存是存在静态map 中, 所以我们也可以自己注册。我们只需要让其根据name 能找到对应的handler即可)

        static {
            // 手动通过如下方式注入到执行器容器。
            XxlJobExecutor.registJobHandler("XXLClassJob", new XXLClassJob());
        }

    3. 调用原理

      以run 调用为例子分析从admin 调度中心如何调用到服务内部。

    1.  比如服务器端我们手动执行一个的时候是通过和nettyServer 建立连接之后, 调用/run 然后传递相关参数进行调用。 然后调用到:com.xxl.job.core.biz.impl.ExecutorBizImpl#run

        @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            // load old:jobHandler + jobThread
            JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
            IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
            String removeOldReason = null;
    
            // valid:jobHandler + jobThread
            GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
            if (GlueTypeEnum.BEAN == glueTypeEnum) {
    
                // new jobhandler
                IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
    
                // valid old jobThread
                if (jobThread!=null && jobHandler != newJobHandler) {
                    // change handler, need kill old thread
                    removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = newJobHandler;
                    if (jobHandler == null) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                    }
                }
    
            } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof GlueJobHandler
                            && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change handler or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    try {
                        IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                        jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                    }
                }
            } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof ScriptJobHandler
                                && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change script or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
                }
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
            }
    
            // executor block strategy
            if (jobThread != null) {
                ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
                if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                    // discard when running
                    if (jobThread.isRunningOrHasQueue()) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                    }
                } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                    // kill running jobThread
                    if (jobThread.isRunningOrHasQueue()) {
                        removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
    
                        jobThread = null;
                    }
                } else {
                    // just queue trigger
                }
            }
    
            // replace thread (new or exists invalid)
            if (jobThread == null) {
                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
            }
    
            // push data to queue
            ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
            return pushResult;
        }

    (1) 这里会根据工作ID去判断。 如果第一次调用com.xxl.job.core.executor.XxlJobExecutor#loadJobHandler 先找到handler, 然后 调用 com.xxl.job.core.executor.XxlJobExecutor#registJobThread 创建JobThread 并调用start 方法开启线程

        private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
        public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
            JobThread newJobThread = new JobThread(jobId, handler);
            newJobThread.start();
            logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
            JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);    // putIfAbsent | oh my god, map's put method return the old value!!!
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
            }
    
            return newJobThread;
        }
        public static JobThread removeJobThread(int jobId, String removeOldReason){
            JobThread oldJobThread = jobThreadRepository.remove(jobId);
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
    
                return oldJobThread;
            }
            return null;
        }
        public static JobThread loadJobThread(int jobId){
            JobThread jobThread = jobThreadRepository.get(jobId);
            return jobThread;
        }
    View Code

    (2) com.xxl.job.core.thread.JobThread 源码如下:

    package com.xxl.job.core.thread;
    
    import com.xxl.job.core.biz.model.HandleCallbackParam;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.biz.model.TriggerParam;
    import com.xxl.job.core.context.XxlJobContext;
    import com.xxl.job.core.context.XxlJobHelper;
    import com.xxl.job.core.executor.XxlJobExecutor;
    import com.xxl.job.core.handler.IJobHandler;
    import com.xxl.job.core.log.XxlJobFileAppender;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.Collections;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.*;
    
    
    /**
     * handler thread
     * @author xuxueli 2016-1-16 19:52:47
     */
    public class JobThread extends Thread{
        private static Logger logger = LoggerFactory.getLogger(JobThread.class);
    
        private int jobId;
        private IJobHandler handler;
        private LinkedBlockingQueue<TriggerParam> triggerQueue;
        private Set<Long> triggerLogIdSet;        // avoid repeat trigger for the same TRIGGER_LOG_ID
    
        private volatile boolean toStop = false;
        private String stopReason;
    
        private boolean running = false;    // if running job
        private int idleTimes = 0;            // idel times
    
    
        public JobThread(int jobId, IJobHandler handler) {
            this.jobId = jobId;
            this.handler = handler;
            this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
            this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
        }
        public IJobHandler getHandler() {
            return handler;
        }
    
        /**
         * new trigger to queue
         *
         * @param triggerParam
         * @return
         */
        public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
            // avoid repeat
            if (triggerLogIdSet.contains(triggerParam.getLogId())) {
                logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
                return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
            }
    
            triggerLogIdSet.add(triggerParam.getLogId());
            triggerQueue.add(triggerParam);
            return ReturnT.SUCCESS;
        }
    
        /**
         * kill job thread
         *
         * @param stopReason
         */
        public void toStop(String stopReason) {
            /**
             * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
             * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
             * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
             */
            this.toStop = true;
            this.stopReason = stopReason;
        }
    
        /**
         * is running job
         * @return
         */
        public boolean isRunningOrHasQueue() {
            return running || triggerQueue.size()>0;
        }
    
        @Override
        public void run() {
    
            // init
            try {
                handler.init();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            // execute
            while(!toStop){
                running = false;
                idleTimes++;
    
                TriggerParam triggerParam = null;
                try {
                    // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
                    triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                    if (triggerParam!=null) {
                        running = true;
                        idleTimes = 0;
                        triggerLogIdSet.remove(triggerParam.getLogId());
    
                        // log filename, like "logPath/yyyy-MM-dd/9999.log"
                        String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                        XxlJobContext xxlJobContext = new XxlJobContext(
                                triggerParam.getJobId(),
                                triggerParam.getExecutorParams(),
                                logFileName,
                                triggerParam.getBroadcastIndex(),
                                triggerParam.getBroadcastTotal());
    
                        // init job context
                        XxlJobContext.setXxlJobContext(xxlJobContext);
    
                        // execute
                        XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
    
                        if (triggerParam.getExecutorTimeout() > 0) {
                            // limit timeout
                            Thread futureThread = null;
                            try {
                                FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                    @Override
                                    public Boolean call() throws Exception {
    
                                        // init job context
                                        XxlJobContext.setXxlJobContext(xxlJobContext);
    
                                        handler.execute();
                                        return true;
                                    }
                                });
                                futureThread = new Thread(futureTask);
                                futureThread.start();
    
                                Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                            } catch (TimeoutException e) {
    
                                XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                                XxlJobHelper.log(e);
    
                                // handle result
                                XxlJobHelper.handleTimeout("job execute timeout ");
                            } finally {
                                futureThread.interrupt();
                            }
                        } else {
                            // just execute
                            handler.execute();
                        }
    
                        // valid execute handle data
                        if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                            XxlJobHelper.handleFail("job handle result lost.");
                        } else {
                            String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                            tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                                    ?tempHandleMsg.substring(0, 50000).concat("...")
                                    :tempHandleMsg;
                            XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                        }
                        XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                                + XxlJobContext.getXxlJobContext().getHandleCode()
                                + ", handleMsg = "
                                + XxlJobContext.getXxlJobContext().getHandleMsg()
                        );
    
                    } else {
                        if (idleTimes > 30) {
                            if(triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lost
                                XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                            }
                        }
                    }
                } catch (Throwable e) {
                    if (toStop) {
                        XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                    }
    
                    // handle result
                    StringWriter stringWriter = new StringWriter();
                    e.printStackTrace(new PrintWriter(stringWriter));
                    String errorMsg = stringWriter.toString();
    
                    XxlJobHelper.handleFail(errorMsg);
    
                    XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
                } finally {
                    if(triggerParam != null) {
                        // callback handler info
                        if (!toStop) {
                            // commonm
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                    triggerParam.getLogId(),
                                    triggerParam.getLogDateTime(),
                                    XxlJobContext.getXxlJobContext().getHandleCode(),
                                    XxlJobContext.getXxlJobContext().getHandleMsg() )
                            );
                        } else {
                            // is killed
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                    triggerParam.getLogId(),
                                    triggerParam.getLogDateTime(),
                                    XxlJobContext.HANDLE_COCE_FAIL,
                                    stopReason + " [job running, killed]" )
                            );
                        }
                    }
                }
            }
    
            // callback trigger request in queue
            while(triggerQueue !=null && triggerQueue.size()>0){
                TriggerParam triggerParam = triggerQueue.poll();
                if (triggerParam!=null) {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_COCE_FAIL,
                            stopReason + " [job not executed, in the job queue, killed.]")
                    );
                }
            }
    
            // destroy
            try {
                handler.destroy();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
        }
    }
    View Code

    (3) 然后调用com.xxl.job.core.thread.JobThread#pushTriggerQueue 将参数存入对方的参数队列中

    2. com.xxl.job.core.thread.JobThread 分析

    1. run 方法会先调用handler.init()。

    2. while 循环内部然后用一个标记为进行判断。

    1》首先将空闲次数idleTimes 自增, 标记当前又一次没有获取到任务进行空转。

    2》然后从triggerQueue 阻塞队列获取任务, 并且一次周期最长等待时间是3s

    2.1》 如果获取到任务, 将idleTimes 清零。然后构造一些参数信息,然后缓存到ThreadLocal 中。 然后调用handler.execute(); 进行任务的执行。 (如果是继承IJobHandler则直接调用execute 方法); 如果是@XxlJob 注解的方式, 实则是生成了一个com.xxl.job.core.handler.impl.MethodJobHandler 反射进行调用。

    2.2》如果获取不到, 则判断空闲次数是否到达30 次, 如果到达30 次。则调用 com.xxl.job.core.executor.XxlJobExecutor#removeJobThread 移除该线程。 会将toStop 标记为置为true,然后线程正常结束后销毁。

    这里有几个注意点:

    (1) init 方法是每次创建一个JobThread 都会调用

    (2) 每个jobid 对应的任务都会开启一个线程。此线程允许90s 内不执行任务, 如果超过90 s 线程会自动销毁。并且是每个jobId对应 一个线程。

    补充: 一个xxl-job 后台开启的异步定时线程如下:

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    Python os模块简单应用
    requests获取源代码时中文乱码问题
    python 正则表达式findall和search用法
    linux source路径配置 省掉每次source的烦恼
    mstar屏参调试说明
    Mstar supernova方案调试笔记-001
    NonOS方案屛参的TOTAL和PLL SET值的设置
    supernova系统 Mrsv 简介笔记
    【工作随笔】Javascript 日期获取封装组件
    如何理解springboot中,mapper接口的实现类由持久层框架进行创建,而不由spring创建?
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15510940.html
Copyright © 2011-2022 走看看