  • Source code analysis of elastic-job-lite, an open-source job scheduling platform

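    A while ago I wrote an article, "Demystifying the quartz scheduled-task framework in one article", and a reader suggested that I also cover elastic-job, another job scheduling framework. Things are quiet at the end of the year, so let's study elastic-job.

    First of all, elastic-job is built on quartz, so understanding how quartz works makes elastic-job much quicker to understand.

    Let's start with the architecture of elastic-job-lite.

    quartz has three key concepts: Job, Trigger, and Scheduler. How does each of them show up in elastic-job?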

    1.Job

    LiteJob implements quartz's Job interface

    import org.quartz.Job;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    
    /**
     * Lite scheduled job.
     *
     * @author zhangliang
     */
    public final class LiteJob implements Job {
        
        @Setter
        private ElasticJob elasticJob;
        
        @Setter
        private JobFacade jobFacade;
        
        @Override
        public void execute(final JobExecutionContext context) throws JobExecutionException {
            JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
        }
    }
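
    LiteJob itself does very little: it hands the job instance to a factory, and the factory picks an executor for that kind of job. The following is a minimal sketch of that dispatch idea only. DemoJob, DemoSimpleJob, DemoDataflowJob and DemoExecutorFactory are hypothetical stand-ins invented for illustration and are not the elastic-job classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for ElasticJob and two of its job types.
interface DemoJob {
}

interface DemoSimpleJob extends DemoJob {
    void execute(int shardingItem);
}

interface DemoDataflowJob extends DemoJob {
    List<String> fetchData(int shardingItem);

    void processData(int shardingItem, List<String> data);
}

// Chooses how to run a job from its runtime type, the way an executor factory might.
final class DemoExecutorFactory {

    static Runnable getExecutor(final DemoJob job, final int shardingItem) {
        if (job instanceof DemoSimpleJob) {
            return () -> ((DemoSimpleJob) job).execute(shardingItem);
        }
        if (job instanceof DemoDataflowJob) {
            DemoDataflowJob dataflowJob = (DemoDataflowJob) job;
            // A dataflow job first fetches data, then processes what it fetched.
            return () -> dataflowJob.processData(shardingItem, dataflowJob.fetchData(shardingItem));
        }
        throw new IllegalArgumentException("Unsupported job type: " + job.getClass().getName());
    }

    public static void main(final String[] args) {
        List<String> log = new ArrayList<>();
        DemoSimpleJob simple = item -> log.add("simple-" + item);
        getExecutor(simple, 0).run();
        System.out.println(log);
    }
}
```

    The scheduler-facing class stays identical for every job type, and only the factory needs to know which types exist.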

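    Here:

    1.1 ElasticJob is the interface behind the different job types

    1.2 JobFacade is the facade over the job's internal services

    Note: the features of elastic-job are visible in JobFacade, for example:

    Job sharding:

      Split the overall job into multiple sub-tasks

      Scale processing capacity elastically by adding or removing servers

      Distributed coordination, with fully automatic discovery and handling of job servers going online and offline

    Fault tolerance:

      Periodic self-diagnosis and automatic repair

      Guaranteed uniqueness of job shards across the cluster

      Failover and re-triggering of misfired jobs

    Job tracing

    Job scheduling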

    public interface JobFacade {
        
        /**
         * Load the job configuration.
         * 
         * @param fromCache whether to read from the cache
         * @return the job configuration
         */
        JobRootConfiguration loadJobRootConfiguration(boolean fromCache);
        
        /**
         * Check the job execution environment.
         * 
         * @throws JobExecutionEnvironmentException if the job execution environment is invalid
         */
        void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;
        
        /**
         * Fail over the job if failover is needed.
         */
        void failoverIfNecessary();
        
        /**
         * Register the job start information.
         *
         * @param shardingContexts sharding contexts
         */
        void registerJobBegin(ShardingContexts shardingContexts);
        
        /**
         * Register the job completion information.
         *
         * @param shardingContexts sharding contexts
         */
        void registerJobCompleted(ShardingContexts shardingContexts);
        
        /**
         * Get the sharding contexts of the current job server.
         *
         * @return sharding contexts
         */
        ShardingContexts getShardingContexts();
        
        /**
         * Set the misfire flag for the job.
         *
         * @param shardingItems sharding items to mark as misfired
         * @return whether the misfire condition is met
         */
        boolean misfireIfRunning(Collection<Integer> shardingItems);
        
        /**
         * Clear the misfire flag for the job.
         *
         * @param shardingItems sharding items whose misfire flag should be cleared
         */
        void clearMisfire(Collection<Integer> shardingItems);
        
        /**
         * Check whether the job needs to execute misfired runs.
         * 
         * @param shardingItems collection of sharding items
         * @return whether the job needs to execute misfired runs
         */
        boolean isExecuteMisfired(Collection<Integer> shardingItems);
        
        /**
         * Check whether the job is eligible to keep running.
         * 
         * <p>The job will not keep running if it has been stopped, needs resharding, or is not a streaming process.</p>
         * 
         * @return whether the job is eligible to keep running
         */
        boolean isEligibleForJobRunning();
        
        /**
         * Check whether resharding is needed.
         *
         * @return whether resharding is needed
         */
        boolean isNeedSharding();
        
        /**
         * Hook invoked before the job is executed.
         *
         * @param shardingContexts sharding contexts
         */
        void beforeJobExecuted(ShardingContexts shardingContexts);
        
        /**
         * Hook invoked after the job is executed.
         *
         * @param shardingContexts sharding contexts
         */
        void afterJobExecuted(ShardingContexts shardingContexts);
        
        /**
         * Post a job execution event.
         *
         * @param jobExecutionEvent job execution event
         */
        void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);
        
        /**
         * Post a job status trace event.
         *
         * @param taskId task ID
         * @param state job execution state
         * @param message job execution message
         */
        void postJobStatusTraceEvent(String taskId, State state, String message);
    }

    2.JobDetail

    The common job attributes are defined in job.xsd

        <xsd:complexType name="base">
            <xsd:complexContent>
                <xsd:extension base="beans:identifiedType">
                    <xsd:all>
                        <xsd:element ref="listener" minOccurs="0" maxOccurs="1" />
                        <xsd:element ref="distributed-listener" minOccurs="0" maxOccurs="1" />
                    </xsd:all>
                    <xsd:attribute name="class" type="xsd:string" />
                    <xsd:attribute name="job-ref" type="xsd:string" />
                    <xsd:attribute name="registry-center-ref" type="xsd:string" use="required" />
                    <xsd:attribute name="cron" type="xsd:string" use="required" />
                    <xsd:attribute name="sharding-total-count" type="xsd:string" use="required" />
                    <xsd:attribute name="sharding-item-parameters" type="xsd:string" />
                    <xsd:attribute name="job-parameter" type="xsd:string" />
                    <xsd:attribute name="monitor-execution" type="xsd:string" default="true"/>
                    <xsd:attribute name="monitor-port" type="xsd:string" default="-1"/>
                    <xsd:attribute name="max-time-diff-seconds" type="xsd:string" default="-1"/>
                    <xsd:attribute name="failover" type="xsd:string" default="false"/>
                    <xsd:attribute name="reconcile-interval-minutes" type="xsd:int" default="10"/>
                    <xsd:attribute name="misfire" type="xsd:string" default="true"/>
                    <xsd:attribute name="job-sharding-strategy-class" type="xsd:string" />
                    <xsd:attribute name="description" type="xsd:string" />
                    <xsd:attribute name="disabled" type="xsd:string" default="false"/>
                    <xsd:attribute name="overwrite" type="xsd:string" default="false"/>
                    <xsd:attribute name="executor-service-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultExecutorServiceHandler"/>
                    <xsd:attribute name="job-exception-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultJobExceptionHandler"/>
                    <xsd:attribute name="event-trace-rdb-data-source" type="xsd:string" />
                </xsd:extension>
            </xsd:complexContent>
        </xsd:complexType>

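    A simple job inherits the common attributes as they are, a dataflow job adds the streaming-process attribute, and a script job adds the script-command-line attribute.

    The parsers that handle these elements are registered in spring.handlers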
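
    Of the common attributes, sharding-total-count and sharding-item-parameters are the ones that drive sharding. As a minimal sketch, assuming sharding-item-parameters is a comma-separated list of item=parameter pairs such as 0=A,1=B (a format this article does not spell out, so treat it as an assumption), the value could be turned into a map as follows. ShardingItemParametersDemo is a hypothetical name, not an elastic-job class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ShardingItemParametersDemo {

    // Parses "0=A,1=B" into {0=A, 1=B}; the item=parameter format is an assumption.
    static Map<Integer, String> parse(final String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            // Split on the first '=' only, so the parameter itself may contain '='.
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(parse("0=Beijing,1=Shanghai,2=Guangzhou"));
    }
}
```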

    http://www.dangdang.com/schema/ddframe/reg=io.elasticjob.lite.spring.reg.handler.RegNamespaceHandler
    http://www.dangdang.com/schema/ddframe/job=io.elasticjob.lite.spring.job.handler.JobNamespaceHandler

    JobNamespaceHandler

    /**
     * Namespace handler for distributed jobs.
     * 
     * @author caohao
     */
    public final class JobNamespaceHandler extends NamespaceHandlerSupport {
        
        @Override
        public void init() {
            registerBeanDefinitionParser("simple", new SimpleJobBeanDefinitionParser());
            registerBeanDefinitionParser("dataflow", new DataflowJobBeanDefinitionParser());
            registerBeanDefinitionParser("script", new ScriptJobBeanDefinitionParser());
        }
    }

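    When the elastic distributed job executor, AbstractElasticJobExecutor.java, is constructed, it reads the configuration and resolves the configured handlers (the executor service handler and the job exception handler).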

        protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
            this.jobFacade = jobFacade;
            jobRootConfig = jobFacade.loadJobRootConfiguration(true);
            jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
            executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
            jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
            itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
        }

    3.Executing the job

    The elastic distributed job executor, AbstractElasticJobExecutor.java

        /**
         * Execute the job.
         */
        public final void execute() {
            try {
                jobFacade.checkJobExecutionEnvironment();  //1 
            } catch (final JobExecutionEnvironmentException cause) {
                jobExceptionHandler.handleException(jobName, cause);
            }
            ShardingContexts shardingContexts = jobFacade.getShardingContexts();  //2
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));  //3
            }
            if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                            "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                            shardingContexts.getShardingItemParameters().keySet()));
                }
                return;
            }
            try {
                jobFacade.beforeJobExecuted(shardingContexts);       //4
                //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                //CHECKSTYLE:ON
                jobExceptionHandler.handleException(jobName, cause);
            }
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);  //5
            while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
                jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
                execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
            }
            jobFacade.failoverIfNecessary();                           //6
            try {
                jobFacade.afterJobExecuted(shardingContexts);          //7
                //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                //CHECKSTYLE:ON
                jobExceptionHandler.handleException(jobName, cause);
            }
        }
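
    The misfire handling in execute() is easy to miss: a trigger that arrives while the previous run is still in progress only records a flag and returns, and the run that is in progress replays the missed execution once it finishes. The sketch below imitates that control flow in a single process. It is an illustration under simplifying assumptions: MisfireFlowDemo is a hypothetical class, and the two in-memory flags stand in for per-sharding-item state that the real framework keeps elsewhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class MisfireFlowDemo {

    private final AtomicBoolean running = new AtomicBoolean();

    private final AtomicBoolean misfired = new AtomicBoolean();

    final List<String> log = new ArrayList<>();

    // Called on every trigger firing.
    void onTrigger(final Runnable work) {
        if (running.get()) {
            // The previous run has not finished: remember the missed firing and give up.
            misfired.set(true);
            log.add("misfire-recorded");
            return;
        }
        run(work, "normal");
        // Replay missed firings, clearing the flag before each replay.
        while (misfired.compareAndSet(true, false)) {
            run(work, "misfire");
        }
    }

    private void run(final Runnable work, final String source) {
        running.set(true);
        try {
            log.add("run-" + source);
            work.run();
        } finally {
            running.set(false);
        }
    }

    public static void main(final String[] args) {
        MisfireFlowDemo demo = new MisfireFlowDemo();
        int[] calls = {0};
        // The first run simulates a second trigger arriving while it is still running.
        demo.onTrigger(() -> {
            if (calls[0]++ == 0) {
                demo.onTrigger(() -> { });
            }
        });
        System.out.println(demo.log);
    }
}
```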

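    3.1 Environment check

    Check whether the time difference, in seconds, between this machine and the registry center is within the allowed range

        /**
         * Check whether the time difference in seconds between this machine and the registry center is within the allowed range.
         * 
         * @throws JobExecutionEnvironmentException thrown when the time difference between this machine and the registry center is out of the allowed range
         */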
        public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
            int maxTimeDiffSeconds =  load(true).getMaxTimeDiffSeconds();
            if (-1  == maxTimeDiffSeconds) {
                return;
            }
            long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
            if (timeDiff > maxTimeDiffSeconds * 1000L) {
                throw new JobExecutionEnvironmentException(
                        "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
            }
        }
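
    Stripped of the configuration and registry lookups, the check is a single comparison. The following minimal sketch isolates it; TimeDiffCheckDemo and isTolerable are hypothetical names, and the two timestamps are passed in rather than read from a time service or the registry center.

```java
final class TimeDiffCheckDemo {

    // Returns true when the clock difference is acceptable; -1 disables the check.
    static boolean isTolerable(final long localMillis, final long registryMillis, final int maxTimeDiffSeconds) {
        if (-1 == maxTimeDiffSeconds) {
            return true;
        }
        // Multiply by 1000L so the conversion to milliseconds is done in long arithmetic.
        return Math.abs(localMillis - registryMillis) <= maxTimeDiffSeconds * 1000L;
    }

    public static void main(final String[] args) {
        System.out.println(isTolerable(10_000L, 4_000L, 5));
        System.out.println(isTolerable(10_000L, 5_000L, 5));
    }
}
```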

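    3.2 Sharding according to the sharding strategy

    If sharding is needed and the current node is the leader, shard the job.

    If there are no available nodes, do not shard.

        /**
         * Shard the job if sharding is needed and the current node is the leader.
         * 
         * <p>
         * Do not shard if there are no available nodes.
         * </p>
         */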
        public void shardingIfNecessary() {
            List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
            if (!isNeedSharding() || availableJobInstances.isEmpty()) {
                return;
            }
            if (!leaderService.isLeaderUntilBlock()) {
                blockUntilShardingCompleted();
                return;
            }
            waitingOtherShardingItemCompleted();
            LiteJobConfiguration liteJobConfig = configService.load(false);
            int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
            log.debug("Job '{}' sharding begin.", jobName);
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
            resetShardingInfo(shardingTotalCount);
            JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
            jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
            log.debug("Job '{}' sharding complete.", jobName);
        }
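
    The call jobShardingStrategy.sharding(...) is where sharding items are actually assigned to job instances. As an illustration, here is a minimal sketch of one possible strategy, an even split in which leftover items go to the first instances in order. It is a sketch under assumptions, not the strategy class shipped with elastic-job: AverageShardingDemo is a hypothetical name, instances are represented by plain strings, and instance names are assumed to be unique.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AverageShardingDemo {

    // Splits items 0..shardingTotalCount-1 evenly; leftovers go to the first instances.
    static Map<String, List<Integer>> sharding(final List<String> instances, final int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int itemsPerInstance = shardingTotalCount / instances.size();
        int index = 0;
        for (String each : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = index * itemsPerInstance; i < (index + 1) * itemsPerInstance; i++) {
                items.add(i);
            }
            result.put(each, items);
            index++;
        }
        // Hand out the items that did not divide evenly, one per instance.
        int next = itemsPerInstance * instances.size();
        for (String each : instances) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(each).add(next++);
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(sharding(Arrays.asList("a", "b", "c"), 8));
    }
}
```

    With three instances and eight items, the first two instances each receive one extra item, so the load never differs by more than one item between instances.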

    3.3 Notification through EventBus

    com.google.common.eventbus.EventBus

      /**
       * Posts an event to all registered subscribers.  This method will return
       * successfully after the event has been posted to all subscribers, and
       * regardless of any exceptions thrown by subscribers.
       *
       * <p>If no subscribers have been subscribed for {@code event}'s class, and
       * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
       * DeadEvent and reposted.
       *
       * @param event  event to post.
       */
      public void post(Object event) {
        Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
    
        boolean dispatched = false;
        for (Class<?> eventType : dispatchTypes) {
          subscribersByTypeLock.readLock().lock();
          try {
            Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
    
            if (!wrappers.isEmpty()) {
              dispatched = true;
              for (EventSubscriber wrapper : wrappers) {
                enqueueEvent(event, wrapper);
              }
            }
          } finally {
            subscribersByTypeLock.readLock().unlock();
          }
        }
    
        if (!dispatched && !(event instanceof DeadEvent)) {
          post(new DeadEvent(this, event));
        }
    
        dispatchQueuedEvents();
      }
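
    To make the behaviour of post() concrete, here is a minimal sketch of a type-based event bus written from scratch. It is not Guava's implementation: MiniEventBus is a hypothetical class, subscribers are registered as lambdas rather than discovered through annotations, and dispatch is synchronous on the calling thread. It keeps the two properties described in the Javadoc above: an exception in one subscriber does not stop the others, and an event nobody subscribes to is re-posted wrapped in a DeadEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class MiniEventBus {

    // Wrapper for events that had no subscriber.
    static final class DeadEvent {

        final Object event;

        DeadEvent(final Object event) {
            this.event = event;
        }
    }

    private final Map<Class<?>, List<Consumer<Object>>> subscribersByType = new ConcurrentHashMap<>();

    <T> void register(final Class<T> eventType, final Consumer<? super T> subscriber) {
        subscribersByType.computeIfAbsent(eventType, key -> new CopyOnWriteArrayList<>())
                .add(event -> subscriber.accept(eventType.cast(event)));
    }

    void post(final Object event) {
        boolean dispatched = false;
        for (Map.Entry<Class<?>, List<Consumer<Object>>> entry : subscribersByType.entrySet()) {
            // isInstance also matches subscribers registered for a supertype of the event.
            if (!entry.getKey().isInstance(event)) {
                continue;
            }
            for (Consumer<Object> each : entry.getValue()) {
                dispatched = true;
                try {
                    each.accept(event);
                } catch (final RuntimeException ignored) {
                    // A failing subscriber must not prevent delivery to the others.
                }
            }
        }
        if (!dispatched && !(event instanceof DeadEvent)) {
            post(new DeadEvent(event));
        }
    }

    public static void main(final String[] args) {
        List<String> seen = new ArrayList<>();
        MiniEventBus bus = new MiniEventBus();
        bus.register(String.class, s -> seen.add("string:" + s));
        bus.register(DeadEvent.class, d -> seen.add("dead:" + d.event));
        bus.post("hello");
        bus.post(42);
        System.out.println(seen);
    }
}
```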

    3.4 Before the job runs: notifying each ElasticJobListener

        @Override
        public void beforeJobExecuted(final ShardingContexts shardingContexts) {
            for (ElasticJobListener each : elasticJobListeners) {
                each.beforeJobExecuted(shardingContexts);
            }
        }

    3.5 Running the job

        private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
            if (shardingContexts.getShardingItemParameters().isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
                }
                return;
            }
            jobFacade.registerJobBegin(shardingContexts);//1
            String taskId = shardingContexts.getTaskId();
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
            }
            try {
                process(shardingContexts, executionSource);//2
            } finally {
                // TODO consider adding a job-failed state, and consider how to handle the overall flow when a job fails
                jobFacade.registerJobCompleted(shardingContexts);
                if (itemErrorMessages.isEmpty()) {
                    if (shardingContexts.isAllowSendJobEvent()) {
                        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
                    }
                } else {
                    if (shardingContexts.isAllowSendJobEvent()) {
                        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                    }
                }
            }
        }

      >>1. Register the job's start information in the registry center

      >>2. Submit each sharding item to the thread pool for execution
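
    Step 2 is not shown in the listing, so here is a minimal sketch of how running each sharding item on a thread pool and collecting per-item errors could look. It is written under assumptions: ShardingItemRunnerDemo and its process method are hypothetical, the job is reduced to an IntConsumer taking the sharding item, and the error map plays the role that itemErrorMessages plays in the listing above.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

final class ShardingItemRunnerDemo {

    // Runs the job once per sharding item in parallel and returns the error message of each failed item.
    static Map<Integer, String> process(final Set<Integer> shardingItems, final ExecutorService executorService, final IntConsumer job) {
        Map<Integer, String> itemErrorMessages = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(shardingItems.size());
        for (final int each : shardingItems) {
            executorService.execute(() -> {
                try {
                    job.accept(each);
                } catch (final RuntimeException ex) {
                    // One failing item is recorded and does not stop the other items.
                    itemErrorMessages.put(each, String.valueOf(ex.getMessage()));
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Wait until every sharding item has finished before reporting completion.
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return itemErrorMessages;
    }

    public static void main(final String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        Map<Integer, String> errors = process(new TreeSet<>(Arrays.asList(0, 1, 2)), pool, item -> {
            if (item == 1) {
                throw new IllegalStateException("boom");
            }
        });
        pool.shutdown();
        System.out.println(errors);
    }
}
```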

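    3.6 Failover

    If failover is needed, the job is failed over. The failover work runs on the leader node through the following method.

        /**
         * Execute an operation on the leader node.
         * 
         * @param latchNode name of the job node used by the distributed lock
         * @param callback callback that performs the operation
         */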
        public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
            try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
                latch.start();
                latch.await();
                callback.execute();
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                handleException(ex);
            }
        }

    3.7 After the job runs

    The hook invoked after the job has been executed

        @Override
        public void afterJobExecuted(final ShardingContexts shardingContexts) {
            for (ElasticJobListener each : elasticJobListeners) {
                each.afterJobExecuted(shardingContexts);
            }
        }
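
    Taken together with execute() in section 3, the before and after hooks behave like this: each phase loops over the listeners inside a try block, so a listener that throws ends that phase and the exception is reported, but the job itself still runs. The sketch below reproduces that behaviour with hypothetical types. DemoJobListener and ListenerHooksDemo are invented for illustration, and the returned list of messages stands in for the job exception handler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a job listener with before and after hooks.
interface DemoJobListener {

    void beforeJobExecuted(String jobName);

    void afterJobExecuted(String jobName);
}

final class ListenerHooksDemo {

    // Runs the hooks around the job and returns the messages of reported exceptions.
    static List<String> run(final String jobName, final List<DemoJobListener> listeners, final Runnable job) {
        List<String> reported = new ArrayList<>();
        try {
            for (DemoJobListener each : listeners) {
                each.beforeJobExecuted(jobName);
            }
        } catch (final RuntimeException ex) {
            // Reported instead of rethrown, so the job below still runs.
            reported.add("before: " + ex.getMessage());
        }
        job.run();
        try {
            for (DemoJobListener each : listeners) {
                each.afterJobExecuted(jobName);
            }
        } catch (final RuntimeException ex) {
            reported.add("after: " + ex.getMessage());
        }
        return reported;
    }

    public static void main(final String[] args) {
        DemoJobListener printing = new DemoJobListener() {

            @Override
            public void beforeJobExecuted(final String jobName) {
                System.out.println("before " + jobName);
            }

            @Override
            public void afterJobExecuted(final String jobName) {
                System.out.println("after " + jobName);
            }
        };
        run("demoJob", Collections.singletonList(printing), () -> System.out.println("running"));
    }
}
```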

    4.Trigger 

    elastic-job uses a Cron Trigger by default; the expression is defined in the job attributes

      <xsd:attribute name="cron" type="xsd:string" use="required" />

    5.The job scheduler: JobScheduler

        /**
         * Initialize the job.
         */
        public void init() {
            LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);  //1
            JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
            JobScheduleController jobScheduleController = new JobScheduleController(
                    createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); //2
            JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);  //3
            schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
            jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());   //4
        }
        
        private JobDetail createJobDetail(final String jobClass) {
            JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
            result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
            Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
            if (elasticJobInstance.isPresent()) {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
            } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
                try {
                    result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
                } catch (final ReflectiveOperationException ex) {
                    throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
                }
            }
            return result;
        }
        
        protected Optional<ElasticJob> createElasticJobInstance() {
            return Optional.absent();
        }
        
        private Scheduler createScheduler() {
            Scheduler result;
            try {
                StdSchedulerFactory factory = new StdSchedulerFactory();
                factory.initialize(getBaseQuartzProperties());
                result = factory.getScheduler();
                result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
            } catch (final SchedulerException ex) {
                throw new JobSystemException(ex);
            }
            return result;
        }
        
        private Properties getBaseQuartzProperties() {
            Properties result = new Properties();
            result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
            result.put("org.quartz.threadPool.threadCount", "1");
            result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
            result.put("org.quartz.jobStore.misfireThreshold", "1");
            result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
            result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
            return result;
        }

    5.1 Updating the job configuration.

        /**
         * Update the job configuration.
         *
         * @param liteJobConfig job configuration
         * @return the updated job configuration
         */
        public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
            configService.persist(liteJobConfig);
            return configService.load(false);
        }

    5.2 The initialization steps

    5.2.1 Creating the quartz scheduler

        private Scheduler createScheduler() {
            Scheduler result;
            try {
                StdSchedulerFactory factory = new StdSchedulerFactory();
                factory.initialize(getBaseQuartzProperties());
                result = factory.getScheduler();
                result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
            } catch (final SchedulerException ex) {
                throw new JobSystemException(ex);
            }
            return result;
        }

    5.2.2 Creating the JobDetail

        private JobDetail createJobDetail(final String jobClass) {
            JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
            result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
            Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
            if (elasticJobInstance.isPresent()) {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
            } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
                try {
                    result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
                } catch (final ReflectiveOperationException ex) {
                    throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
                }
            }
            return result;
        }
        

    5.2.3 Registering the job schedule controller.

        /**
         * Register the job schedule controller.
         * 
         * @param jobName job name
         * @param jobScheduleController job schedule controller
         * @param regCenter registry center
         */
        public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
            schedulerMap.put(jobName, jobScheduleController);
            regCenterMap.put(jobName, regCenter);
            regCenter.addCacheData("/" + jobName);
        }

    5.2.4 Scheduling the job.

        /**
         * Schedule the job.
         * 
         * @param cron CRON expression
         */
        public void scheduleJob(final String cron) {
            try {
                if (!scheduler.checkExists(jobDetail.getKey())) {
                    scheduler.scheduleJob(jobDetail, createTrigger(cron));
                }
                scheduler.start();
            } catch (final SchedulerException ex) {
                throw new JobSystemException(ex);
            }
        }

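    6.Summary

      >>elastic-job reuses quartz's scheduling mechanism, so the underlying principle is the same, and builds on it to improve performance and availability.

      >>elastic-job replaces quartz's JDBC job store with a registry center (zookeeper), which improves performance considerably.

      >>elastic-job adds job tracing (through listeners), which makes monitoring easier.

      >>elastic-job uses sharding, so a job can be split into multiple sharding items that run in different places.

      >>elastic-job supports only cronTrigger, whereas quartz supports more trigger implementations.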

  • Original article: https://www.cnblogs.com/davidwang456/p/10346013.html