Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:
SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
LocalExecutor:多进程本地执行任务
CeleryExecutor:分布式调度,生产常用
DaskExecutor :动态任务调度,主要用于数据分析
在当前项目使用CeleryExecutor
作为执行器。
celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,当前项目使用的是rabbitmq,系统整体结构如下所示:
其中:
turing为外部系统
GDags服务帮助拼接成dag
master节点webui管理dags、日志等信息
scheduler负责调度,只支持单节点
worker负责执行具体dag中的task, worker支持多节点
在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。
队列的使用前提就是选择CeleryExecutor执行器
以下如内容来自airflow官网
使用CeleryExecutor时,可以指定将任务发送到的Celery队列。队列是BaseOperator的属性,因此可以将任何任务分配给任何队列。在airflow.cfg的celery-> default_queue中定义了环境的默认队列。这定义了未指定任务时分配给其的队列,以及Airflow的worker在启动时侦听的队列。
worker可以听一个或多个任务队列。启动工作程序时(使用命令airflow worker),可以指定一组用逗号分隔的队列名称(例如airflow worker -q spark)。然后,此worker将仅拾取连接到指定队列的任务。
如果从资源的角度(例如非常轻巧的任务,其中一个worker可以毫无问题地执行数千个任务)或从环境的角度(您需要一个在Spark集群中运行的worker)来需要专门的worker,这可能很有用。本身,因为它需要非常具体的环境和安全权限)。http://airflow.apache.org/docs/stable/concepts.html#queues
airflow中的队列严格来说不叫Queues,叫"lebal"更为合适。在operator中,可以设置queue参数如queue=spark,然后在启动worker时:airflow worker -q spark,那么该worker只会执行spark任务。相当于节点标签。
实践测试
测试地址 http://118.178.129.199:8888/admin/
测试DAG:cc_ods.hyf_test
根据官网内容对code进行修改
# -*- coding: utf8 -*-
import pprint
from datetime import timedelta, datetime
from os import sys, path
import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append(path.dirname(path.abspath(__file__)))
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))
from utils import basic_util, send_email, sms_service, env_conf, alarm_util
import hallo
pp = pprint.PrettyPrinter(indent=4)
str_today = (datetime.today()).strftime('%Y-%m-%d')
## Define the DAG object
default_args = {
'owner': 'yifan.hu',
'depends_on_past': True,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 4,
'retry_delay': timedelta(minutes=1),
'priority_weight': 10,
'execution_timeout': timedelta(minutes=360),
'queue': 'test_spark'
}
dag_id = basic_util.get_dag_id(path.abspath(__file__))
dag = DAG(dag_id, default_args=default_args, schedule_interval='0 0 * * *', concurrency=2)
# 元数据系统生成创建表的sql文件
t_meta_create_table_statements = PythonOperator(
task_id='meta_create_table_statements',
python_callable=hallo.test_task1,
default_args=default_args,
dag=dag
)
在py文件中的default_args添加了'queue': 'test_spark',给这个任务指定了queue
然后重新调起这个任务
在修改work上的队列标志之前,这个任务一直显示queued,因为没有任何一个work的队列标签是test_spark,所以,没有work可以去执行它,
然后在启动worker的脚本中添加了 -q test_spark
修改完成后重启这个节点上的worker(这里我修改的是节点test31)
重启worker以后,任务立刻执行完毕,可以看到执行的节点就是test31(多次重复实验均是在test31上进行)
在官网中又一段话是
worker可以听一个或多个任务队列。启动工作程序时(使用命令airflow worker),可以指定一组用逗号分隔的队列名称(例如airflow worker -q spark)。然后,此worker将仅拾取连接到指定队列的任务。
也就是说我们在启动这个节点上的worker的时候,加一个参数 -q test_spark,但是在airflow.cfg中不做修改,被标志test_spark的任务将会在这个节点上执行,并且在元数据中,queue列也会显示test_spark,而其他没有定义队列的task,将会按照airflow.cfg中default_queue参数配置进行,也就是说并不是这个worker打了test_spark的标签就只能执行带有test_spark队列标签的任务,只是这样的任务一定会在这个worker执行,其他task会按照airflow.cfg中定义的进行。
下图是元数据所示
airflow.cfg中default_queue参数配置