1.从调度到airflow
ETL,是英文 Extract,Transform,Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,合理安排三者以及三者子类的过程被称之为数据调度。
在数据调度中,数据流程之间的依赖主要是以下四种:
-
时间依赖:任务需要等待某一个时间点触发。
-
外部系统依赖:任务依赖外部系统需要调用接口去访问。
-
任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响。
-
资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行。
crontab 可以处理定时执行任务的需求,但仅能管理时间上的依赖,无法处理逻辑上的依赖和相应的监控。所以我们准备寻一种轻度的调度工具替代他,airflow是不二之选。
Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。
提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。
总结为下面三点:
- Airflow 是一种 仓库管理系统(Warehouse Management System 简称WMS),它可以解决上述四种依赖问题,将任务以及它们的依赖看作代码,按照那些计划规范任务执行,并在实际工作进程之间分发需执行的任务。
- Airflow 提供了一个用于显示当前活动任务和过去任务状态的web界面,并允许用户手动管理任务的执行和状态。
- Airflow 中的工作流是具有方向性依赖的任务集合。
对应Airflow调度工作有以下功能:
- 系统配置($AIRFLOW_HOME/airflow.cfg)
- 作业管理($AIRFLOW_HOME/dags/xxxx.py)
- 运行监控(webserver)
- 报警(邮件或短信)
- 日志查看(webserver 或 $AIRFLOW_HOME/logs/***)
- 跑批耗时分析(webserver)
- 后台调度服务(scheduler)
完成上述功能对应如上组件:
-
元数据库:这个数据库存储有关任务状态的信息。
-
调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
-
执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
-
Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。
2.安装
首先安装命令为:
pip install airflow