zoukankan      html  css  js  c++  java
  • airflow之SubDAGs(转载)

    转载:https://www.yuque.com/apachecn/airflow-doc-zh/zh_concepts

    SubDAGs

     

    SubDAG 非常适合重复模式。在使用 Airflow 时,定义一个返回 DAG 对象的函数是一个很好的设计模式。

     

    Airbnb 在加载数据时使用阶段检查交换模式。数据在临时表中暂存,然后对该表执行数据质量检查。 一旦检查全部通过,分区就会移动到生产表中。

     

    再举一个例子,考虑以下 DAG:

     

    我们可以将所有并行task-*运算符组合到一个 SubDAG 中,以便生成的 DAG 类似于以下内容:

     

     

    请注意,SubDAG 运算符应包含返回 DAG 对象的工厂方法。 这将阻止 SubDAG 在主 UI 中被视为单独的 DAG。 例如:

    #dags/subdag.py
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    
    
    # Dag is returned by a factory method
    def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
      dag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
      )
    
      dummy_operator = DummyOperator(
        task_id='dummy_task',
        dag=dag,
      )
    
      return dag

    在主dag中使用

    # main_dag.py
    from datetime import datetime, timedelta
    from airflow.models import DAG
    from airflow.operators.subdag_operator import SubDagOperator
    from dags.subdag import sub_dag
    
    
    PARENT_DAG_NAME = 'parent_dag'
    CHILD_DAG_NAME = 'child_dag'
    
    main_dag = DAG(
      dag_id=PARENT_DAG_NAME,
      schedule_interval=timedelta(hours=1),
      start_date=datetime(2016, 1, 1)
    )
    
    sub_dag = SubDagOperator(
      subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
                     main_dag.schedule_interval),
      task_id=CHILD_DAG_NAME,
      dag=main_dag,
    )

    您可以从主 DAG 的图形视图放大 SubDagOperator,以显示 SubDAG 中包含的任务:

     

    使用 SubDAG 时的一些其他提示:

     

    • 按照惯例,SubDAG 的dag_id应以其父级和点为前缀。 和在parent.child
    • 通过将参数传递给 SubDAG 运算符来共享主 DAG 和 SubDAG 之间的参数(如上所示)
    • SubDAG 必须有一个计划并启用。如果 SubDAG 的时间表设置为None@once,SubDAG 将成功完成而不做任何事情
    • 清除 SubDagOperator 也会清除其中的任务状态
    • 在 SubDagOperator 上标记成功不会影响其中的任务状态
    • 避免在任务中使用depends_on_past=True,因为这可能会造成混淆
    • 可以为 SubDAG 指定执行程序。 如果要在进程中运行 SubDAG 并有效地将其并行性限制为 1,则通常使用 SequentialExecutor。使用 LocalExecutor 可能会有问题,因为它可能会过度消耗您的 workers,在单个插槽中运行多个任务
  • 相关阅读:
    ubuntu 安装 redis desktop manager
    ubuntu 升级内核
    Ubuntu 内核升级,导致无法正常启动
    spring mvc 上传文件,但是接收到文件后发现文件变大,且文件打不开(multipartfile)
    angular5 open modal
    POJ 1426 Find the Multiple(二维DP)
    POJ 3093 Margritas
    POJ 3260 The Fewest Coins
    POJ 1837 Balance(二维DP)
    POJ 1337 A Lazy Worker
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/15310423.html
Copyright © 2011-2022 走看看