zoukankan      html  css  js  c++  java
  • Airflow 调度基础

    1. Airflow

    Airflow是一个调度、监控工作流的平台。用于将一个工作流制定为一组任务的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。

    2. 安装

    pip安装airflow

    pip3 install apache-airflow

    初始化db

    airflow initdb

    启动web server

    airflow webserver -p 8081

    启动scheduler

    airflow scheduler

    3. 例子

    下面是一个基本的管道定义,接下来我们会对它们进行详细解释:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta

    default_args = {
       
    'owner': 'tang-airflow',
       
    'depends_on_past': False,
       
    'start_date': datetime(2019, 6, 23),
       
    'email': ['xxxxxxx@qq.com'],
       
    'email_on_failure': False,
       
    'email_on_retry': False,
       
    'retries': 1,
       
    'retry_delay': timedelta(minutes=5),
       
    # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    dag = DAG(
    'first', default_args=default_args, schedule_interval=timedelta(days=1))

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
       
    task_id='print_date',
       
    bash_command='date',
       
    dag=dag)

    t2 = BashOperator(
       
    task_id='sleep',
       
    bash_command='sleep 5',
       
    retries=3,
       
    dag=dag)

    templated_command =
    """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """

    t3 = BashOperator(
       
    task_id='templated',
       
    bash_command=templated_command,
       
    params={'my_param': 'Parameter I passed in'},
       
    dag=dag)

    t2.set_upstream(t1)
    t3.set_upstream(t1)

    它是一个DAG定义文件

    一件必须要注意的一件事是:Airflow Python脚本仅仅是一个配置文件,以代码的方式指定了DAG的结构。而真正执行的任务会以不同的上下文执行,不是以这个脚本的上下文。

    对于这个DAG定义文件来说,它们并不执行任何真正的数据处理,它也不是用于此用途。这个脚本的目的是:定义一个DAG对象。它需要很快地执行(秒级别,而不是分级别),因为scheduler会定期执行它,以反映出任何变化(如果有的话)

    引入模块

    一个Airflow pipeline 仅仅是一个Python脚本,用于定义一个Airflow DAG对象。首先我们需要import需要的库:

    # DAG对象;我们需要它实例化一个DAG
    from airflow import DAG

    # Operators;我们需要它去做操作
    from airflow.operators.bash_operator import BashOperator

    默认参数

    我们接下来会创建一个DAG以及一些tasks任务,并且可以显式地传递一组参数到每个task的构造器中(但是此操作会有些重复工作)。另外一种更好的方法是:我们可以定义一个默认参数的字典,在创建task时使用。

    from datetime import datetime, timedelta

    default_args = {
       
    'owner': 'tang-airflow',
       
    'depends_on_past': False,
       
    'start_date': datetime(2019, 6, 23),
       
    'email': ['402877015@qq.com'],
       
    'email_on_failure': False,
       
    'email_on_retry': False,
       
    'retries': 1,
       
    'retry_delay': timedelta(minutes=5),
       
    # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    以上仅是一组参数定义示范,在实际应用中可以有更多且多样的参数配置。

    实例化一个DAG

    我们需要一个DAG对象用于放置tasks。这里我们传递一个String定义dag_id,作为DAG的唯一标识符。我们也会将之前定义的参数字典传递给此方法,并定义调度DAG的间隔为1天(schedule_interval)。

    dag = DAG('first', default_args=default_args, schedule_interval=timedelta(days=1))

    Tasks

    Task任务是在实例化operator对象时生成的。从operator实例化的对象称为constructor。第一个参数task_id作为task的唯一标志符。

    t1 = BashOperator(
       
    task_id='print_date',
       
    bash_command='date',
       
    dag=dag)

    t2 = BashOperator(
       
    task_id='sleep',
       
    bash_command='sleep 5',
       
    retries=3,
       
    dag=dag)

    这里我们使用的是BashOperator,执行bash命令,参数部分较为简单。在一个task中,使用的参数优先级为:

    1.     显式传递的参数值

    2.     default_args 字典中存在的参数值

    3.     operator的默认值(如果有的话)

    一个task必须包含的两个参数为:task_id以及owner,否则Airflow会抛出异常。

    使用Jinja构建模版

    JinjaPython设计的一种模板语言。Airflow使用Jinja模板语言,为pipeline编写者提供了一组内置的的参数与宏。同时,它也提供了hooks,让用户定义它们自己的参数、宏、以及模板。

    提供的例子仅片面地介绍了在Airflow使用模板语言,不过提供这个例子的主要的目的有两个:1.让读者知道模板这个功能是存在的;2. 让读者了解双花括号的使用,以及最常见的模板变量: {{ ds }} (今天的”data stamp”)

    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    t3 = BashOperator(
       
    task_id='templated',
       
    bash_command=templated_command,
       
    params={'my_param': 'Parameter I passed in'},
       
    dag=dag)

    需要注意的是 templated_command 的代码逻辑包含在{% %} 块中,引用的参数为{{ ds }}。调用的方法例如 {{ macros.ds_add(ds, 7) }},并且在 {{ params.my_param }} 中引用了一个用户定义的参数。

    BashOperator 中的params hook,允许你传递一个参数字典、以及/或对象到你的模板中。这里需要仔细看一下传递参数时的对应映射关系。

    文件也可以作为参数传递给bash_command,例如 bash_command=’templated_command.sh’,文件的地址为pipeline文件(这里是tutorial.py)所在文件夹的相对地址。这个功能对于很多场景是有用的,例如将脚本逻辑与pipeline代码分离、允许执行其他语言的代码文件、以及构建pipeline更多的灵活性等。也可以在DAG构造器调用中定义你的template_searchpath,指向任何目录地址。

    使用同样的DAG构造器调用,也可以定义user_defined_macros,指定你自己的变量。例如,传递dict(foo=’bar’)到这个参数,可以让你在模板中使用{{ foo }}。此外,指定user_defined_filters,可以注册自定义的过滤器。例如,传递dict(hello=lambda name: ‘Hello %s’ % name) 到这个变量,可以让你在模板中使用{{ ‘world’ | hello }}。对于更多的用户自定义过滤器,可以阅读以下Jinja官方文档:

    http://jinja.pocoo.org/docs/dev/api/#writing-filters

    对于更多有关可在模板中使用的变量与宏的信息,可以参考以下文档:

    https://airflow.apache.org/macros.html

    设置依赖关系

    现在我们有三个taskst1, t2 t3。它们之间并没有相互依赖关系。下面是几种可以用于定义它们之间依赖的方法:

    t1.set_downstream(t2)

    # This means that t2 will depend on t1
    # running successfully to run.
    # It is equivalent to:
    t2.set_upstream(t1)

    # The bit shift operator can also be
    # used to chain operations:
    t1 >> t2

    # And the upstream dependency with the
    # bit shift operator:
    t2 << t1

    # Chaining multiple dependencies becomes
    # concise with the bit shift operator:
    t1 >> t2 >> t3

    # A list of tasks can also be set as
    # dependencies. These operations
    # all have the same effect:
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

    需要注意的是,在执行脚本时,如果Airflow发现在DAG中有回环、或是一个依赖被引用超过一次,会抛出异常。

    4. 测试

    我们将以上代码保存在文件tutorial.py中,保存位置为airflow.cfg文件中定义的DAGs目录。默认的DAGs目录地址为~/airflow/dags

    # The folder where your airflow pipelines live, most likely a

    # subfolder in a code repository

    # This path must be absolute

    dags_folder = /home/hadoop/airflow/dags

    执行脚本:

    python3 ~/airflow/dags/tutorial.py

    命令行验证元数据

    执行脚本后,我们执行几个命令进一步验证脚本:

    # 打印出activeDAGs

    > airflow list_dags

    tutorial

    # 打印 tutorial DAGtasks

    > airflow list_tasks tutorial

    print_date

    sleep

    templated

    # 打印tutorial DAG tasks 的树状结构

    > airflow list_tasks tutorial --tree

    <Task(BashOperator): sleep>

        <Task(BashOperator): print_date>

    <Task(BashOperator): templated>

        <Task(BashOperator): print_date>

    测试

    我们可以通过执行task实例进行测试,这里除了传入task外,还需要传入一个date(日期)。这里的date在执行上下文中是一个execution_date,模拟了scheduler在某个特定时间点(data + time)执行task

    # command layout: command subcommand dag_id task_id date

    # testing print_date

    > airflow test tutorial print_date 2019-02-02

    [2019-06-25 03:51:36,370] {bash_operator.py:90} INFO - Exporting the following env vars:

    AIRFLOW_CTX_DAG_ID=tutorial

    AIRFLOW_CTX_TASK_ID=print_date

    AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00

    [2019-06-25 03:51:36,370] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmpc9ntvif0/print_datehrv9r95p

    [2019-06-25 03:51:36,370] {bash_operator.py:114} INFO - Running command: date

    [2019-06-25 03:51:36,374] {bash_operator.py:123} INFO - Output:

    [2019-06-25 03:51:36,376] {bash_operator.py:127} INFO - Tue 25 Jun 03:51:36 UTC 2019

    [2019-06-25 03:51:36,376] {bash_operator.py:131} INFO - Command exited with return code 0

    # testing sleep
    > airflow test tutorial sleep 2019-02-02
     

    [2019-06-25 03:53:15,203] {bash_operator.py:90} INFO - Exporting the following env vars:

    AIRFLOW_CTX_DAG_ID=tutorial

    AIRFLOW_CTX_TASK_ID=sleep

    AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00

    [2019-06-25 03:53:15,203] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp175xwnf8/sleepdsa5lg3t

    [2019-06-25 03:53:15,203] {bash_operator.py:114} INFO - Running command: sleep 5

    [2019-06-25 03:53:15,207] {bash_operator.py:123} INFO - Output:

    [2019-06-25 03:53:20,209] {bash_operator.py:131} INFO - Command exited with return code 0

    # testing 模板

    > airflow test tutorial templated 2019-02-02

    ...

    [2019-06-25 05:00:21,412] {bash_operator.py:114} INFO - Running command:

        echo "2019-02-02"

        echo "2019-02-09"

        echo "Parameter I passed in"

        echo "2019-02-02"

        echo "2019-02-09"

        echo "Parameter I passed in"

        echo "2019-02-02"

        echo "2019-02-09"

        echo "Parameter I passed in"

        echo "2019-02-02"

        echo "2019-02-09"

        echo "Parameter I passed in"

        echo "2019-02-02"

        echo "2019-02-09"

    echo "Parameter I passed in"

    ...

    需要注意的是,airflow test 命令是在本地运行task实例,将输出打印到stdout,并没有依赖考虑,也没有与数据库沟通状态(running, success, failed, …)。此命令仅测试一个单task实例。

    Backfill

    从本地运行来看,未出现任何问题,现在我们运行一个backfillBackfill可以测试某个DAG在设定的日期区间的运行状况。它会考虑到task之间的依赖、写入日志文件、与数据库交互并记录状态信息。如果启动了一个webserver,则可以在webserver上跟踪它的进度。

    需要注意的是,如果使用depends_on_past=True,则单个task实例的运行取决于它的上游task实例的成功运行。

    在这个上下文中,时间区间是start_date,以及一个可选的end_date

    # optional, start a web server in debug mode in the background
    # airflow webserver --debug &

    # start your backfill on a date range
    airflow backfill tutorial -s 2019-02-02 -e 2019-02-09

    执行之后可在Web Server 界面跟踪它们的执行状态。

    References:

    https://airflow.apache.org/tutorial.html

  • 相关阅读:
    MVP模式与MVVM模式
    webpack的配置处理
    leetcode 287 Find the Duplicate Number
    leetcode 152 Maximum Product Subarray
    leetcode 76 Minimum Window Substring
    感知器算法初探
    leetcode 179 Largest Number
    leetcode 33 Search in Rotated Sorted Array
    leetcode 334 Increasing Triplet Subsequence
    朴素贝叶斯分类器初探
  • 原文地址:https://www.cnblogs.com/zackstang/p/11082322.html
Copyright © 2011-2022 走看看