zoukankan      html  css  js  c++  java
  • DolphinScheduler源码分析

    DolphinScheduler源码分析

    本博客是基于1.2.0版本进行分析,与最新版本的实现有一些出入,还请读者辩证的看待本源码分析。具体细节可能描述的不是很准确,仅供参考

    源码版本

    1.2.0

    技术框架

    所有模块均采用比较流行的SprintBoot框架

    架构图

    DolphinScheduler架构图

    重要概念

    流程定义

    在DolphinScheduler中,作业的DAG被命名为“流程定义”。

    挺不可思议的。

    流程实例

    流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例。流程实例由Master解析流程定义生成。

    任务实例

    任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态。

    居然没有作业的概念,直接上来任务实例的概念,真是匪夷所思。

    定时

    DAG的触发频率。与DAG概念隔离,单独创建、单独管理,一个DAG可以没有与之对应的定时。

    架构说明

    Quartz

    内部对Quartz进行了一个封装,org.apache.dolphinscheduler.server.quartz.QuartzExecutors仅仅提供增加、删除作业的基础功能。其作业的状态等信息保存在数据库中以QRTZ_开头的表。

    为了将实际作业的定义与Quartz隔离,抽象了一个ProcessScheduleJob类,用它来创建JobDetail。

    该类仅仅是根据流程定义的定时等信息创建了一个CommandType.SCHEDULER类型的Command对象,然后插入了数据库,并没有的执行任务的具体逻辑。

    MasterSchedulerThread

    架构图中有一个CommandScanner,对应到源码中就是org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread类。

    这是一个扫描线程,定时扫描数据库中的 t_ds_command 表,根据不同的命令类型进行不同的业务操作。扫描的SQL如下:

    select command.* from t_ds_command command
    join t_ds_process_definition definition on command.process_definition_id = definition.id
    where definition.release_state = 1 AND definition.flag = 1
    order by command.update_time asc
    limit 1
    

    定时的默认是1秒,由Constants.SLEEP_TIME_MILLIS设置。Command的创建与执行是异步的。

    MasterSchedulerThread类查询到一个Comamand后将其转化为一个ProcessInstance,交由MasterExecThread进行执行。

    MasterSchedulerThread功能比较简单,就是负责衔接Quartz创建的Command,一个桥梁的作用。

    MasterExecThread

    org.apache.dolphinscheduler.server.master.runner.MasterExecThread负责执行ProcessInstance,功能主要是DAG任务切分、任务提交监控等其他逻辑处理。

    其实DAG切割也比较简单,首先找入度为0的任务(也就是没有任务依赖),放到准备提交队列;任务执行成功后,扫描后续的任务,如果该任务的所有依赖都成功,则执行该任务;循环处理。MasterExecThread随着DAG中所有任务的执行结束而结束。

    一个任务执行,会分别占用master和worker各一个线程,这一点不太好。

    同样,该线程在一个逻辑处理结束后,也会休眠1秒,由Constants.SLEEP_TIME_MILLIS设置。

    当然在MasterExecThread中,也没有执行具体的任务逻辑,只是创建了一个MasterTaskExecThread负责任务的“执行”。

    MasterTaskExecThread

    org.apache.dolphinscheduler.server.master.runner.MasterTaskExecThread由MasterExecThread负责创建。其功能主要就是负责任务的持久化,简单来说就是把TaskInstacne信息保存到数据库中,同时如果一个任务满足执行条件,也会把任务ID提交到TaskQueue中的。

    这个线程会每隔1秒(Constants.SLEEP_TIME_MILLIS设置)查询作业的状态,直到作业执行完毕(不管是成功还是失败)。

    这样来看,一个任务执行,会占用master2个线程。

    TaskQueue

    架构图中Master/Worker通信的重要渠道,它把待执行的队列放到了TaskQueue,由Worker获取到之后,执行具体的业务逻辑。根据技术架构介绍,这个TaskQueue是由Zookeeper实现。由此也可以看出,Master、Worker是没有直接的物理交互的。

    FetchTaskThread

    org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread循环从TaskQueue中获取任务,并根据不同任务类型调用TaskScheduleThread对应执行器。每次循环依旧休眠1秒。

    FetchTaskThread会一次性查询所有任务,检查当前是否有任务。这个设计有点不合理。

    如果当前有可执行的任务,则一次性取出当前节点剩余可执行任务数量的任务ID。

    根据任务ID查询创建TaskInstance,交由TaskScheduleThread具体执行。

    由此可见FetchTaskThread每个Worker只有一个,TaskScheduleThread会有很多个。

    TaskScheduleThread

    org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread负责任务的具体执行。该线程的逻辑比较清晰,就是构造获取任务相关的文件、参数等信息,创建Process类,执行对应的命令行,然后等待其执行完毕,获取标准输出、标准错误输出、返回码等信息。

    LoggerServer

    org.apache.dolphinscheduler.server.rpc.LoggerServer跟Worker、Master属于同一级别,都是需要单独启动的进程。这就是一个RPC服务器,提供日志分片查看、刷新和下载等功能。

    项目结构

    模块

    1. dolphinscheduler-ui 前端页面模块
    2. dolphinscheduler-server 核心模块。包括master/worker等功能
    3. dolphinscheduler-common 公共模块。公共方法或类
    4. dolphinscheduler-api Restful接口。前后端交互层,与master/worker交互等功能
    5. dolphinscheduler-dao 数据操作层。实体定义、数据存储
    6. dolphinscheduler-alert 预警模块。与预警相关的方法、功能
    7. dolphinscheduler-rpc 日志查看。提供日志实时查看rpc功能
    8. dolphinscheduler-dist 与编译、分发相关的模块。没有具体逻辑功能

    源码分析方法

    1. UI功能不分析
    2. 从与UI交互的API模块开始着手看
    3. 重点分析核心功能
    4. 非核心功能仅做了解

    模块-dolphinscheduler-api

    API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。

    涉及的API太多,不宜深入研究,只研究其大致框架、功能。具体的API列表及其使用方法可查看官方文档

    启动入口

    org.apache.dolphinscheduler.api下面有两个类:ApiApplicationServer、CombinedApplicationServer。

    从ApiApplicationServer来看就是启动一个SpringBoot应用。

    CombinedApplicationServer除了启动一个SprintBoot应用之外,还启动了LoggerServer、AlertServer。

    @SpringBootApplication
    @ConditionalOnProperty(prefix = "server", name = "is-combined-server", havingValue = "true")
    @ServletComponentScan
    @ComponentScan("org.apache.dolphinscheduler")
    @Import({MasterServer.class, WorkerServer.class})
    @EnableSwagger2
    public class CombinedApplicationServer extends SpringBootServletInitializer {
    
        public static void main(String[] args) throws Exception {
    
            ApiApplicationServer.main(args);
    
            LoggerServer server = new LoggerServer();
            server.start();
    
            AlertServer alertServer = AlertServer.getInstance();
            alertServer.start();
        }
    }
    

    CombinedApplicationServer与ApiApplicationServer的区别:是否内嵌LoggerServer、AlertServer。而且当server.is-combined-server为true时,会自动启动CombinedApplicationServer。

    也不知道是否内嵌的意义在哪里,直接内嵌不好么?

    对于SpringBoot应用,接口一般都在controller中。org.apache.dolphinscheduler.api.controller包有以下几个Controller: AccessTokenController ProcessInstanceController AlertGroupController ProjectController BaseController QueueController DataAnalysisController ResourcesController DataSourceController SchedulerController ExecutorController TaskInstanceController LoggerController TaskRecordController LoginController TenantController MonitorController UsersController ProcessDefinitionController WorkerGroupController

    因为在DolphinScheduler调度中最重要的一个概念就是流程定义,所以我们从ProcessDefinitionController入手简要分析这个模块的基本功能。

    ProcessDefinitionController

    流程定义接口API列表

    官方文档中,可以看到org.apache.dolphinscheduler.api.controller.ProcessDefinitionController大概有14个接口。

    ProcessDefinitionController中只有一个字段ProcessDefinitionService,从名称以及自身经验来看,可以知道ProcessDefinitionController会负责HTTP请求的参数解析、参数校验、返回值等等,与业务无关的逻辑;具体的业务逻辑会交给ProcessDefinitionService类处理。

    由此我们可以类比分析其他所有的controller,都会有一个对应的service处理业务相关的逻辑。

    public Result createProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                      @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
                                      @RequestParam(value = "name", required = true) String name,
                                      @RequestParam(value = "processDefinitionJson", required = true) String json,
                                      @RequestParam(value = "locations", required = true) String locations,
                                      @RequestParam(value = "connects", required = true) String connects,
                                      @RequestParam(value = "description", required = false) String description) {
    
        try {
            logger.info("login user {}, create  process definition, project name: {}, process definition name: {}, " +
                            "process_definition_json: {}, desc: {} locations:{}, connects:{}",
                    loginUser.getUserName(), projectName, name, json, description, locations, connects);
            Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, json,
                    description, locations, connects);
            return returnDataList(result);
        } catch (Exception e) {
            logger.error(Status.CREATE_PROCESS_DEFINITION.getMsg(), e);
            return error(Status.CREATE_PROCESS_DEFINITION.getCode(), Status.CREATE_PROCESS_DEFINITION.getMsg());
        }
    }
    

    上面是createProcessDefinition的源码,逻辑比较清晰,就是接收、校验HTTP的参数,然后调用processDefinitionService.createProcessDefinition函数,返回结果、处理异常。

    但这段代码有一个controller与service分隔不清的地方:HTTP返回的结果由谁处理。此处返回结果是由service负责的,service会创建一个Map<String, Object>类型的result字段,然后调用result.put("processDefinitionId",processDefine.getId());设置最终返回的数据。其实个人是不敢苟同这种做法的,严格来说,service只返回与业务相关的实体,HTTP具体返回什么信息应该交由controller处理。

    ProcessDefinitionService

    org.apache.dolphinscheduler.api.service.ProcessDefinitionService承担流程定义具体的CURD逻辑,调用各种mapper、dao。

    public Map<String, Object> createProcessDefinition(User loginUser, String projectName, String name,
                                                           String processDefinitionJson, String desc, String locations, String connects) throws JsonProcessingException {
    
            Map<String, Object> result = new HashMap<>(5);
            Project project = projectMapper.queryByName(projectName);
            // check project auth
            Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
            Status resultStatus = (Status) checkResult.get(Constants.STATUS);
            if (resultStatus != Status.SUCCESS) {
                return checkResult;
            }
    
            ProcessDefinition processDefine = new ProcessDefinition();
            Date now = new Date();
    
            ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
            Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
            if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
                return checkProcessJson;
            }
    
            processDefine.setName(name);
            processDefine.setReleaseState(ReleaseState.OFFLINE);
            processDefine.setProjectId(project.getId());
            processDefine.setUserId(loginUser.getId());
            processDefine.setProcessDefinitionJson(processDefinitionJson);
            processDefine.setDescription(desc);
            processDefine.setLocations(locations);
            processDefine.setConnects(connects);
            processDefine.setTimeout(processData.getTimeout());
            processDefine.setTenantId(processData.getTenantId());
    
            //custom global params
            List<Property> globalParamsList = processData.getGlobalParams();
            if (globalParamsList != null && globalParamsList.size() > 0) {
                Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
                globalParamsList = new ArrayList<>(globalParamsSet);
                processDefine.setGlobalParamList(globalParamsList);
            }
            processDefine.setCreateTime(now);
            processDefine.setUpdateTime(now);
            processDefine.setFlag(Flag.YES);
            processDefineMapper.insert(processDefine);
            putMsg(result, Status.SUCCESS);
            result.put("processDefinitionId",processDefine.getId());
            return result;
        }
    

    研读上面代码我们知道createProcessDefinition大概有以下功能:

    1. 校验当前用户是否拥有所属项目的权限
    2. 校验流程定义JSON是否合法。例如是否有环
    3. 构造ProcessDefinition对象插入数据库
    4. 设置HTTP返回结果

    因为这些都不是核心逻辑,都不再深入展开。

    ProcessDefinitionService的功能非常不合理,居然还有鉴权的功能,按照我的理解,有一个校验、插入数据库的功就可以了,其他的功能都可以抛出去

    dolphinscheduler-api其他的功能都不在分析,因为到此流程定义信息已经写入到了数据库,跟API模块已经没有关系了。但需要知道ProcessDefinition对象插入到了哪张表,这样才知道如何查询、更新这个表的。这个表就是前后台逻辑交互的关键。从ProcessDefinition定义可以看出,数据最终插入了t_ds_process_definition表。

    @Data
    @TableName("t_ds_process_definition")
    public class ProcessDefinition
    

    其实也可以不用关注具体插入到了哪张表,好像只需要关系哪个地方用ProcessDefinitionMapper查询了数据就行了。

    但根据之前的概念定义,我们知道每个流程定义是需要靠“定时”周期性触发的,这样的话我们可以猜测,系统并不会直接用ProcessDefinitionMapper查询流程定义,而是会根据定时关联的ProcessDefinition来调起DAG。这一点在MasterSchedulerThread的分析中已经可以看出来了。

    SchedulerController、SchedulerService

    考虑到Controler逻辑非常简单(不合理),此处将controller和service合并分析。

    同样SchedulerController几乎没有什么逻辑,全都交给了SchedulerService层。这里只分析SchedulerService.insertSchedule,简单浏览代码后,可以发现它跟createProcessDefinition逻辑差不多:

    1. 校验当前用户是否拥有所属项目的权限
    2. 校验流程定义JSON是否合法。例如是否有环
    3. 构造Schedule对象插入数据库
    4. 设置HTTP返回结果

    当然除了上面4点还查询、更新了ProcessDefinition,主要是将Schedule和ProcessDefinition进行关联。

    MasterSchedulerThread

    MasterSchedulerThread

    以上是MasterSchedulerThread类的概览图。

    MasterSchedulerThread实现Runnable接口,很明显主要的逻辑应该在run方法内,而且根据经验以及前面的分析可以知道,这个方法内是一个“死”循环,且为了避免CPU飙升,会休眠一小段时间。

    下面我们逐步展开、分析MasterSchedulerThread类

     MasterSchedulerThread-run

    从上图简单分析,总结一下run的逻辑:

    1. 调用OSUtils.checkResource,检查当前资源(内存、CPU)。
    2. 资源超出阈值,则休眠1秒进入下一次循环。
    3. 检查zookeeper是否连接成功
    4. 获取一个InterProcessMutex锁(分布式的公平可重入互斥锁)。也就是只有一个master可以获取到这个锁
    5. 查询一个Command,不为null时进行后续逻辑。
    6. 休眠1秒,进入下一次循环
    7. 进入下一次循环之前,释放InterProcessMutex锁

    在深入分析run之前,先简单分析一下 Stopper.isRunning() 的逻辑。

    /**
     *  if the process closes, a signal is placed as true, and all threads get this flag to stop working
     */
    public class Stopper {
    
    	private static volatile AtomicBoolean signal = new AtomicBoolean(false);
    	
    	public static final boolean isStoped(){
    		return signal.get();
    	}
    	
    	public static final boolean isRunning(){
    		return !signal.get();
    	}
    	
    	public static final void stop(){
    		signal.getAndSet(true);
    	}
    }
    

    其逻辑非常简单,就是用一个原子布尔值,标志当前进程是否要退出。如果收到了退出信号,则signal为true,该进程内所有的线程都退出当前循环。

    下面我们来分析查询到一个Command之后的逻辑:

    if (command != null) {
        logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
    
        try{
            processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
            if (processInstance != null) {
                logger.info("start master exec thread , split DAG ...");
                masterExecService.execute(new MasterExecThread(processInstance,processDao));
            }
        }catch (Exception e){
            logger.error("scan command error ", e);
            processDao.moveToErrorCommand(command, e.toString());
        }
    }
    

    其实就是根据Command创建了一个ProcessInstance(流程实例),之前也分析过,流程定义是由Scheduler自动创建的,而Quartz已经根据Schedule信息创建了Command保存到了数据库。至此,流程定义与定时的关联逻辑就已经串起来了。

    创建流程实例的时候传入了当前可用(masterExecThreadNum - activeCount)的线程数量,如果满足当前dag,则返回ProcessInstance,否则返回null。

    ProcessInstance最终交由MasterExecThread去执行。

    至此MasterSchedulerThread类的主要逻辑如下:

    1. 调用OSUtils.checkResource,检查当前资源(内存、CPU)。
    2. 资源超出阈值,则休眠1秒进入下一次循环。
    3. 检查zookeeper是否连接成功
    4. 获取一个InterProcessMutex锁(分布式的公平可重入互斥锁)。也就是只有一个master可以获取到这个锁
    5. 查询一个Command,如果当前线程数够用,则创建一个流程实例(ProcessInstance),交给MasterExecThread线程处理。
    6. 休眠1秒,进入下一次循环
    7. 进入下一次循环之前,释放InterProcessMutex锁

    在结束MasterExecThread的源码分析之前,我们再简要分析一下这个类比较重要的一个字段:processDao。

    ProcessDao

    这个类,可以看成是与流程定义相关的操作集合,与流程定义存储相关的操作、逻辑的集合。

    processDao.moveToErrorCommand需要稍微注意一下,在异常情况下,它把Command从原来的表中删除,然后插入到了t_ds_error_command表。

    但个人感觉其定义不是非常清晰。如果是mapper的一个全集,则其他任何地方都不应该再调用mapper,事实又不是这样;如果只是流程定义相关的操作, 其功能又过于大。

    MasterExecThread

    与MasterSchedulerThread一样,MasterExecThread也是实现了Runnable的线程类,不过我们先来看MasterExecThread的构造函数。

    public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){
        this.processDao = processDao;
    
        this.processInstance = processInstance;
    
        int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS,
                Constants.defaultMasterTaskExecNum);
        this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
                masterTaskExecNum);
    }
    

    taskExecService这个字段非常重要,它是一个固定大小(20)的后台线程池。这意味着,一个DAG最大的并发任务数就是20。

    另外细心的读者发现,conf字段是一个static字段,在static代码块初始化的。为啥不从MasterSchedulerThread传过来呢?我还以为要自动reload呢,结果也没有。

    static {
        try {
            conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
        }catch (ConfigurationException e){
            logger.error("load configuration failed : " + e.getMessage(),e);
            System.exit(1);
        }
    }
    

    配置这种字段,完全可以全局唯一,到处传参,没必要在new一个。一般情况下这个类的内容也不会修改。

    下面分析该类的run方法。

    @Override
    public void run() {
    
        // process instance is null
        if (processInstance == null){
            logger.info("process instance is not exists");
            return;
        }
    
        // check to see if it's done
        if (processInstance.getState().typeIsFinished()){
            logger.info("process instance is done : {}",processInstance.getId());
            return;
        }
    
        try {
            if (processInstance.isComplementData() &&  Flag.NO == processInstance.getIsSubProcess()){
                // sub process complement data
                executeComplementProcess();
            }else{
                // execute flow
                executeProcess();
            }
        }catch (Exception e){
            logger.error("master exec thread exception: " + e.getMessage(), e);
            logger.error("process execute failed, process id:{}", processInstance.getId());
            processInstance.setState(ExecutionStatus.FAILURE);
            processInstance.setEndTime(new Date());
            processDao.updateProcessInstance(processInstance);
        }finally {
            taskExecService.shutdown();
            // post handle
            postHandle();
        }
    }
    

    分析源码后,简要总结其逻辑如下:

    1. 判断processInstance是否为null。为null则退出
    2. 判断processInstance是否已经完成(成功、报错、取消、暂停、等待)
    3. 判断是否为补数。是则走补数的逻辑
    4. 执行当前流程定义实例(executeProcess)
    5. 调用taskExecService.shutdown(),等待所有线程正常退出

    感觉第一步有点多此一举。

    executeProcess按顺序调用了prepareProcess、runProcess、endProcess三个方法,简单来说就是初始化、执行、释放资源。 prepareProcess又按顺序调用了initTaskQueue、buildFlowDag。

    initTaskQueue就是一些资源的初始化操作,比如通过流程定义ID查询到当前的任务实例。下面是其核心逻辑,可以发现,就是查询了完成的任务列表,报错且不能重试的任务列表。

    List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
    for(TaskInstance task : taskInstanceList){
        if(task.isTaskComplete()){
            completeTaskList.put(task.getName(), task);
        }
        if(task.getState().typeIsFailure() && !task.taskCanRetry()){
            errorTaskList.put(task.getName(), task);
        }
    }
    

    buildFlowDag看名字应该是生成DAG实例的,代码虽短,但调用了好几个函数,我们只重点分析最后一个函数调用。

    private void buildFlowDag() throws Exception {
        recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
    
        forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
        // generate process to get DAG info
        List<String> recoveryNameList = getRecoveryNodeNameList();
        List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
        ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
                startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
        if(processDag == null){
            logger.error("processDag is null");
            return;
        }
        // generate process dag
        dag = DagHelper.buildDagGraph(processDag);
    }
    

    DagHelper.buildDagGraph生成了一个DAG对象实例,根据名字和注释猜测,这应该是对有向无环图的一个抽象。

    /**
     * the object of DAG
     */
    private DAG<String,TaskNode,TaskNodeRelation> dag;
    

    来看下DAG类的定义

    /**
     * analysis of DAG
     * Node: node
     * NodeInfo:node description information
     * EdgeInfo: edge description information
     */
    public class DAG<Node, NodeInfo, EdgeInfo>
    

    DAG有三个类型参数,分别代表节点key、节点信息、边信息。

    下面是TaskNode的字段

     TaskNode

    如果读者用过DolphinScheduler的UI的话,发现TaskNode的字段跟UI一一对应。

    TaskNodeRelation

    TaskNodeRelation代表边的信息,字段比较少,只有startNode、endNode两个String类型的字段。这其实是DAG类的第一个类型参数,节点的key。

    public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
    
        DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
    
        /**
         * add vertex
         */
        if (CollectionUtils.isNotEmpty(processDag.getNodes())){
            for (TaskNode node : processDag.getNodes()){
                dag.addNode(node.getName(),node);
            }
        }
    
        /**
         * add edge
         */
        if (CollectionUtils.isNotEmpty(processDag.getEdges())){
            for (TaskNodeRelation edge : processDag.getEdges()){
                dag.addEdge(edge.getStartNode(),edge.getEndNode());
            }
        }
        return dag;
    }
    

    上面是buildDagGraph的源码。可以看出,增加节点时,第一个参数是TaskNode的getName。跟猜测的一样,DAG的第一个参数就是node的key,而key就是名称。

    细心的读者一定发现,DAG对象是根据ProcessDag来创建的,二者有啥区别呢?

    ProcessDag

    其实个人感觉区别不大,非要说区别的话就是DAG把节点、边的一个List转化成了一个Graph。

    初始化完成之后,来看一下具体如何执行流程定义的。

    runProcess

    这个方法源码很长,我们首先从整体简要分析。

    1. submitPostNode(null)
    2. 起一个while循环,直至流程定义实例停止(成功、失败、取消、暂停、等待)
    3. 首先判断是否超时,超时则发送预警邮件
    4. 获取当前活动的任务节点的Map。key是MasterBaseTaskExecThread对象,value是Future<Boolean>。value其实是MasterBaseTaskExecThread线程的当前状态。
    5. 如果当前任务实例已经结束,则从Map中移除
    6. 如果当前任务实例成功,则put到completeTaskList且调用submitPostNode(task.getName())
    7. 如果当前任务实例失败,则重试;否则直接结束(比如手动停止或暂停)
    8. 更新当前流程定义实例的状态,进入下一个循环

    其中activeTaskNode是一个非常重要的对象,从上面的分析中,可以猜测,activeTaskNode是由submitPostNode间接生成赋值的,并通过while循环驱动了整个流程实例的执行。

    private void submitPostNode(String parentNodeName){
    
        List<TaskInstance> submitTaskList = null;
        if(parentNodeName == null){
            submitTaskList = getStartSubmitTaskList();
        }else{
            submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName);
        }
        // if previous node success , post node submit
        for(TaskInstance task : submitTaskList){
            if(readyToSubmitTaskList.containsKey(task.getName())){
                continue;
            }
    
            if(completeTaskList.containsKey(task.getName())){
                logger.info("task {} has already run success", task.getName());
                continue;
            }
            if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
                logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString());
            }else{
                addTaskToStandByList(task);
            }
        }
    }
    

    submitPostNode的源码细节不再深入分析,大概就是从dag对象中找出入度为0的节点,放入到准备队列中。其实在runProcess方法中,还调用了submitStandByTask方法,该方法最终调起了可以执行的节点。从这点来看,整个流程实例由submitPostNode、submitStandByTask和while驱动。

    那么问题来了,流程实例的任务具体是怎么调起来的呢?下面是submitStandByTask方法中调用的最重要的函数,也是由它调起来的。

    /**
     * submit task to execute
     * @param taskInstance task instance
     * @return TaskInstance
     */
    private TaskInstance submitTaskExec(TaskInstance taskInstance) {
        MasterBaseTaskExecThread abstractExecThread = null;
        if(taskInstance.isSubProcess()){
            abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
        }else {
            abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
        }
        Future<Boolean> future = taskExecService.submit(abstractExecThread);
        activeTaskNode.putIfAbsent(abstractExecThread, future);
        return abstractExecThread.getTaskInstance();
    }
    

    逻辑也比较简单,就是把TaskInstance交给MasterTaskExecThread去执行;taskExecService提交之后,放到activeTaskNode列表,交由主逻辑判断任务是否完成。

    MasterTaskExecThread

    根据其定义,我们知道MasterTaskExecThread继承了MasterBaseTaskExecThread,且构造函数简单的调用了父类的构造函数。

    public class MasterTaskExecThread extends MasterBaseTaskExecThread
    

    MasterBaseTaskExecThread的构造函数也比较简单,给几个关键的字段赋初始值。

    /**
     * constructor of MasterBaseTaskExecThread
     * @param taskInstance      task instance
     * @param processInstance   process instance
     */
    public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
        this.processDao = BeanContext.getBean(ProcessDao.class);
        this.alertDao = BeanContext.getBean(AlertDao.class);
        this.processInstance = processInstance;
        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
        this.cancel = false;
        this.taskInstance = taskInstance;
    }
    

    但processDao、alertDao居然是通过BeanContext.getBean获取到的!!!个人感觉这是一个非常恶心的设计。一个优秀的设计,应该是类的创建者负责子类的参数及其功能的边界。BeanContext.getBean扩展了所有类与SpringBoot的ApplicationContext间接打交道的能力,而且无法控制,因为只要调用BeanContext.getBean都可以获取到对应的bean进行操作。

    MasterBaseTaskExecThread实现了Callable<Boolean>接口,call方法又调用了submitWaitComplete,MasterTaskExecThread类中对改方法进行了覆盖。

    submitWaitComplete根据名称及其注释说明可以知道,它提交了一个任务实例,然后等待其完成。

    /**
     * submit task instance and wait complete
     * @return true is task quit is true
     */
    @Override
    public Boolean submitWaitComplete() {
        Boolean result = false;
        this.taskInstance = submit();
        if(!this.taskInstance.getState().typeIsFinished()) {
            result = waitTaskQuit();
        }
        taskInstance.setEndTime(new Date());
        processDao.updateTaskInstance(taskInstance);
        logger.info("task :{} id:{}, process id:{}, exec thread completed ",
                this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
        return result;
    }
    

    该函数的逻辑简单来说就是,提交一个任务实例,等待任务完成,更新任务结束时间到数据。

    我们可以看出,每个任务实例都可以更新数据库,加上其他线程,对数据库的压力可能很大。如果任务非常多,并发非常大的情况下,jdbc连接线程池需要适当调大。否则,数据库会成为系统瓶颈。如果worker节点个数过多,这种压力又会几何倍数的增长。

    首先来看看作业是如何提交的,好像也比较简单,就是调用了processDao.submitTask。

    protected TaskInstance submit(){
        Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES,
                Constants.defaultMasterCommitRetryTimes);
        Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL,
                Constants.defaultMasterCommitRetryInterval);
    
        int retryTimes = 1;
    
        while (retryTimes <= commitRetryTimes){
            try {
                TaskInstance task = processDao.submitTask(taskInstance, processInstance);
                if(task != null){
                    return task;
                }
                logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
                Thread.sleep(commitRetryInterval);
            } catch (Exception e) {
                logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
            }
            retryTimes += 1;
        }
        return null;
    }
    

    根据前面的分析我们知道processDao就是跟数据库打交道的,难道这里就是把任务实例插入到了数据?

    @Transactional(rollbackFor = Exception.class)
    public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
        logger.info("start submit task : {}, instance id:{}, state: {}, ",
                taskInstance.getName(), processInstance.getId(), processInstance.getState() );
        processInstance = this.findProcessInstanceDetailById(processInstance.getId());
        //submit to mysql
        TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
        if(task.isSubProcess() && !task.getState().typeIsFinished()){
            ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
    
            TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
            Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
            Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
            createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
        }else if(!task.getState().typeIsFinished()){
            //submit to task queue
            task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
            submitTaskToQueue(task);
        }
        logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
                taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
        return task;
    }
    

    这段代码优点复杂,但只需要看其主干逻辑就好了。也就是调用submitTaskInstanceToMysql把任务实例插入到数据库,然后调用submitTaskToQueue(目前还看不出插入到了哪里)。

    submitTaskInstanceToMysql不再贴源码分析,与函数名差不多,就是把instance插入到数据库。

    submitTaskToQueue主干逻辑就是把taskInstance添加到了TaskQueue。

    public Boolean submitTaskToQueue(TaskInstance taskInstance) {
    
        try{
            // task cannot submit when running
            if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
                logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
                return true;
            }
            if(checkTaskExistsInTaskQueue(taskInstance)){
                logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
                return true;
            }
            logger.info("task ready to queue: {}" , taskInstance);
            taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
            logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
            return true;
        }catch (Exception e){
            logger.error("submit task to queue Exception: ", e);
            logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
            return false;
    
        }
    }
    

    taskQueue是TaskQueueFactory.getTaskQueueInstance()创建的。

    protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
    

    getTaskQueueInstance其实就是调用了TaskQueueZkImpl.getInstance(),这应该是一个遗留接口。估计设计初期是想根据配置创建不同的任务队列,比如redis或者其他,目前只支持zookeeper。

    public static ITaskQueue getTaskQueueInstance() {
      String queueImplValue = CommonUtils.getQueueImplValue();
      if (StringUtils.isNotBlank(queueImplValue)) {
          logger.info("task queue impl use zookeeper ");
          return TaskQueueZkImpl.getInstance();
      }else{
        logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
        System.exit(-1);
      }
    
      return null;
    }
    

    既然只支持zookeeper,这段冗余代码应该删除的

    这样来看submitTaskToQueue就是调用TaskQueueZkImpl.add方法,把任务实例插入到了zookeeper实现的队列中。

    /**
     * add task to tasks queue
     *
     * @param key      task queue name
     * @param value    ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
     */
    @Override
    public void add(String key, String value) {
        try {
            String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
            String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
    
            logger.info("add task : {} to tasks queue , result success",result);
        } catch (Exception e) {
            logger.error("add task to tasks queue exception",e);
        }
    
    }
    

    从上下文我们知道,这里的key就是tasks_queue;根据注释,value就是 ${processInstancePriority}${processInstanceId}${taskInstancePriority}_${taskId}_host1,host2...

    这样来看,add就是在zk的tasks_queue父节点下创建子节点,子节点的data就是value的值。

    submit的逻辑分析完毕,来继续分析submitWaitComplete的剩余主要逻辑:waitTaskQuit。

    waitTaskQuit

    waitTaskQuit代码比较多,先从整体来分析其逻辑:

    1. 通过taskInstance.id查询taskInstance。其实就是查询taskInstance的最新状态。
    2. 通过参数判断是否启用超时检查
    3. 一个while“死循环”。
    4. while中判断任务是否执行结束,是则退出
    5. 获取任务实例、流程实例最新状态
    6. 休眠1秒,进入下一次while循环

    简单来说waitTaskQuit就是循环查看任务实例的状态,直至其成功。

    MasterTaskExecThread的功能整体来看就是把任务实例信息插入到数据库,并放到zookeeper队列,然后循环等待任务实例的状态变成完成,并没有任何具体的执行逻辑。

    Stopper.isRunning()作为一个全局变量,控制了N多的线程,每个线程都处于一个while“死循环”中。虽然都sleep一段时间,但感觉还是有点浪费。

    至此,master涉及的5个主要线程,已经分析了四个(SubProcessTaskExecThread没有分析),主要功能分析结束。下面就分析一下master的启动过程。

    MasterServer

    MasterServer

    先看MasterServer源码概览,它是一个SpringBoot普通应用,可以有Autowired字段。有三个主要的方法:run/stop/heartBeatThread。根据经验和注释大胆猜测一下,run是master的主要启动逻辑;stop负责优雅退出(销毁资源、容灾等);heartBeatThread负责与zk的心跳。

    这次分析,我们先从非主干逻辑分析,那就是heartBeatThread。

    private Runnable heartBeatThread(){
        Runnable heartBeatThread  = new Runnable() {
            @Override
            public void run() {
                if(Stopper.isRunning()) {
                    // send heartbeat to zk
                    if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                        logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                        return;
                    }
    
                    zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
                }
            }
        };
        return heartBeatThread;
    }
    

    heartBeatThread创建了一个线程,该线程就是调用了zkMasterClient.heartBeatForZk。

    public void heartBeatForZk(String znode, String serverType){
    	try {
    
    		//check dead or not in zookeeper
    		if(zkClient.getState() == CuratorFrameworkState.STOPPED || checkIsDeadServer(znode, serverType)){
    			stoppable.stop("i was judged to death, release resources and stop myself");
    			return;
    		}
    
    		byte[] bytes = zkClient.getData().forPath(znode);
    		String resInfoStr = new String(bytes);
    		String[] splits = resInfoStr.split(Constants.COMMA);
    		if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
    			return;
    		}
    		String str = splits[0] + Constants.COMMA
    				+ splits[1] + Constants.COMMA
    				+ OSUtils.cpuUsage() + Constants.COMMA
    				+ OSUtils.memoryUsage() + Constants.COMMA
    				+ OSUtils.loadAverage() + Constants.COMMA
    				+ splits[5] + Constants.COMMA
    				+ DateUtils.dateToString(new Date());
    		zkClient.setData().forPath(znode,str.getBytes());
    
    	} catch (Exception e) {
    		logger.error("heartbeat for zk failed : " + e.getMessage(), e);
    		stoppable.stop("heartbeat for zk exception, release resources and stop myself");
    	}
    }
    

    zkMasterClient.heartBeatForZk就是在master对应的zookeeper目录下,更新data值,data主要包含当前系统的资源信息:CPU、内存、平均负载。还有最后一次更新的时间。

    我们注意到zkMasterClient的类型是ZKMasterClient,那是不是还会有一个功能类似的ZKWorkerClient?也是用来汇报worker节点的系统资源信息的?

    public synchronized void stop(String cause) {
    
        try {
            //execute only once
            if(Stopper.isStoped()){
                return;
            }
    
            logger.info("master server is stopping ..., cause : {}", cause);
    
            // set stop signal is true
            Stopper.stop();
    
            try {
                //thread sleep 3 seconds for thread quitely stop
                Thread.sleep(3000L);
            }catch (Exception e){
                logger.warn("thread sleep exception:" + e.getMessage(), e);
            }
            try {
                heartbeatMasterService.shutdownNow();
            }catch (Exception e){
                logger.warn("heartbeat service stopped exception");
            }
    
            logger.info("heartbeat service stopped");
    
            //close quartz
            try{
                QuartzExecutors.getInstance().shutdown();
            }catch (Exception e){
                logger.warn("Quartz service stopped exception:{}",e.getMessage());
            }
    
            logger.info("Quartz service stopped");
    
            try {
                ThreadPoolExecutors.getInstance().shutdown();
            }catch (Exception e){
                logger.warn("threadpool service stopped exception:{}",e.getMessage());
            }
    
            logger.info("threadpool service stopped");
    
            try {
                masterSchedulerService.shutdownNow();
            }catch (Exception e){
                logger.warn("master scheduler service stopped exception:{}",e.getMessage());
            }
    
            logger.info("master scheduler service stopped");
    
            try {
                zkMasterClient.close();
            }catch (Exception e){
                logger.warn("zookeeper service stopped exception:{}",e.getMessage());
            }
    
            logger.info("zookeeper service stopped");
    
    
        } catch (Exception e) {
            logger.error("master server stop exception : " + e.getMessage(), e);
            System.exit(-1);
        }
    }
    

    来看stop,它是一个同步方法(synchronized)。为了线程安全,这一点还是比较谨慎的。还会调用Stopper.isStoped(),以便只能执行一次。

    后面的逻辑就比较简单了,总结如下:

    1. Stopper.stop()。关闭全部线程的循环标志
    2. 休眠3秒
    3. heartbeatMasterService.shutdownNow
    4. QuartzExecutors.getInstance().shutdown
    5. ThreadPoolExecutors.getInstance().shutdown
    6. masterSchedulerService.shutdownNow
    7. zkMasterClient.close

    读者要细心的分析shutdownNow和shutdown的区别。一些重要的线程还是要等待其全部执行完才能退出的,比如ThreadPoolExecutors。

    但上面退出的顺序就值得商榷了。假如ThreadPoolExecutors等了很久才退出,就会造成zkMasterClient退出时间也非常久。现在还不知道其他master节点是怎么进行容灾的。假如通过HeartBeat,此时heartBeat已经停止了,应该容灾,但任务线程池还在执行,其他节点又重复启动了流程定义实例是否会有影响呢?如果通过zookeeper心跳,此时任务也没有结束,zookeeper还在连接,貌似也没法容灾吧。

    从上面的分析来看,在各个while循环处理Stopper.isRunning()时,并没有做相应的退出动作,所以此处的stop并不优雅。不是说master优雅退出了,其他节点就是优雅的退出。

    考虑到run源码比较长,且都是一些线程初始化、提交的逻辑,下面只分析最后一段代码。

    // start QuartzExecutors
    // what system should do if exception
    try {
        ProcessScheduleJob.init(processDao);
        QuartzExecutors.getInstance().start();
    } catch (Exception e) {
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (SchedulerException e1) {
            logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
        }
        logger.error("start Quartz failed : " + e.getMessage(), e);
    }
    
    
    /**
     *  register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            if (zkMasterClient.getActiveMasterNum() <= 1) {
                for (int i = 0; i < Constants.DOLPHINSCHEDULER_WARN_TIMES_FAILOVER; i++) {
                    zkMasterClient.getAlertDao().sendServerStopedAlert(
                            1, OSUtils.getHost(), "Master-Server");
                }
            }
            stop("shutdownhook");
        }
    }));
    

    ProcessScheduleJob.init就是给ProcessScheduleJob一个static字段赋值,也就是给所有的ProcessScheduleJob一个全局的processDao

    public static void init(ProcessDao processDao) {
        ProcessScheduleJob.processDao = processDao;
    }
    

    感觉源码中关于processDao的处理有点模糊不清,比较随意。有些是传参,有些是getBean,有些又是全局变量。好乱,好乱。

    addShutdownHook的逻辑就比较清晰了,就是添加了进程退出的hook。先发送预警信息,然后调用stop“优雅”退出。

    FetchTaskThread

    worker涉及的线程主要有两个FetchTaskThread、TaskScheduleThread。

    FetchTaskThread从名称上来看,应该是从zk队列拉取任务信息的。它也是Runnable的一个实现类,还是从run方法入手分析。

    @Override
    public void run() {
        while (Stopper.isRunning()){
            InterProcessMutex mutex = null;
            try {
                ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
                //check memory and cpu usage and threads
                boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);
    
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    
                if(!runCheckFlag) {
                    continue;
                }
    
                //whether have tasks, if no tasks , no need lock  //get all tasks
                List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
                if (CollectionUtils.isEmpty(tasksQueueList)){
                    continue;
                }
                // creating distributed locks, lock path /dolphinscheduler/lock/worker
                mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
                        zkWorkerClient.getWorkerLockPath());
    
    
                // task instance id str
                List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);
    
                for(String taskQueueStr : taskQueueStrArr){
                    if (StringUtils.isEmpty(taskQueueStr)) {
                        continue;
                    }
    
                    if (!checkThreadCount(poolExecutor)) {
                        break;
                    }
    
                    // get task instance id
                    taskInstId = getTaskInstanceId(taskQueueStr);
    
                    // mainly to wait for the master insert task to succeed
                    waitForTaskInstance();
    
                    taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);
    
                    // verify task instance is null
                    if (verifyTaskInstanceIsNull(taskInstance)) {
                        logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
                        removeNodeFromTaskQueue(taskQueueStr);
                        continue;
                    }
    
                    Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
                            taskInstance.getProcessDefine().getUserId());
    
                    // verify tenant is null
                    if (verifyTenantIsNull(tenant)) {
                        logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
                        removeNodeFromTaskQueue(taskQueueStr);
                        continue;
                    }
    
                    // set queue for process instance, user-specified queue takes precedence over tenant queue
                    String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
                    taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
                    taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
    
                    logger.info("worker fetch taskId : {} from queue ", taskInstId);
    
    
                    if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
                        continue;
                    }
    
                    // local execute path
                    String execLocalPath = getExecLocalPath();
    
                    logger.info("task instance  local execute path : {} ", execLocalPath);
    
                    // init task
                    taskInstance.init(OSUtils.getHost(),
                            new Date(),
                            execLocalPath);
    
                    // check and create Linux users
                    FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
                            tenant.getTenantCode(), logger);
    
                    logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
                    // submit task
                    workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
    
                    // remove node from zk
                    removeNodeFromTaskQueue(taskQueueStr);
                }
    
            }catch (Exception e){
                logger.error("fetch task thread failure" ,e);
            }finally {
                AbstractZKClient.releaseMutex(mutex);
            }
        }
    }
    

    run还是一个while“死循环”,首先检查了当前资源是否超阈值、线程数是否够用,然后休眠1秒,判断前面的结果,为false则进入下一个循环。

    真恶心的写法,居然是先获取runCheckFlag标志,休眠后再判断这个值。

    调用taskQueue.getAllTasks获取当前所有的任务列表,为空则进入下一次循环。

    难道不应该用hasTask这样的接口判断吗?此处只是判断是否有作业,获取全部的任务列表就不合适了,优点浪费内存。

    申请InterProcessMutex锁,这样同一时刻只有一个worker节点可以从队列中poll任务。这意味着,任务会随机的在worker节点执行。分配任务的算法多少有点简单,难道不应该哪个节点资源多,抢占锁的可能性大一点吗?

    其实可以基于zookeeper实现一个具有优先级的分布式锁,申请锁时会设置当前客户端的权重,权重大的抢到锁的可能性随之增大。

    抢占到锁之后,每次poll固定数量的任务。还记得之前TaskQueue中插入的是什么样的数据吗?

    ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2...
    

    虽然作者说,百万级别的时候处理这样的字符串没有啥性能损耗,但简化点不好吗?特别是前面还有一个getAll。

    获取到任务列表之后,就是一个for循环,依次处理任务。下面简单总结一下其逻辑。

    1. 判断taskQueueStr是否为空。感觉有点多此一举。
    2. 判断当前线程数是否够用
    3. 从taskQueueStr中取到任务ID。就是按照_分隔之后的第四个字段。
    4. 等待任务实例信息插入到数据库。循环30次,每次等待1秒。注释说数据库操作会被延迟,不知道哪里会延迟。
    5. 通过任务id,获取任务实例信息。
    6. 通过任务实例,获取租户信息。
    7. 通过任务实例,获取用户队列信息。为啥不在查询任务实例信息的时候,直接获取到呢?或者在getTaskInstanceDetailByTaskId一次性获取到?
    8. 判断任务实例是否可以在当前节点执行,不能则继续下一个任务处理。这为啥不提前判断呢?调了2次db查询才来判断?
    9. 任务实例初始化
    10. 检查目录、用户是否存在。不存在则创建用户、目录。为啥不是提前建好?每次还要检查一遍。
    11. 提交任务,交给TaskScheduleThread线程执行。
    12. 删除taskQueue中对应的任务节点。

    FetchTaskThread功能就是抢占zk锁,从TaskQueue获取任务,然后创建TaskScheduleThread线程去执行。

    TaskScheduleThread

    TaskScheduleThread的功能应该是比较简单了,毕竟到这一步就要执行具体的逻辑了。

    @Override
    public void run() {
    
        try {
            // update task state is running according to task type
            updateTaskState(taskInstance.getTaskType());
    
            logger.info("script path : {}", taskInstance.getExecutePath());
            // task node
            TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
    
            // copy hdfs/minio file to local
            copyHdfsToLocal(processDao,
                    taskInstance.getExecutePath(),
                    createProjectResFiles(taskNode),
                    logger);
    
            // get process instance according to tak instance
            ProcessInstance processInstance = taskInstance.getProcessInstance();
    
            // set task props
            TaskProps taskProps = new TaskProps(taskNode.getParams(),
                    taskInstance.getExecutePath(),
                    processInstance.getScheduleTime(),
                    taskInstance.getName(),
                    taskInstance.getTaskType(),
                    taskInstance.getId(),
                    CommonUtils.getSystemEnvPath(),
                    processInstance.getTenantCode(),
                    processInstance.getQueue(),
                    taskInstance.getStartTime(),
                    getGlobalParamsMap(),
                    taskInstance.getDependency(),
                    processInstance.getCmdTypeIfComplement());
            // set task timeout
            setTaskTimeout(taskProps, taskNode);
    
            taskProps.setTaskAppId(String.format("%s_%s_%s",
                    taskInstance.getProcessDefine().getId(),
                    taskInstance.getProcessInstance().getId(),
                    taskInstance.getId()));
    
            // custom logger
            Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                    taskInstance.getProcessDefine().getId(),
                    taskInstance.getProcessInstance().getId(),
                    taskInstance.getId()));
    
            task = TaskManager.newTask(taskInstance.getTaskType(),
                    taskProps,
                    taskLogger);
    
            // task init
            task.init();
    
            // task handle
            task.handle();
    
            // task result process
            task.after();
    
        }catch (Exception e){
            logger.error("task scheduler failure", e);
            kill();
            // update task instance state
            processDao.changeTaskState(ExecutionStatus.FAILURE,
                    new Date(),
                    taskInstance.getId());
        }
    
        logger.info("task instance id : {},task final status : {}",
                taskInstance.getId(),
                task.getExitStatus());
        // update task instance state
        processDao.changeTaskState(task.getExitStatus(),
                new Date(),
                taskInstance.getId());
    }
    

    还是一步步分析run方法。

    1. 更新任务状态为ExecutionStatus.RUNNING_EXEUTION
    2. 从任务实例获取任务节点信息。
    3. 从HDFS复制文件到本地。包括一些用户上传的资源文件,jar包、SQL文件、配置文件等等。
    4. 构造TaskProps对象。
    5. 初始化任务日志对象。
    6. 构造AbstractTask实例
    7. 依次调用AbstractTask的init、handle、after。
    8. 更新任务实例的状态。异常失败或成功等。

    TaskManager.newTask还是比较重要的,它创建了最终的、具体的、可执行的任务实例。

    public static AbstractTask newTask(String taskType, TaskProps props, Logger logger)
      throws IllegalArgumentException {
    switch (EnumUtils.getEnum(TaskType.class,taskType)) {
        case SHELL:
        return new ShellTask(props, logger);
      case PROCEDURE:
        return new ProcedureTask(props, logger);
      case SQL:
        return new SqlTask(props, logger);
      case MR:
        return new MapReduceTask(props, logger);
      case SPARK:
        return new SparkTask(props, logger);
      case FLINK:
        return new FlinkTask(props, logger);
      case PYTHON:
        return new PythonTask(props, logger);
      case DEPENDENT:
        return new DependentTask(props, logger);
      case HTTP:
        return new HttpTask(props, logger);
      default:
        logger.error("unsupport task type: {}", taskType);
        throw new IllegalArgumentException("not support task type");
    }
    

    至此,终于找到了前端配置任务的具体实现类。其实吧,这个异常抛的没有道理。这个taskType肯定是保存在数据库的,保存之前应该做校验了吧,毕竟是enum转过去的。

    ShellTask

    AbstractTask的子类现在有9个,为了简单下面只分析ShellTask,这是一个常见且简单的任务类型。

    public ShellTask(TaskProps taskProps, Logger logger) {
        super(taskProps, logger);
        
        this.taskDir = taskProps.getTaskDir();
        
        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(),
                taskProps.getTaskAppId(),
                taskProps.getTaskInstId(),
                taskProps.getTenantCode(),
                taskProps.getEnvFile(),
                taskProps.getTaskStartTime(),
                taskProps.getTaskTimeout(),
                logger);
        this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
    }
    

    先看其构造函数,有两个字段的初始化比较重要:shellCommandExecutor、processDao。ShellCommandExecutor是shell脚本的执行器,具体功能后面再分析。processDao的初始化方法是不是比较熟悉?又是通过SpringApplicationContext.getBean获取到的,传个参数多好。把这些dao等其他类型的全局或局部变量封装到TaskContenxt多好,如果任务之间传递变量,就可以用TaskContenxt了。

    根据init、handle、after的名称来看,具体的执行应该是在handle。

    public void handle() throws Exception {
        try {
          // construct process
          exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
        } catch (Exception e) {
          logger.error("shell task failure", e);
          exitStatusCode = -1;
        }
        }
    

    handle就是简单的调用了shellCommandExecutor.run,如果出现异常,则exitStatusCode赋值-1

    shellCommandExecutor.run的代码比较多,不再深入分析,此处只简单的分析shellCommandExecutor的buildProcess的方法。

    private void buildProcess(String commandFile) throws IOException {
        //init process builder
        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        processBuilder.directory(new File(taskDir));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        // setting up user to run commands
        processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile);
    
        process = processBuilder.start();
    
        // print command
        printCommand(processBuilder);
    }
    

    它根据commandFile创建了一个ProcessBuilder,返回了Process对象。当然了,是通过sudo -u执行的shell命令。

    DependentTask

    DependentTask虽然是AbstractTask的一个子类,虽然与shell属于同一个层级的类,但由于其功能的特殊性,此处单独拿出来做分析。

    public void handle(){
        // set the name of the current thread
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
        Thread.currentThread().setName(threadLoggerInfoName);
    
        try{
            TaskInstance taskInstance = null;
            while(Stopper.isRunning()){
                taskInstance = processDao.findTaskInstanceById(this.taskProps.getTaskInstId());
    
                if(taskInstance == null){
                    exitStatusCode = -1;
                    break;
                }
    
                if(taskInstance.getState() == ExecutionStatus.KILL){
                    this.cancel = true;
                }
    
                if(this.cancel || allDependentTaskFinish()){
                    break;
                }
    
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
    
            if(cancel){
                exitStatusCode = Constants.EXIT_CODE_KILL;
            }else{
                DependResult result = getTaskDependResult();
                exitStatusCode = (result == DependResult.SUCCESS) ?
                        Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
            }
        }catch (Exception e){
            logger.error(e.getMessage(),e);
            exitStatusCode = -1;
        }
    }
    

    又是一个“死循环”,作者很钟情于这个设计啊。

    逻辑也比较简单,总结如下:

    1. 通过任务实例id,获取当前最新的任务实例信息
    2. 判断状态是否为kill,是则退出
    3. 判断所有依赖任务是否完成,是则退出
    4. 休眠1秒,进入下一次循环。

    allDependentTaskFinish是一个非常重要的逻辑。

    private boolean allDependentTaskFinish(){
        boolean finish = true;
        for(DependentExecute dependentExecute : dependentTaskList){
            for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) {
                if(!dependResultMap.containsKey(entry.getKey())){
                    dependResultMap.put(entry.getKey(), entry.getValue());
                    //save depend result to log
                    logger.info("dependent item complete {} {},{}",
                            DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString());
                }
            }
            if(!dependentExecute.finish(dependentDate)){
                finish = false;
            }
        }
        return finish;
    }
    

    它遍历了dependentTaskList,通过dependentExecute.finish(dependentDate)判断了依赖的作业是否全部完成,任意一个没有完成,则退出循环,返回false。

    dependentDate的值也很重要,它其实是任务的调度时间或者启动时间(补数时间)

    if(taskProps.getScheduleTime() != null){
        this.dependentDate = taskProps.getScheduleTime();
    }else{
        this.dependentDate = taskProps.getTaskStartTime();
    }
    

    通过一层层追踪分析DependentExecute.finish,我们定位到了DependentExecute.calculateResultForTasks,这是用来判断某个依赖项的依赖结果的。

    /**
     * calculate dependent result for one dependent item.
     * @param dependentItem dependent item
     * @param dateIntervals date intervals
     * @return dateIntervals
     */
    private DependResult calculateResultForTasks(DependentItem dependentItem,
                                                        List<DateInterval> dateIntervals) {
        DependResult result = DependResult.FAILED;
        for(DateInterval dateInterval : dateIntervals){
            ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
                                                    dateInterval);
            if(processInstance == null){
                logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}",
                       dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
                return DependResult.FAILED;
            }
            if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
                result = getDependResultByState(processInstance.getState());
            }else{
                TaskInstance taskInstance = null;
                List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
    
                for(TaskInstance task : taskInstanceList){
                    if(task.getName().equals(dependentItem.getDepTasks())){
                        taskInstance = task;
                        break;
                    }
                }
                if(taskInstance == null){
                    // cannot find task in the process instance
                    // maybe because process instance is running or failed.
                     result = getDependResultByState(processInstance.getState());
                }else{
                    result = getDependResultByState(taskInstance.getState());
                }
            }
            if(result != DependResult.SUCCESS){
                break;
            }
        }
        return result;
    }
    

    总结并简化其重要的逻辑,大概是如果依赖整个DAG,则判断流程定义实例的状态;否则依次判断依赖任务实例的状态。

    DependentTask的逻辑简单清晰,就是循环等待所有的任务结束。但感觉这样设计不太好,毕竟把它当成一个普通的Task,就意味着它会占用整体的可调用的线程池。如果项目多、任务多、依赖也多的话,这个浪费还是有点大的。个人觉得DependentTask可以单独设计成一个线程,或者放到独立的线程池去运行。毕竟对于一个调度系统来说,“依赖”还是一个非常重要的概念的。

    WorkerServer

    最后我们分析WorkerServer,这是与master同级的类。与master分析思路一致,还是先来看stop方法。

    此处补贴代码,只总结stop逻辑。

    1. 调用Stopper.stop设置全局变量。停止所有线程的“死”循环
    2. 休眠3秒
    3. 停止worker心跳。heartbeatWorkerService.shutdownNow
    4. 停止worker任务线程池。ThreadPoolExecutors.getInstance().shutdown
    5. 停止killExecutor线程池。killExecutorService.shutdownNow
    6. 停止fetchTask线程池。fetchTaskExecutorService.shutdownNow
    7. 停止zookeeper客户端。zkWorkerClient.close

    heartBeatThread不再分析,其逻辑与master基本一致,就是上报worker的当前资源使用情况。

    ZKWorkerClient

    最后我们再来看ZKWorkerClient的逻辑,它与worker的容灾有很大关系。这是一个非常重要的逻辑和概念,下面会逐步深入分析。

    private ZKWorkerClient(){
    	init();
    }
    
    /**
     * init
     */
    private void init(){
    
    	// init system znode
    	this.initSystemZNode();
    
    	// monitor worker
    	this.listenerWorker();
    
    	// register worker
    	this.registWorker();
    }
    

    先来看其初始化过程,就是一次调用initSystemZNode、listenerWorker、registWorker。

    protected void initSystemZNode(){
    	try {
    		createNodePath(getMasterZNodeParentPath());
    		createNodePath(getWorkerZNodeParentPath());
    		createNodePath(getDeadZNodeParentPath());
    
    	} catch (Exception e) {
    		logger.error("init system znode failed : " + e.getMessage(),e);
    	}
    }
    
    private void createNodePath(String zNodeParentPath) throws Exception {
        if(null == zkClient.checkExists().forPath(zNodeParentPath)){
            zkClient.create().creatingParentContainersIfNeeded()
    				.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
    	}
    }
    

    根据initSystemZNode源码,以及涉及到的三个函数来看,就是在zookeeper中依次创建了3个节点。令人诧异的是,在worker节点初始化过程中居然会创建master相关的子节点。

    下面我们先分析registWorker,因为我觉得就是应该先注册worker节点,在开启监听啊。

    registWorker的源码不再贴出来,它主要就是调用registerServer(ZKNodeType.WORKER)注册了当前节点。

    public String registerServer(ZKNodeType zkNodeType) throws Exception {
    	String registerPath = null;
    	String host = OSUtils.getHost();
    	if(checkZKNodeExists(host, zkNodeType)){
    		logger.error("register failure , {} server already started on host : {}" ,
    				zkNodeType.toString(), host);
    		return registerPath;
    	}
    	registerPath = createZNodePath(zkNodeType);
    
        // handle dead server
    	handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
    
    	return registerPath;
    }
    

    registerServer首先检查了当前节点是否存在,存在则退出;不存在则创建节点。最后调用handleDeadServer,其实就是查找死掉的节点,然后从zk中删除。

    private void listenerWorker(){
    	workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory);
    	try {
    		workerPathChildrenCache.start();
    		workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    			@Override
    			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    				switch (event.getType()) {
    					case CHILD_ADDED:
    						logger.info("node added : {}" ,event.getData().getPath());
    						break;
    					case CHILD_REMOVED:
                            String path = event.getData().getPath();
    						//find myself dead
    						String serverHost = getHostByEventDataPath(path);
    						if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
    							return;
    						}
    						break;
    					case CHILD_UPDATED:
    						break;
    					default:
    						break;
    				}
    			}
    		});
    	}catch (Exception e){
    		logger.error("monitor worker failed : " + e.getMessage(),e);
    	}
    }
    

    listenerWorker就是监听worker的CHILD_REMOVED事件,监听到该事件之后,调用了checkServerSelfDead。worker本身并不会对其他worker节点的移除进行啥具体逻辑。

    protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
    	if (serverHost.equals(OSUtils.getHost())) {
    		logger.error("{} server({}) of myself dead , stopping...",
    				zkNodeType.toString(), serverHost);
    		stoppable.stop(String.format(" {} server {} of myself dead , stopping...",
    				zkNodeType.toString(), serverHost));
    		return true;
    	}
    	return false;
    }
    

    checkServerSelfDead判断是否为当前节点,如果是则调用stoppable.stop,而stoppable是在WorkerServer.run函数中设置的。

    zkWorkerClient.setStoppable(this);
    

    listenerWorker就是监听当前节点是否超时被zookeeper删除,删除后则调用stop方法,优雅退出。

    ZKMasterClient

    ZKWorkerClient好像没啥逻辑,就是用来优雅的退出,下面来分析ZKMasterClient。

    private ZKMasterClient(ProcessDao processDao){
    	this.processDao = processDao;
    	init();
    }
    
    public void init(){
    	// init dao
    	this.initDao();
    
    	InterProcessMutex mutex = null;
    	try {
    		// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
    		String znodeLock = getMasterStartUpLockPath();
    		mutex = new InterProcessMutex(zkClient, znodeLock);
    		mutex.acquire();
    
    		// init system znode
    		this.initSystemZNode();
    
    		// monitor master
    		this.listenerMaster();
    
    		// monitor worker
    		this.listenerWorker();
    
    		// register master
    		this.registerMaster();
    
    		// check if fault tolerance is required,failure and tolerance
    		if (getActiveMasterNum() == 1) {
    			failoverWorker(null, true);
    			failoverMaster(null);
    		}
    
    	}catch (Exception e){
    		logger.error("master start up  exception : " + e.getMessage(),e);
    	}finally {
    		releaseMutex(mutex);
    	}
    }
    

    代码比较长,总结一下其逻辑:

    1. initDao。其实就是初始化alertDao,调用DaoFactory.getDaoInstance(AlertDao.class)。好恶心的初始化方法,processDao是传进来的,alertDao又是这样创建的。
    2. 申请/dolphinscheduler/lock/failover/master路径的分布式锁。
    3. 申请到锁之后,依次调用initSystemZNode、listenerMaster、listenerWorker、registerMaster
    4. 如果当前活动的master个数为1则进行容灾。暂时还不知道为啥。

    initSystemZNode不再分析;listenerMaster不再贴源码,它主要逻辑就是监听到其他master节点被移除后调用removeZKNodePath,如果是当前节点,则优雅退出。

    removeZKNodePath也不再贴源码,它主要是处理死掉的节点、进行预警,如果要进行故障转移,就调用failoverServerWhenDown。这是一个非常重要的方法,它在里面按照不同情况调用了failoverMaster或failoverWorker。也就是说master和worker的故障转移都是在master处理的。

    private void failoverMaster(String masterHost) {
    	logger.info("start master failover ...");
    
    	List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
    
    	//updateProcessInstance host is null and insert into command
    	for(ProcessInstance processInstance : needFailoverProcessInstanceList){
    		processDao.processNeedFailoverProcessInstances(processInstance);
    	}
    
    	logger.info("master failover end");
    }
    

    failoverMaster查询了指定master节点运行的流程定义实例,然后调动processNeedFailoverProcessInstances进行处理。

    @Transactional(rollbackFor = Exception.class)
    public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
        //1 update processInstance host is null
        processInstance.setHost("null");
        processInstanceMapper.updateById(processInstance);
    
        //2 insert into recover command
        Command cmd = new Command();
        cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
        cmd.setCommandParam(String.format("{"%s":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
        cmd.setExecutorId(processInstance.getExecutorId());
        cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
        createCommand(cmd);
    }
    

    processNeedFailoverProcessInstances逻辑就是更新为当前流程定义实例的host字段为null字符串,插入一条类型为RECOVER_TOLERANCE_FAULT_PROCESS的Command,参数是流程定义实例id。

    这里只是插入了一条Command,具体是在哪里处理的呢?这里全局搜索一下。

    recover_tolerance_fault_process

    排除之后定位到ProcessDao.constructProcessInstance,如果是RECOVER_TOLERANCE_FAULT_PROCESS类型,则调用processInstance.setRecovery(Flag.YES)。依然不知道怎么处理的,还得搜索判断为recovery为true时的逻辑。

    搜来搜去你会惊奇的发现,没有地方会对RECOVER_TOLERANCE_FAULT_PROCESS或processInstance.getRecovery为true做特殊处理!!!

    那就得回到cmd的构造上,它设置了4个值,就得研究下这跟普通的Command的其他区别了。

    Command command = new Command();
    command.setCommandType(CommandType.SCHEDULER);
    command.setExecutorId(schedule.getUserId());
    command.setFailureStrategy(schedule.getFailureStrategy());
    command.setProcessDefinitionId(schedule.getProcessDefinitionId());
    command.setScheduleTime(scheduledFireTime);
    command.setStartTime(fireTime);
    command.setWarningGroupId(schedule.getWarningGroupId());
    command.setWorkerGroupId(schedule.getWorkerGroupId());
    command.setWarningType(schedule.getWarningType());
    command.setProcessInstancePriority(schedule.getProcessInstancePriority());
    

    现在回过头去ProcessScheduleJob.execute看一下,普通Command是如何构造的。大家会发现RECOVER_TOLERANCE_FAULT_PROCESS的Command多了一个commandParam的设置,这个param的key是Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING。

    通过全局搜索,定位到ProcessDao.constructProcessInstance,这里有对Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING的处理。

    commandRecover

    处理也比较简单,就是获取到Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING的值,然后查询到流程定义实例对象而不是重新创建一个。后续逻辑就是把流程定义实例的参数,全都put到cmdParam中进行后续的逻辑判断。

    根据流程实例id查询,而不是重新创建。这样的好处是可以避免已经执行成功的任务不再重复执行。那处于running状态的任务呢?这个就要结合master的优雅退出来看了。

    还记得master如何优雅退出的吗?好像没有涉及到这部分逻辑啊?没错,就是没有涉及,其实master会等待所调度的所有流程定义实例中的任务直至结束的。也就是说,调度的任务应该会被正常的执行完毕的,状态最终会被更新成失败或者成功。

    那如果宕掉的master没有能成功更新作业的状态呢?很不幸,DolphinScheduler没有处理这部分异常。此时的作业只能是永久处于running状态,除非手动干预。

    这部分逻辑最好处理一下啊,要不然就真的永久处于running状态了。

    分析到这里大家会发现,其实master的故障转移很简单,就是把某个master节点的流程定义实例交由其他master节点去驱动,原来的流程定义实例中的任务状态没有任何改变。

    下面来看看worker的故障转移。

    /**
     * failover worker tasks
     *
     * 1. kill yarn job if there are yarn jobs in tasks.
     * 2. change task state from running to need failover.
     * 3. failover all tasks when workerHost is null
     * @param workerHost			worker host
     * @param needCheckWorkerAlive	need check worker alive
     * @throws Exception			exception
     */
    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
    	logger.info("start worker[{}] failover ...", workerHost);
    
    	List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
    	for(TaskInstance taskInstance : needFailoverTaskInstanceList){
    		if(needCheckWorkerAlive){
    			if(!checkTaskInstanceNeedFailover(taskInstance)){
    				continue;
                }
    		}
    
    		ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
    		if(instance!=null){
    			taskInstance.setProcessInstance(instance);
    		}
    		// only kill yarn job if exists , the local thread has exited
    		ProcessUtils.killYarnJob(taskInstance);
    
    		taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
    		processDao.saveTaskInstance(taskInstance);
    	}
    	logger.info("end worker[{}] failover ...", workerHost);
    }
    

    首先根据worker的IP找到对应的任务实例,如果该实例不需要故障转移则继续下一个任务实例的检测;如果需要,则找到对应的流程定义实例,将其与任务实例关联;只杀掉yarn作业,作者说其他作业的本地线程已经退出(谁知道呢);设置任务实例的状态为ExecutionStatus.NEED_FAULT_TOLERANCE。

    看到这里发现worker的故障转移也没啥太复杂功能,就是设置任务实例的状态。那是如何处理ExecutionStatus.NEED_FAULT_TOLERANCE状态的任务实例的呢?

    很不幸,没有具体的逻辑来处理这个状态。还记得worker是如何优雅stop的吗?就是等对应的线程池安全退出,其实就是让正在执行的任务实例继续执行完毕。

    needFaultTolerance

    全局搜索之后,发现对ExecutionStatus.NEED_FAULT_TOLERANCE做过的唯一比较重要的判断就是:如果作业失败且需要故障转移,就把他放到recoverToleranceFaultTaskList列表中。但跟踪代码才发现recoverToleranceFaultTaskList就是用来预警的。

    刚开始,taskCanRetry中对NEED_FAULT_TOLERANCE的判断被我忽略了。

    /**
     * determine if you can try again
     * @return can try result
     */
    public boolean taskCanRetry() {
        if(this.isSubProcess()){
            return false;
        }
        if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
            return true;
        }else {
            return (this.getState().typeIsFailure()
                && this.getRetryTimes() < this.getMaxRetryTimes());
        }
    }
    

    taskCanRetry中,如果是ExecutionStatus.NEED_FAULT_TOLERANCE状态,则不管重试了多少次,一定可以重试。有啥用呢?

    其实就是如果发现某个作业是故障转移状态,则失败的时候一定可以重试。

    if(task.getState().typeIsFailure()){
        if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
            this.recoverToleranceFaultTaskList.add(task);
        }
        if(task.taskCanRetry()){
            addTaskToStandByList(task);
        }else{
            // node failure, based on failure strategy
            errorTaskList.put(task.getName(), task);
            completeTaskList.put(task.getName(), task);
            if(processInstance.getFailureStrategy() == FailureStrategy.END){
                killTheOtherTasks();
            }
        }
        continue;
    }
    

    上面是MasterExecThread.runProcess中的一段代码,其逻辑就是如果当前作业失败,且可以重试,就把作业添加到readyToSubmitTaskList队列中再次执行。而ExecutionStatus.NEED_FAULT_TOLERANCE就是属于可以重试。

    至此简单总结一下DolphinScheduler故障转移的逻辑:

    1. worker如果与zookeeper连接超时,则停止心跳,停止获取任务,等待所有任务实例执行结束(正常或失败)并更新数据库状态
    2. master如果与zookeeper连接超时,则停止心跳,停止获取流程定义实例,停止调度所有流程定义实例
    3. master如果发现某个流程定义实例中的任务实例失败且属于ExecutionStatus.NEED_FAULT_TOLERANCE状态,则重新运行。

    但这里有一个不太合理的假设:如果master/worker与zookeeper的连接超时,则master/worker出现了问题,应该发生故障转移。

    master/worker与zookeeper的连接超时有两种可能,master/worker的网络有问题、zookeeper有问题。如果是master/worker的网络有问题则MySQL的读写也会有问题,意味着任务实例的状态更新可能有问题,此时发生故障转移没问题;如果是zookeeper服务本身有问题,则所有的master/worker可能都会有问题,即使发生故障转移意义不是特别大。

    其实个人觉得,既然DolphinScheduler把流程定义、实例等信息保存到了数据库,那么心跳应该去度量与数据库的连接,而不是去度量与zookeeper的连接。zookeeper存在的意义就是分布式锁,而不应该是度量心跳!

    引用

    1. Dolphin Scheduler Api Docs
    2. DolphinScheduler内部原理和架构设计.ppt
  • 相关阅读:
    数据库主键策略
    经历alidns在国外的严重延时
    无题 MVC
    Jquery操作select,左右移动,双击移动 取到所有option的值
    Computop支付网关(一) credit Card
    [转载] 测试的道理
    微信砍价活动总结
    transform,变换
    transition,过渡效果
    文字居底方法
  • 原文地址:https://www.cnblogs.com/gabry/p/12162272.html
Copyright © 2011-2022 走看看