zoukankan      html  css  js  c++  java
  • airflow 简介

    转载:https://zhuanlan.zhihu.com/p/36043468

     

    简介

    Apache-Airflow 是Airbnb开源的一款数据流程工具,目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程,同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式,支持Master-Slave模式,支持Mesos等资源调度,有非常好的扩展性。被大量公司采用。

    Airflow提供了一系列的python SDK,用户在该SDK的规范下,使用python定义各个ETL节点执行的工作,节点间的关系,同时定义执行计划,失败策略等,提交到Airflow平台中后,平台会根据执行计划自动执行,同时支持失败重试、失败通知等能力。

    同时,Airflow还提供了一个Web UI来查看数据流程的执行和支持一部分简单操作。部分功能也可以通过命令行或者Restful API来完成。

     

     

     

     

     

     

    概念

    Airflow中有几个重要概念,比较典型和易理解:

    • Operators:Airflow定义的一系列算子/操作符,更直接的理解就是python class。不同的Operator类实现了具体的功能,比如:
      • BashOperator: 可以执行用户指定的一个Bash命令
      • PythonOperator:可以执行用户指定的一个python函数
      • EmailOperator:可以进行邮件发送
      • Sensor:感知器/触发器,可以定义触发条件和动作,在条件满足时执行某个动作。Airflow提供了更具体的Sensor,比如FileSensor,DatabaseSensor等

     

    • Tasks:Operators的具体实例,在某个Operator的基础上指定了具体的参数或内容。其实就是OO概念中的对象(Operator是类)。
    • Task Instances:一个Task的一次运行会产生一个实例
    • DAGS:有向无环图,包括一系列的tasks和tasks之间的链接关系

    由此可以看出来,使用Airflow的步骤就是定义以上概念的过程:

    1. 根据实际需要,使用不同的Operator
    2. 传入具体的参数,定义一系列的Tasks
    3. 定义Tasks间的关系,形成一个DAG
    4. 调度DAG运行,每个Task会行成一个Instance
    5. 使用命令行或者Web UI进行查看和管理

     

     

    安装

    安装非常简单

    # airflow needs a home, ~/airflow is the default,
    # but you can lay foundation somewhere else if you prefer
    # (optional)
    export AIRFLOW_HOME=~/airflow
    
    # install from pypi using pip
    pip install apache-airflow
    
    # initialize the database
    airflow initdb
    
    # start the web server, default port is 8080
    airflow webserver -p 8080
    # start the scheduler server
    airflow scheduler

     

    不过实际使用过程中发现Airflow对Python 3的兼容性更好,因此强烈建议运行在Python 3环境下。建议使用pyenv + pyenv-virtualenv来管理多个python版本。

    配置

    默认情况下,airflow在用户主目录下创建一个airflow目录作为AIRFLOW_HOME并生成数据库文件和配置文件。配置文件中包括众多的配置参数,可以根据需要修改。

    如果需要使用邮件功能,修改SMTP参数:

    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = HOST
    smtp_starttls = False
    smtp_ssl = False
    smtp_user = USER_NAME
    smtp_password = PASSWORD
    smtp_port = 25
    smtp_mail_from = FROM_EMAIL

     

    示例

    以下是官方的示例,定义了一个三节点的流程。第一个节点执行Bash命令打印日期,第二个节点执行Bash命令Sleep 5秒,第三个节点使用模板方式执行Bash命令。

    把该python文件拷贝到AIRFLOW_HOME/dags目录下(如果不存在,手工创建),Airflow会自动检测并更新加载。

    """
    Code that goes along with the Airflow tutorial located at:
    https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG('tutorial', default_args=default_args)
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
    
    t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
    
    templated_command = """
    {% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
    {% endfor %}
    """
    
    t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
    
    t2.set_upstream(t1)
    t3.set_upstream(t1)
    

     

    常见问题

    时区问题

    当前版本的Airflow默认使用的是UTC时区,如果要指定流程定时运行,需要减去8。 即如果希望流程在每天晚上20:00点开始执行,实际要填写的时间是12:00

    定时问题

    Airflow DAG的定时规则遵循cron experssion格式,同时还提供了一些“快捷方式”,比如 @hourly 可以定义每小时运行一次.

    其他参数还有:

    1. start_date: 流程开始调度的时间,可以早于或者晚于当前时间
    2. end_data: 流程结束调度的时间
    3. catch_up: 如果指定的开始时间早于当前时间且catch_up设置为true,那么airflow会把过去‘遗漏’的调度执行一遍

    举例:

    如果今天的时间是2018-04-12 08:00, 流程的定时策略是每天上午10:00执行,那么schedule_interval='00 02 * * *' (减8小时)
    如果start_date是 2018-04-01,且catch_up为true。那么在提交到平台后,Airflow会开始从2018-04-01的日期开始调度执行,执行11次到2018-04-11。
    Airflow此时等待到10:00,执行2018-04-12当天的流程

    Sensor问题

    Airflow提供了很多现成的Sensor,比如用于监控HDFS文件的Sensor。但是由于Sensor特性基于snakebite库,而snakebite目前并不支持Python 3,因此Sensor相关的特性在Python3下暂时无法使用。

  • 相关阅读:
    Using the @synchronized Directive
    What Are Threads?
    ios 线程同步
    CAAnimation 动画支撑系统
    UIView 动画 依赖与 CALayer的证据
    动画的定义
    Core Animation1-简介
    繁的是表象,简的是本质
    完全自定义动画
    线程安全与可重入
  • 原文地址:https://www.cnblogs.com/to-here/p/11890187.html
Copyright © 2011-2022 走看看