zoukankan      html  css  js  c++  java
  • airflow中跨dag/task的数据传送

    默认在airflow中每个task都是独立的进程,无法进行数据交换,但airflow还提供了一个XCom功能,以满足用户的类似需求

    下面我们创建两个dag,其中一个push数据,一个pull数据,如下的dag中的task(push)执行完后会推送一条数据到xcom表,key=push ,value=True

    dag = DAG(
        dag_id='migrate_mongo',
        default_args=args,
        schedule_interval='0 17 * * *',
        catchup=False
    )
    
    
    def push(**kwargs):
        kwargs['ti'].xcom_push(key='status', value=True)
    
    task1 = PythonOperator(task_id='push',
                           python_callable=push,
                           provide_context=True,
                           dag=dag)

    另起一个dag,创建一个task(pull)可以将上面的dag中的数据获取到

    dag = DAG(
        dag_id='migrate_mongo2',
        default_args=args,
        schedule_interval='0 17 * * *',
        catchup=False
    )
    
    
    def pull(**kwargs):
        execution_date = kwargs['execution_date']
        v1 = XCom.get_one(dag_id='migrate_mongo', task_id='push', execution_date=execution_date)
        print(v1)
    
    task2 = PythonOperator(task_id='pull',
                           python_callable=pull,
                           provide_context=True,
                           dag=dag)

    两个dag之间其实通过airflow底层的一张xcom表进行数据交换的

  • 相关阅读:
    asp.net mvc Bundle
    Nginx配置文件详细说明
    Nginx 负载均衡配置和策略
    WPF 体系结构
    理解WPF中的视觉树和逻辑树
    物料需求預估
    缓存ViewState减少网络传输
    xingfu
    如何在表格中插入图片
    add
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/14035783.html
Copyright © 2011-2022 走看看