概述
为什么需要一个复杂的工作量调度器?
1、一个完整的数据分析系统通常都是由大量任务单元组成:shell脚本程序,java程序,mapreduce程序、
hive脚本等
2、各任务单元之间存在时间先后及前后依赖关系
3、为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行
Apache DolphinScheduler 是一个分布式去中心化,中国人易观开源的一个分布式易扩展的可视化
DAG 工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理
流程中开箱即用。
特点:数据处理,可视化DAG,分布式,可扩展。
和其他调度系统的比较
Azkaban与Oozie对比
ooize 相比azkaban 是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器 azkaban 是很不错的候选对象。
功能:
两者均可以调度MapReduce,Hive,Java,脚本工作流任务等
两者均可以定时执行和间隔执行工作流任务
工作流定义:
Azkaban使用Properties文件定义工作流
Oozie使用XML文件定义工作流
工作流传参:
Azkaban支持直接传参,例如${input}
Oozie支持参数和EL表达式,例如${fs:dirSize(myInputDir)} strust2(ONGL)
定时执行:
Azkaban的定时执行任务是基于时间的
Oozie的定时执行任务基于时间和输入数据
资源管理:
Azkaban有较严格的权限控制,如用户对工作流进行读/写/执行等操作
Oozie暂无严格的权限控制
工作流执行:
Azkaban有两种运行模式,分别是solo server mode(executor server和web server部署在同一台
节点)和multi server mode(executor server和web server可以部署在不同节点)
Oozie作为工作流服务器运行,支持多用户和多工作流
工作流管理:
Azkaban支持浏览器以及ajax方式操作工作流
Oozie支持命令行、HTTP REST、Java API、浏览器操作工作流
丰富特性
- 以 DAG 图的方式将 Task 按照任务的依赖关系关联起来,可实时可视化监控任务的运行状态
- 支持丰富的任务类型:Shell、MapReduce、Spark、SQL(mysql、postgresql、hive、sparksql), Python, Sub_Process、Procedure 等
- 支持工作流定时调度、依赖调度、手动调度、手动暂停/停止/恢复,同时支持失败重试/告警、从指定节点恢复失败、Kill 任务等操作
- 支持工作流优先级、任务优先级及任务的故障转移及任务超时告警/失败
- 支持工作流全局参数及节点自定义参数设置
- 支持资源文件的在线上传/下载,管理等,支持在线文件创建、编辑
- 支持任务日志在线查看及滚动、在线下载日志等
- 实现集群HA,通过Zookeeper实现Master集群和Worker集群去中心化
- 支持对Master/Worker cpu load,memory,cpu在线查看
- 支持工作流运行历史树形/甘特图展示、支持任务状态统计、流程状态统计
- 支持补数
- 支持多租户
- 支持国际化
- 系统支持基于cron 表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节
点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、
停止、恢复等待线程。其中 恢复被容错的工作流 和 恢复等待线程 两种命令类型是由调度内部控制使
用,外部无法调用 - 定时调度:系统采用 quartz 分布式调度器,并同时支持 cron 表达式可视化的生成
系统架构
基本流程如下:WEB把任务编排写入MySQL,masterserver根据ZK选出一个master, 该master负责扫MySQL表,进行DAG拆分,然后进行任务调度把任务分发给 worker节点执行。
具体组件说明:
MasterServer:
MasterServer 采用分布式无中心设计理念,MasterServer 主要负责 DAG 任务切分、任务提交监控,
并同时监听其它 MasterServer 和 WorkerServer 的健康状态。 MasterServer 服务启动时向
Zookeeper 注册临时节点,通过监听 Zookeeper 临时节点变化来进行容错处理。 MasterServer 基于
netty 提供监听服务。
该服务内主要包含:
- Distributed Quartz 分布式调度组件,主要负责定时任务的启停操作,当 quartz 调起任务后,
Master 内部会有线程池具体负责处理任务的后续操作 - MasterSchedulerThread 是一个扫描线程,定时扫描数据库中的 command 表,根据不同的
命令类型 进行不同的业务操作 - MasterExecThread 主要是负责 DAG 任务切分、任务提交监控、各种不同命令类型的逻辑处理
- MasterTaskExecThread 主要负责任务的持久化
WorkerServer:
WorkerServer 也采用分布式无中心设计理念,WorkerServer 主要负责任务的执行和提供日志服务。
WorkerServer 服务启动时向 Zookeeper 注册临时节点,并维持心跳。Server 基于 netty 提供监听服
务。
该服务包括:
- FetchTaskThread 主要负责不断从 Task Queue 中领取任务,并根据不同任务类型调用
TaskScheduleThread 对应执行器。 - LoggerServer 是一个 RPC 服务,提供日志分片查看、刷新和下载等功能
ZooKeeper:
主要是用作集群管理,HA,分布式锁
Task Queue:
DS 也是采用 ZK 实现的, 队列信息比较少,不大可能存在消息积压的问题
Alert:
提供告警相关结课,告警信息存储、查询通知功能
API:
所有的操作API封装在此
去中心化vs中心化
中心化思想如下图:
master节点主要负责任务的分发, 均衡分发给下游 worker节点,不至于饿死或者饱死。同时维护和worker节点心跳,自动扩容摘除。类似于包工头
worker就是具体干活的,维护和 master的心跳,以便接收到master分发的任务。
缺点:
master 单点效应
去中心化:
- 所有角色平等,影响范围小, 挂一台机器只会影响一部分
- 不存在单点故障问题, 但是由于不存在管理者,每个节点都需要和其他所有节点通信才能拿到全部机器信息,分布式通信是不可靠的,所以大大增加实现难度
- 真正去中心化的并不多见,大部分还是通过ZK 实现动态选主
ZK 分布式锁的实现,主要是顺序临时节点的原子性,只有节点值最小的才获取到锁
容错设计
通过ZK
其中 Master 监控其他 Master 和 Worker 的目录,如果监听到 remove 事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。
任务失败重试
- 任务失败重试是任务级别的,是调度系统自动进行的,比如一个 Shell 任务设置重试次数为 3次,那么在 Shell 任务运行失败后会自己再最多尝试运行3次
- 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行
- 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行
任务节点: - 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如 Shell 节点,MR 节点、Spark 节点、依赖节点等。(每一个 业务节点 都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。)
- 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。(逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。)
任务优先级设计
- 按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流
程内任务提交顺序依次从高到低进行任务处理。 - 具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任
务id信息保存在 ZooKeeper 任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最
需要优先执行的任务 - 任务优先级有5项 HIGHEST、HIGH、MEDIUM、LOW、LOWEST
Logback 和 gRPC 实现日志访问
统一日志查询平台:
由于日志存储节点和查询节点不在一个地方,所以解决方案 要么是 日志存储在ES中
要么通过 GRPC 通信远程查询日志信息
DS 是通过 netty远程拉取日志查询的
模块
dolphinscheduler-alert 告警模块,提供 AlertServer 服务。
dolphinscheduler-api web 应用模块,提供 ApiServer 服务。
dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类
dolphinscheduler-dao 提供数据库访问等操作。
dolphinscheduler-remote 基于 netty 的客户端、服务端
dolphinscheduler-server MasterServer 和 WorkerServer 服务
dolphinscheduler-service service模块,包含 Quartz、Zookeeper、日志客户端访问服务,便于server 模块和 api 模块调用
dolphinscheduler-ui 前端模块
功能点分析
具体功能分析
支持的任务节点类型:
Shell节点
子流程节点:就是把外部的某个工作流定义当做一个任务节点去执行
依赖节点: 就是依赖检查节点。比如A流程依赖昨天的B流程执行成功,依赖节点会去检查B流程在昨天是否有执行成功的实例。
存储过程节点: 根据选择的数据源,执行存储过程
SQL节点:
非查询SQL功能:编辑非查询SQL任务信息,sql类型选择非查询
查询SQL功能:编辑查询SQL任务信息,sql类型选择查询,选择表格或附件形式发送邮件到指定的收件人。
SPARK节点:通过SPARK节点,可以直接直接执行SPARK程序,对于spark节点,worker会使用spark-submit方式提交任务
MapReduce(MR)节点:使用MR节点,可以直接执行MR程序。对于mr节点,worker会使用hadoop jar方式提交任务
Python节点:使用python节点,可以直接执行python脚本,对于python节点,worker会使用python **方式提交任务。
Flink节点
http节点 :http请求URL
DATAX节点:
Conditions:Conditions是一个条件节点,根据上游任务运行状态,判断应该运行哪个下游任务。截止目前Conditions支持多个上游任务,但只支持两个下游任务。当上游任务数超过一个时,可以通过且以及或操作符实现复杂上游依赖。就是根据上游节点的运行结果判定应该走
下流的那个节点。
Switch: Switch是一个条件判断节点,依据全局变量的值和用户所编写的表达式判断结果执行对应分支。
内置参数
基础内置参数
变量名 | 声明方式 | 含义 |
---|---|---|
system.biz.date | ${system.biz.date} | 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1 |
system.biz.curdate | ${system.biz.curdate} | 日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1 |
system.datetime | ${system.datetime} | 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1 |
可以通过 ${system.biz.date} 引用系统内置参数 |
全局参数
作用域 : 在工作流定义页面配置的参数,作用于该工作流中全部的任务; 如下定义作用域是在整个工作流中。
本地参数
作用域: 在任务定义页面配置的参数,默认作用域仅限该任务,如果配置了参数传递则可将该参数作用到下游任务中。 如下就是该参数只作用于该任务。
参数传递
DolphinScheduler 提供参数间相互引用的能力,包括:本地参数引用全局参数、上下游参数传递。因为有引用的存在,就涉及当参数名相同时,参数的优先级问题