zoukankan      html  css  js  c++  java
  • 分布式定时任务—xxl-job学习(二):源码分析——执行器的启动过程

    从上一篇搭建一个简单的分布式demo任务调度项目可以知道,主要是三个部分:

    配置并启动任务调度中心(xxl-job-admin)
    配置并启动业务系统(执行器)
    在调度中心web页面配置执行器及任务
    本篇咱们先从业务系统的执行器的配置和启动的源码进行深度分析。
    xxl.job.version使用的是 2.2.1-SNAPSHOT版本

    一、执行器的启动
    在业务定时任务系统

    引入xxl-job的依赖配置
    新增执行器组件配置类XxlJobConfig.java,其中配置了核心类XxlJobSpringExecutor
    新增jobhandler类,类中有带 @XxlJob("xxx")注解的方法
    1.1 分析核心类XxlJobSpringExecutor
    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

    @Override
    public void afterSingletonsInstantiated() {
    //。。。。。。。。暂时省略这个方法的具体内容
    }

    @Override
    public void destroy() {
    super.destroy();
    }

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    //。。。。。。。。暂时省略这个方法的具体内容
    }

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
    return applicationContext;
    }
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    在源码中我们可以看到这个类继承了XxlJobExecutor类,实现了ApplicationContextAware、SmartInitializingSingleton、DisposableBean。

    这个对象初始化的时候会调用afterSingletonsInstantiated()方法。

    @Override
    public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
    super.start();
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    initJobHandlerRepository()和initJobHandlerMethodRepository()是将项目中配置的任务保存在项目的内存中,使用ConcurrentMap<String, IJobHandler>保存,使用 springbean的id为key,具体的任务实例对象为 value。;
    刷新GlueFactory(glue执行工厂),把它刷新为 SpringGlueFactory,在执行 glue 模式的任务时使用 spring 来加载相应实例。
    会调用执行器的核心XxlJobExecutor中的start()方法。
    1.1.1 initJobHandlerRepository()
    这个方法是旧版本中用来注册带有 @JobHandler 注解的bean的Java类, 2.2.1-SNAPSHOT版本已经不支持该种方式;

    1.1.2 initJobHandlerMethodRepository()
    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
    return;
    }
    // init job handler from method
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
    Object bean = applicationContext.getBean(beanDefinitionName);

    Map<Method, XxlJob> annotatedMethods = null;
    // referred to : org.springframework.context.event.EventListenerMethodProcessor.processBean
    try {
    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
    new MethodIntrospector.MetadataLookup<XxlJob>() {
    @Override
    public XxlJob inspect(Method method) {
    return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
    }
    });
    } catch (Throwable ex) {
    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
    }
    if (annotatedMethods==null || annotatedMethods.isEmpty()) {
    continue;
    }

    for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
    Method method = methodXxlJobEntry.getKey();
    XxlJob xxlJob = methodXxlJobEntry.getValue();
    if (xxlJob == null) {
    continue;
    }

    String name = xxlJob.value();
    if (name.trim().length() == 0) {
    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
    }
    if (loadJobHandler(name) != null) {
    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }

    // execute method
    if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
    "The correct method format like " public ReturnT<String> execute(String param) " .");
    }
    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
    "The correct method format like " public ReturnT<String> execute(String param) " .");
    }
    method.setAccessible(true);

    // init and destory
    Method initMethod = null;
    Method destroyMethod = null;

    if (xxlJob.init().trim().length() > 0) {
    try {
    initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
    initMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
    throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
    }
    }
    if (xxlJob.destroy().trim().length() > 0) {
    try {
    destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
    destroyMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
    throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
    }
    }

    // registry jobhandler 向`ConcurrentMap<String, IJobHandler>`中保存当前定时任务实例。
    registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
    }
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    分析:

    从applicationContext中获取所有的bean对象;
    利用MethodIntrospector工具类的selectMethods方法和MetadataLookup接口得到Map<Method, XxlJob>(下边学习下这个工具类核心方法selectMethods的源码)
    public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {
    final Map<Method, T> methodMap = new LinkedHashMap<>();
    Set<Class<?>> handlerTypes = new LinkedHashSet<>();
    Class<?> specificHandlerType = null;
    //判断是否是代理类
    if (!Proxy.isProxyClass(targetType)) {
    //如果是代理类,找到实际的类型
    specificHandlerType = ClassUtils.getUserClass(targetType);
    handlerTypes.add(specificHandlerType);
    }
    handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));
    //遍历所有找到的class对象
    for (Class<?> currentHandlerType : handlerTypes) {
    final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType);

    ReflectionUtils.doWithMethods(currentHandlerType, method -> {
    //获取指定的method
    Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);
    //获取方法关联的元数据,一般是指注解
    T result = metadataLookup.inspect(specificMethod);
    if (result != null) {
    Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {
    methodMap.put(specificMethod, result);
    }
    }
    }, ReflectionUtils.USER_DECLARED_METHODS);
    }

    return methodMap;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    循环第二步得到的Map<Method, XxlJob>,key就是注解的id,value是注解元数据。
    校验注解元数据的name属性,如果为空则抛出异常;
    根据name从内存ConcurrentMap<String, IJobHandler>(这其实是注册的时候存的所有任务的仓库)获取对应任务实例,如果已经存在,则抛出异常(任务冲突);
    校验入参,必须为String param,因为 2.2.1-SNAPSHOT指定了开发Job方法,方式格式要求为 “public ReturnT< String> execute(String param)”。
    校验出参,必须是ReturnT< String>格式;
    注入元数据中配置的init()和destroy()方法;
    向ConcurrentMap<String, IJobHandler>中保存当前定时任务实例。
    1.1.3 GlueFactory.refreshInstance(1)
    刷新GlueFactory为 SpringGlueFactory,在执行 glue 模式的任务时使用 spring 来加载相应实例。

    1.1.4 super.start()
    调用XxlJobExecutor.start() 。

    1.2 分析核心类XxlJobExecutor
    XxlJobExecutor的属性有:

    // ---------------------- param ----------------------
    private String adminAddresses;
    private String accessToken;
    private String appname;
    private String address;
    private String ip;
    private int port;
    private String logPath;
    private int logRetentionDays;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    上一步介绍了最终XxlJobSpringExecutor会调用XxlJobExecutor的start()方法,下边我们继续看看这个方法做些什么:

    public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    1.2.1 XxlJobFileAppender.initLogPath(logPath)
    logPath是我们配置执行器组件里的xxl.job.executor.logpath日志路径。

    public static void initLogPath(String logPath){
    // init
    if (logPath!=null && logPath.trim().length()>0) {
    logBasePath = logPath;
    }
    // mk base dir
    File logPathDir = new File(logBasePath);
    if (!logPathDir.exists()) {
    logPathDir.mkdirs();
    }
    logBasePath = logPathDir.getPath();

    // mk glue dir
    File glueBaseDir = new File(logPathDir, "gluesource");
    if (!glueBaseDir.exists()) {
    glueBaseDir.mkdirs();
    }
    glueSrcPath = glueBaseDir.getPath();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    如果配置了日志路径,那么logBasePath就是我们配置文件里的地址;
    判断这个日志路径是否存在,如果不存在则创建日志目录;
    生成gluesource子文件夹;
    1.2.2 initAdminBizList(adminAddresses, accessToken)
    // ---------------------- admin-client (rpc invoker) ----------------------
    private static List<AdminBiz> adminBizList;
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
    for (String address: adminAddresses.trim().split(",")) {
    if (address!=null && address.trim().length()>0) {

    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

    if (adminBizList == null) {
    adminBizList = new ArrayList<AdminBiz>();
    }
    adminBizList.add(adminBiz);
    }
    }
    }
    }
    public static List<AdminBiz> getAdminBizList(){
    return adminBizList;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    这个方法是根据调度中心部署跟地址adminAddresses和执行器通讯TOKENaccessToken初始化AdminBizClient,AdminBizClient这个类有三个核心方法

    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    提供callback(回调)、registry(注册)以及registryRemove(注册移除)到调度中心的方法。

    1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)
    这个方法是初始化日志清除线程,过期日志自动清理(清理N天前的日志文件)。

    public class JobLogFileCleanThread {
    private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);

    private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
    public static JobLogFileCleanThread getInstance(){
    return instance;
    }

    private Thread localThread;
    private volatile boolean toStop = false;
    public void start(final long logRetentionDays){

    // limit min value
    if (logRetentionDays < 3 ) {
    return;
    }

    localThread = new Thread(new Runnable() {
    @Override
    public void run() {
    while (!toStop) {
    try {
    // clean log dir, over logRetentionDays
    File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
    if (childDirs!=null && childDirs.length>0) {

    // today
    Calendar todayCal = Calendar.getInstance();
    todayCal.set(Calendar.HOUR_OF_DAY,0);
    todayCal.set(Calendar.MINUTE,0);
    todayCal.set(Calendar.SECOND,0);
    todayCal.set(Calendar.MILLISECOND,0);

    Date todayDate = todayCal.getTime();

    for (File childFile: childDirs) {

    // valid
    if (!childFile.isDirectory()) {
    continue;
    }
    if (childFile.getName().indexOf("-") == -1) {
    continue;
    }

    // file create date
    Date logFileCreateDate = null;
    try {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    logFileCreateDate = simpleDateFormat.parse(childFile.getName());
    } catch (ParseException e) {
    logger.error(e.getMessage(), e);
    }
    if (logFileCreateDate == null) {
    continue;
    }

    if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
    FileUtil.deleteRecursively(childFile);
    }

    }
    }

    } catch (Exception e) {
    if (!toStop) {
    logger.error(e.getMessage(), e);
    }

    }

    try {
    TimeUnit.DAYS.sleep(1);
    } catch (InterruptedException e) {
    if (!toStop) {
    logger.error(e.getMessage(), e);
    }
    }
    }
    logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");

    }
    });
    localThread.setDaemon(true);
    localThread.setName("xxl-job, executor JobLogFileCleanThread");
    localThread.start();
    }

    public void toStop() {
    toStop = true;

    if (localThread == null) {
    return;
    }

    // interrupt and wait
    localThread.interrupt();
    try {
    localThread.join();
    } catch (InterruptedException e) {
    logger.error(e.getMessage(), e);
    }
    }  郑州看妇科哪家医院好:http://www.zztongjifk.com/郑州妇科医院哪里好:http://www.zztongjifk.com/郑州做妇科检查多少钱:http://www.zztongjifk.com/

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    我们主要关注下start()方法:

    执行器组件配置的执行器日志文件保存天数必须大于3,否则不清理;
    创建一个守护线程,每天执行一次(TimeUnit.DAYS.sleep(1););
    获取日志路径根目录下的所有日期文件目录;
    循环判断当前时间(当天的0时0分0秒0毫秒)和日期目录对应的"yyyy-MM-dd"时间差值是否大于配置的执行器日志文件保存天数参数;
    (todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)
    1
    如果超过日志保存天数,则删除该时间目录及其目录下所有文件。

  • 相关阅读:
    springboot 基础
    spring 基础
    spring MVC 基础
    windows shell
    oracle 创建用户和视图并授权
    maven 安装本地依赖
    JAVA ssl 证书
    mybatis 递归
    MyBatis基础
    当年的毕设-cpf (一个简易的协议 fuzzer)
  • 原文地址:https://www.cnblogs.com/sushine1/p/13187916.html
Copyright © 2011-2022 走看看