zoukankan      html  css  js  c++  java
  • airflow

    教程:https://airflow.apache.org/docs/stable/index.html

    官网: http://airflow.incubator.apache.org/index.html

    airflow源码:https://github.com/apache/incubator-airflow

    参考资料:http://www.open-open.com/lib/view/open1452002876105.html

    简介:http://www.cnblogs.com/xianzhedeyu/p/8047828.html

    重要参数介绍:http://www.cnblogs.com/skyrim/p/7456166.html

    http://blog.csdn.net/permike/article/details/52184621

    FAQ :http://blog.csdn.net/yingkongshi99/article/details/52658660

    容器:docker pull puckel/docker-airflow

    启动dag调度器, 注意启动调度器, 并不意味着dag会被马上触发, dag触发需要符合它自己的schedule规则

    如果缺省了END_DATE参数, END_DATE等同于START_DATE.

    使用 DummyOperator 来汇聚分支
    使用 ShortCircuitOperator/BranchPythonOperator 做分支
    使用 SubDagOperator 嵌入一个子dag
    使用 TriggerDagRunOperator 直接trigger 另一个dag

     在创建MyBashOperator的实例时候, 为on_failure_callback和on_success_callback参数设置两个回调函数, 我们在回调函数中, 将success或failed状态记录到自己的表中.

    DAG的schedule_interval参数设置成None, 表明这个DAG始终是由外部触发

    如果将default_args字典传递给DAG,DAG将会将字典应用于其内部的任何Operator上。这很容易的将常用参数应用于多个Operator,而无需多次键入。

    default_args=dict(
        start_date=datetime(2016, 1, 1),
        owner='Airflow')
    
    dag = DAG('my_dag', default_args=default_args)
    op = DummyOperator(task_id='dummy', dag=dag)
    print(op.owner) # Airflow

    initdb,初始化元数据DB,元数据包括了DAG本身的信息、运行信息等;

    resetdb,清空元数据DB;

    list_dags,列出所有DAG;

    list_tasks,列出某DAG的所有task;

    test,测试某task的运行状况;

    backfill,测试某DAG在设定的日期区间的运行状况;

    webserver,开启webserver服务;

    scheduler,用于监控与触发DAG。

    $ cd ${AIRFLOW_HOME}/dags
    
    $ python test_import.py # 保证代码无语法错误
    
    $ airflow list_dags # 查看dag是否成功加载
    
    airflow list_tasks test_import_dag –tree # 查看dag的树形结构是否正确
    
    $ airflow test test_import_dag  test_import_task 2016-3-7 # 测试具体的dag的某个task在某个时间的运行是否正常
    
    $ airflow backfill test_import_dag -s 2016-3-4  -e 2016-3-7 # 对dag进行某段时间内的完整测试
    

      


    # print the list of active DAGs
    airflow list_dags

    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks tutorial

    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks tutorial --tree

    请注意,airflow test命令在本地运行任务实例,将其日志输出到stdout(屏幕上),不会影响依赖关系,并且不会将状态(运行,成功,失败,...)发送到数据库。 它只是允许简单的测试单个任务实例。
    如果使用depends_on_past = True,则单个任务实例将取决于上一个任务实例的成功与否,如果指定本身的start_date,则忽略此依赖关系
    # start your backfill on a date range
    airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

    使用Xcom在task之间传参

    可以直接使用jinja模板语言,在{{}}中调用ti的xcom_push和xcom_pull方法,下面的例子为t1使用xcom_push推出了一个kv,t2通过taskid和key来接收

     dag = DAG(  
       dag_id='xcomtest', default_args=default_args, schedule_interval='*/2 * ** *')  
       
    t1 = BashOperator(  
       task_id='xcom',  
       bash_command='''''{{ ti.xcom_push(key='aaa', value='bbb') }}''',  
        dag=dag)  
       
    t2 = BashOperator(  
       task_id='xcom2',  
        bash_command='''''echo"{{ ti.xcom_pull(key='aaa', task_ids='xcom') }}" ''',  
        dag=dag)  
    t2.set_upstream(t1)  
    

      

    airflow提供了很多Macros Variables,可以直接使用jinja模板语言调用宏变量

    execution_date并不是task的真正执行时间,而是上一周期task的执行时间。
    我们在airflow上看到一个任务是6am执行的,而且interval=4hours,那么execution_date的值是2am,而不是6am

    暂时无法hold或pause某个task,只支持以dag为单位pause

    当使用BashOperator时,command需要调用脚本时,脚本后需要有个空格,否则报错,暂时不清楚原因,但加空格后可以正常执行,如下例,run.sh后需加空格


    Airflow为Operator提供许多常见任务,包括:
    BashOperator - 执行bash命令
    PythonOperator - 调用任意的Python函数
    EmailOperator - 发送邮件
    HTTPOperator - 发送 HTTP 请求
    SqlOperator - 执行 SQL 命令
    Sensor - 等待一定时间,文件,数据库行,S3键等...
    除了这些基本的构建块之外,还有更多的特定Operator:DockerOperator,HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator,SlackOperator

    使用supervisord进行deamon

    airflow本身没有deamon模式,所以直接用supervisord就ok了,我们只要写4行代码

    [program:airflow_web]
    command=/home/kimi/env/athena/bin/airflow webserver -p 8080
    
    [program:airflow_scheduler]
    command=/home/kimi/env/athena/bin/airflow scheduler
    
    作者:yin1941
    链接:https://www.jianshu.com/p/59d69981658a
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    

      

    airflow 执行的命令或这种消息是支持 jinja2 模板语言;{{ ds }}是一种宏,表示当前的日期,
    形如2016-12-16,支持的宏在
    https://airflow.incubator.apache.org/code.html#macros

    test: 用于测试特定的某个task,不需要依赖满足
    run: 用于执行特定的某个task,需要依赖满足
    backfill: 执行某个DAG,会自动解析依赖关系,按依赖顺序执行
    unpause: 将一个DAG启动为例行任务,默认是关的,所以编写完DAG文件后一定要执行这和要命令,相反命令为pause
    scheduler: 这是整个 airflow 的调度程序,一般是在后台启动
    clear: 清除一些任务的状态,这样会让scheduler来执行重跑

     ============================

    前面的脚本里用到了{{ ds }}变量,每个DAG在执行时都会传入一个具体的时间(datetime对象), 这个ds就会在 render 命令时被替换成对应的时间。这里要特别强调一下, 对于周期任务,airflow传入的时间是上一个周期的时间(划重点),比如你的任务是每天执行, 那么今天传入的是昨天的日期,如果是周任务,那传入的是上一周今天的值

    ==========================

    executor
    SequentialExecutor:表示单进程顺序执行,通常只用于测试
    LocalExecutor:表示多进程本地执行,它用python的多进程库从而达到多进程跑任务的效果。
    CeleryExecutor:表示使用celery作为执行器,只要配置了celery,就可以分布式地多机跑任务,一般用于生产环境。
    sql_alchemy_conn :这个配置让你指定 airflow 的元信息用何种方式存储,默认用sqlite,如果要部署到生产环境,推荐使用 mysql。

    smtp :如果你需要邮件通知或用到 EmailOperator 的话,需要配置发信的 smtp 服务器

    ======================

    触发条件有两个维度, 以T1&T2->T3 这样的dag为例:
    一个维度是: 要根据dag上次运行T3的状态确定本次T3是否被调用, 由DAG的default_args.depends_on_past参数控制, 为True时, 只有上次T3运行成功, 这次T3才会被触发

    另一个维度是: 要根据前置T1和T2的状态确定本次T3是否被调用, 由T3.trigger_rule参数控制, 有下面6种情形, 缺省是all_success.
    all_success: (default) all parents have succeeded
    all_failed: all parents are in a failed or upstream_failed state
    all_done: all parents are done with their execution
    one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
    one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
    dummy: dependencies are just for show, trigger at will

    ========================

    airflow有两个基于PythonOperator的Operator来支持dag分支功能.

    ShortCircuitOperator, 用来实现流程的判断. Task需要基于ShortCircuitOperator, 如果本Task返回为False的话, 其下游Task将被skip; 如果为True的话, 其下游Task将会被正常执行. 尤其适合用在其下游都是单线节点的场景.

    BranchPythonOperator, 用来实现Case分支. Task需要基于BranchPythonOperator, airflow会根据本task的返回值(返回值是某个下游task的id),来确定哪个下游Task将被执行, 其他下游Task将被skip.

    ======================

    connection 表:
    我们的Task往往需要通过jdbc/ftp/http/webhdfs方式访问其他资源, 一般地访问资源时候都需要一些签证, airflow允许我们将这些connection以及鉴证存放在connection表中. 可以现在WebUI的Admin->Connections管理这些连接, 在代码中使用这些连接.

    MySQL 应该使用 mysqlclient 包, 我简单试了mysql-connector-python 有报错

    LocalExecutor 和 CeleryExecutor 都可用于生产环境, CeleryExecutor 将使用 Celery 作为Task执行的引擎, 扩展性很好, 当然配置也更复杂, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其实真正要求扩展性的场景并不多, 所以LocalExecutor 是一个很不错的选择了.

    1. 配置OS环境变量 AIRFLOW_HOME, AIRFLOW_HOME缺省为 ~/airflow
    2. 运行下面命令初始化一个Sqlite backend DB, 并生成airflow.cfg文件
    your_python ${AIRFLOW_HOME}inairflow initdb
    3. 如果需要修改backend DB类型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新运行 airflow initdb .
    官方推荐使用MySQL/PostgreSQL做DB Server.

    有下面3个参数用于控制Task的并发度,
    parallelism, 一个Executor同时运行task实例的个数
    dag_concurrency, 一个dag中某个task同时运行的实例个数
    max_active_runs_per_dag: 一个dag同时启动的实例个数

    start_date 有点特别,如果你设置了这个参数,那么airflow就会从start_date开始以 schedule_interval 的规则开始执行,例如设置成3天前每小时执行一次,那么在调度正常启动时,就会立即调度 24*3 次,但注意,脚本执行环境的时间还是当前的系统时间,而不会说真是把系统时间模拟成3天前,所以感觉这个功能应用场景比较好限。

    ===========================

    dags_folder目录支持子目录和软连接,因此不同的dag可以分门别类的存储起来

    schedule_interval=timedelta(minutes=1) 或者 crontab格式
    crontab格式的介绍:https://www.cnblogs.com/chenshishuo/p/5152068.html http://blog.csdn.net/liguohanhaha/article/details/52261192

    sql_alchemy_conn = mysql://ct:152108@localhost/airflow
    对应字段解释如下: dialect+driver://username:password@host:port/database

    当遇到不符合常理的情况时考虑清空 airflow backend的数据库, 可使用airflow resetdb清空。
    删除dag文件后,webserver中可能还会存在相应信息,这时需要重启webserver并刷新网页。
    关闭webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}

    界面的时候看起来比较蛋疼, utc-0的时间,
    修改.../python2.7/site-packages/airflow/www/templates/admin/master.html如下(注释掉UCTSeconds,新增一行UTCSeconds), 这样时间就是本地时间了。


    验证脚本是否有问题:python xxx.py
    看是否能查询出新增的dags吗:airflow list_dags
    启动schedule :airflow scheduler


    这里有的 start_date 有点特别,如果你设置了这个参数,那么airflow就会从start_date开始以 schedule_interval 的规则开始执行,例如设置成3天前每小时执行一次,那么在调度正常启动时,就会立即调度 24*3 次,但注意,脚本执行环境的时间还是当前的系统时间,而不会说真是把系统时间模拟成3天前,所以感觉这个功能应用场景比较好限

    在centos6.8上装特别顺利(运行时貌似一切都正常,就是任务一直处于running状态---debug了一番源代码, 发现内存要必需够大,发现必需用非root身份运行airflow worker, 务必保证核数够用,否则需要调低dag_concurrency, max_active_runs_per_dag,max_threads,parallelism, 否则worker出现莫名其妙的问题)

    airflow跑着跑着就挂了,一看内存还够用(可能需要不要钱的加内存),如果你到处找不到想要的错误日志。那么看看AIRFLOW_HOME下面是不是莫名其妙的多了几个 .err/.out 的文件,进去看看会有收获。

    在需要运行作业的机器上的安装airflow airflow[celery] celery[redis] 模块后,启动airflow worker即可.这样作业就能运行在多个节点上.

    安装主模块
    [airflow@airflow ~]$ pip install airflow
    2.4.2 安装数据库模块、密码模块
    [airflow@airflow ~]$ pip install "airflow[postgres,password]"

  • 相关阅读:
    maven上传jar包规范
    java.util.ConcurrentModificationException
    求集合中的最大值和最小值
    对象/集合转换成json
    字符串直接赋值和构造赋值的区别
    CSV文件读取
    读取properties配置文件
    图片轮播 js代码
    工作流数据库字段设计-审批流程。。
    @Html.Partials 加载分布视图传参数
  • 原文地址:https://www.cnblogs.com/testzcy/p/8427141.html
Copyright © 2011-2022 走看看