zoukankan      html  css  js  c++  java
  • airflow

    教程:https://airflow.apache.org/docs/stable/index.html

    官网: http://airflow.incubator.apache.org/index.html

    airflow源码:https://github.com/apache/incubator-airflow

    参考资料:http://www.open-open.com/lib/view/open1452002876105.html

    简介:http://www.cnblogs.com/xianzhedeyu/p/8047828.html

    重要参数介绍:http://www.cnblogs.com/skyrim/p/7456166.html

    http://blog.csdn.net/permike/article/details/52184621

    FAQ :http://blog.csdn.net/yingkongshi99/article/details/52658660

    容器:docker pull puckel/docker-airflow

    启动dag调度器, 注意启动调度器, 并不意味着dag会被马上触发, dag触发需要符合它自己的schedule规则

    如果缺省了END_DATE参数, END_DATE等同于START_DATE.

    使用 DummyOperator 来汇聚分支
    使用 ShortCircuitOperator/BranchPythonOperator 做分支
    使用 SubDagOperator 嵌入一个子dag
    使用 TriggerDagRunOperator 直接trigger 另一个dag

     在创建MyBashOperator的实例时候, 为on_failure_callback和on_success_callback参数设置两个回调函数, 我们在回调函数中, 将success或failed状态记录到自己的表中.

    DAG的schedule_interval参数设置成None, 表明这个DAG始终是由外部触发

    如果将default_args字典传递给DAG,DAG将会将字典应用于其内部的任何Operator上。这很容易的将常用参数应用于多个Operator,而无需多次键入。

    default_args=dict(
        start_date=datetime(2016, 1, 1),
        owner='Airflow')
    
    dag = DAG('my_dag', default_args=default_args)
    op = DummyOperator(task_id='dummy', dag=dag)
    print(op.owner) # Airflow

    initdb,初始化元数据DB,元数据包括了DAG本身的信息、运行信息等;

    resetdb,清空元数据DB;

    list_dags,列出所有DAG;

    list_tasks,列出某DAG的所有task;

    test,测试某task的运行状况;

    backfill,测试某DAG在设定的日期区间的运行状况;

    webserver,开启webserver服务;

    scheduler,用于监控与触发DAG。

    $ cd ${AIRFLOW_HOME}/dags
    
    $ python test_import.py # 保证代码无语法错误
    
    $ airflow list_dags # 查看dag是否成功加载
    
    airflow list_tasks test_import_dag –tree # 查看dag的树形结构是否正确
    
    $ airflow test test_import_dag  test_import_task 2016-3-7 # 测试具体的dag的某个task在某个时间的运行是否正常
    
    $ airflow backfill test_import_dag -s 2016-3-4  -e 2016-3-7 # 对dag进行某段时间内的完整测试
    

      


    # print the list of active DAGs
    airflow list_dags

    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks tutorial

    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks tutorial --tree

    请注意,airflow test命令在本地运行任务实例,将其日志输出到stdout(屏幕上),不会影响依赖关系,并且不会将状态(运行,成功,失败,...)发送到数据库。 它只是允许简单的测试单个任务实例。
    如果使用depends_on_past = True,则单个任务实例将取决于上一个任务实例的成功与否,如果指定本身的start_date,则忽略此依赖关系
    # start your backfill on a date range
    airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

    使用Xcom在task之间传参

    可以直接使用jinja模板语言,在{{}}中调用ti的xcom_push和xcom_pull方法,下面的例子为t1使用xcom_push推出了一个kv,t2通过taskid和key来接收

     dag = DAG(  
       dag_id='xcomtest', default_args=default_args, schedule_interval='*/2 * ** *')  
       
    t1 = BashOperator(  
       task_id='xcom',  
       bash_command='''''{{ ti.xcom_push(key='aaa', value='bbb') }}''',  
        dag=dag)  
       
    t2 = BashOperator(  
       task_id='xcom2',  
        bash_command='''''echo"{{ ti.xcom_pull(key='aaa', task_ids='xcom') }}" ''',  
        dag=dag)  
    t2.set_upstream(t1)  
    

      

    airflow提供了很多Macros Variables,可以直接使用jinja模板语言调用宏变量

    execution_date并不是task的真正执行时间,而是上一周期task的执行时间。
    我们在airflow上看到一个任务是6am执行的,而且interval=4hours,那么execution_date的值是2am,而不是6am

    暂时无法hold或pause某个task,只支持以dag为单位pause

    当使用BashOperator时,command需要调用脚本时,脚本后需要有个空格,否则报错,暂时不清楚原因,但加空格后可以正常执行,如下例,run.sh后需加空格


    Airflow为Operator提供许多常见任务,包括:
    BashOperator - 执行bash命令
    PythonOperator - 调用任意的Python函数
    EmailOperator - 发送邮件
    HTTPOperator - 发送 HTTP 请求
    SqlOperator - 执行 SQL 命令
    Sensor - 等待一定时间,文件,数据库行,S3键等...
    除了这些基本的构建块之外,还有更多的特定Operator:DockerOperator,HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator,SlackOperator

    使用supervisord进行deamon

    airflow本身没有deamon模式,所以直接用supervisord就ok了,我们只要写4行代码

    [program:airflow_web]
    command=/home/kimi/env/athena/bin/airflow webserver -p 8080
    
    [program:airflow_scheduler]
    command=/home/kimi/env/athena/bin/airflow scheduler
    
    作者:yin1941
    链接:https://www.jianshu.com/p/59d69981658a
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    

      

    airflow 执行的命令或这种消息是支持 jinja2 模板语言;{{ ds }}是一种宏,表示当前的日期,
    形如2016-12-16,支持的宏在
    https://airflow.incubator.apache.org/code.html#macros

    test: 用于测试特定的某个task,不需要依赖满足
    run: 用于执行特定的某个task,需要依赖满足
    backfill: 执行某个DAG,会自动解析依赖关系,按依赖顺序执行
    unpause: 将一个DAG启动为例行任务,默认是关的,所以编写完DAG文件后一定要执行这和要命令,相反命令为pause
    scheduler: 这是整个 airflow 的调度程序,一般是在后台启动
    clear: 清除一些任务的状态,这样会让scheduler来执行重跑

     ============================

    前面的脚本里用到了{{ ds }}变量,每个DAG在执行时都会传入一个具体的时间(datetime对象), 这个ds就会在 render 命令时被替换成对应的时间。这里要特别强调一下, 对于周期任务,airflow传入的时间是上一个周期的时间(划重点),比如你的任务是每天执行, 那么今天传入的是昨天的日期,如果是周任务,那传入的是上一周今天的值

    ==========================

    executor
    SequentialExecutor:表示单进程顺序执行,通常只用于测试
    LocalExecutor:表示多进程本地执行,它用python的多进程库从而达到多进程跑任务的效果。
    CeleryExecutor:表示使用celery作为执行器,只要配置了celery,就可以分布式地多机跑任务,一般用于生产环境。
    sql_alchemy_conn :这个配置让你指定 airflow 的元信息用何种方式存储,默认用sqlite,如果要部署到生产环境,推荐使用 mysql。

    smtp :如果你需要邮件通知或用到 EmailOperator 的话,需要配置发信的 smtp 服务器

    ======================

    触发条件有两个维度, 以T1&T2->T3 这样的dag为例:
    一个维度是: 要根据dag上次运行T3的状态确定本次T3是否被调用, 由DAG的default_args.depends_on_past参数控制, 为True时, 只有上次T3运行成功, 这次T3才会被触发

    另一个维度是: 要根据前置T1和T2的状态确定本次T3是否被调用, 由T3.trigger_rule参数控制, 有下面6种情形, 缺省是all_success.
    all_success: (default) all parents have succeeded
    all_failed: all parents are in a failed or upstream_failed state
    all_done: all parents are done with their execution
    one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
    one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
    dummy: dependencies are just for show, trigger at will

    ========================

    airflow有两个基于PythonOperator的Operator来支持dag分支功能.

    ShortCircuitOperator, 用来实现流程的判断. Task需要基于ShortCircuitOperator, 如果本Task返回为False的话, 其下游Task将被skip; 如果为True的话, 其下游Task将会被正常执行. 尤其适合用在其下游都是单线节点的场景.

    BranchPythonOperator, 用来实现Case分支. Task需要基于BranchPythonOperator, airflow会根据本task的返回值(返回值是某个下游task的id),来确定哪个下游Task将被执行, 其他下游Task将被skip.

    ======================

    connection 表:
    我们的Task往往需要通过jdbc/ftp/http/webhdfs方式访问其他资源, 一般地访问资源时候都需要一些签证, airflow允许我们将这些connection以及鉴证存放在connection表中. 可以现在WebUI的Admin->Connections管理这些连接, 在代码中使用这些连接.

    MySQL 应该使用 mysqlclient 包, 我简单试了mysql-connector-python 有报错

    LocalExecutor 和 CeleryExecutor 都可用于生产环境, CeleryExecutor 将使用 Celery 作为Task执行的引擎, 扩展性很好, 当然配置也更复杂, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其实真正要求扩展性的场景并不多, 所以LocalExecutor 是一个很不错的选择了.

    1. 配置OS环境变量 AIRFLOW_HOME, AIRFLOW_HOME缺省为 ~/airflow
    2. 运行下面命令初始化一个Sqlite backend DB, 并生成airflow.cfg文件
    your_python ${AIRFLOW_HOME}inairflow initdb
    3. 如果需要修改backend DB类型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新运行 airflow initdb .
    官方推荐使用MySQL/PostgreSQL做DB Server.

    有下面3个参数用于控制Task的并发度,
    parallelism, 一个Executor同时运行task实例的个数
    dag_concurrency, 一个dag中某个task同时运行的实例个数
    max_active_runs_per_dag: 一个dag同时启动的实例个数

    start_date 有点特别,如果你设置了这个参数,那么airflow就会从start_date开始以 schedule_interval 的规则开始执行,例如设置成3天前每小时执行一次,那么在调度正常启动时,就会立即调度 24*3 次,但注意,脚本执行环境的时间还是当前的系统时间,而不会说真是把系统时间模拟成3天前,所以感觉这个功能应用场景比较好限。

    ===========================

    dags_folder目录支持子目录和软连接,因此不同的dag可以分门别类的存储起来

    schedule_interval=timedelta(minutes=1) 或者 crontab格式
    crontab格式的介绍:https://www.cnblogs.com/chenshishuo/p/5152068.html http://blog.csdn.net/liguohanhaha/article/details/52261192

    sql_alchemy_conn = mysql://ct:152108@localhost/airflow
    对应字段解释如下: dialect+driver://username:password@host:port/database

    当遇到不符合常理的情况时考虑清空 airflow backend的数据库, 可使用airflow resetdb清空。
    删除dag文件后,webserver中可能还会存在相应信息,这时需要重启webserver并刷新网页。
    关闭webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}

    界面的时候看起来比较蛋疼, utc-0的时间,
    修改.../python2.7/site-packages/airflow/www/templates/admin/master.html如下(注释掉UCTSeconds,新增一行UTCSeconds), 这样时间就是本地时间了。


    验证脚本是否有问题:python xxx.py
    看是否能查询出新增的dags吗:airflow list_dags
    启动schedule :airflow scheduler


    这里有的 start_date 有点特别,如果你设置了这个参数,那么airflow就会从start_date开始以 schedule_interval 的规则开始执行,例如设置成3天前每小时执行一次,那么在调度正常启动时,就会立即调度 24*3 次,但注意,脚本执行环境的时间还是当前的系统时间,而不会说真是把系统时间模拟成3天前,所以感觉这个功能应用场景比较好限

    在centos6.8上装特别顺利(运行时貌似一切都正常,就是任务一直处于running状态---debug了一番源代码, 发现内存要必需够大,发现必需用非root身份运行airflow worker, 务必保证核数够用,否则需要调低dag_concurrency, max_active_runs_per_dag,max_threads,parallelism, 否则worker出现莫名其妙的问题)

    airflow跑着跑着就挂了,一看内存还够用(可能需要不要钱的加内存),如果你到处找不到想要的错误日志。那么看看AIRFLOW_HOME下面是不是莫名其妙的多了几个 .err/.out 的文件,进去看看会有收获。

    在需要运行作业的机器上的安装airflow airflow[celery] celery[redis] 模块后,启动airflow worker即可.这样作业就能运行在多个节点上.

    安装主模块
    [airflow@airflow ~]$ pip install airflow
    2.4.2 安装数据库模块、密码模块
    [airflow@airflow ~]$ pip install "airflow[postgres,password]"

  • 相关阅读:
    不可小视视图对效率的影响力
    Maximum Margin Planning
    PhysicsBased Boiling Simulation

    Learning Behavior Styles with Inverse Reinforcement Learning
    Simulating Biped Behaviors from Human Motion Data
    Nearoptimal Character Animation with Continuous Control
    Apprenticeship Learning via Inverse Reinforcement Learning
    回报函数学习的学徒学习综述
    Enabling Realtime Physics Simulation in Future Interactive Entertainment
  • 原文地址:https://www.cnblogs.com/testzcy/p/8427141.html
Copyright © 2011-2022 走看看