zoukankan      html  css  js  c++  java
  • airflow sample to pass metadata to task. XCOM

    1. At first, let us take a look at one sample which  one we do not need to pass the metadata

    import random
    from datetime import datetime, timedelta
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from airflow import DAG
    
    default_args = {
        'owner': 'Jasmine Qian',
        'start_date': days_ago(0),
        'email': ['jasmine.qian@liveramp.com'],
        'retries': 0,
        'retry_delay': timedelta(minutes=2),
    }
    
    dag = DAG(
        'Pass_Metadata',
        default_args=default_args,
        tags=['meta', 'params'],
        schedule_interval=None,
        catchup=False,
    )
    
    
    def run_this_fun(**context):
        print("Hi")
    
    
    def check_state(**context):
        if random.random() > 0.7:
            raise Exception('Exception')
        print("I am OK")
    
    
    run_this_task1 = PythonOperator(
        task_id="run_this",
        python_callable=check_state,
        provide_context=True,
        dag=dag,
    )
    
    
    run_this_task2 = PythonOperator(
        task_id="run_this2",
        python_callable=run_this_fun,
        provide_context=True,
        dag=dag,
    )
    
    
    run_this_task1 >> run_this_task2
    
    
    # run_this_task2
    if __name__ == "__main__":
        dag.cli()

    2. how about task1 passing any metadata to task2?

    import random
    from datetime import datetime, timedelta
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from airflow import DAG
    
    default_args = {
        'owner': 'Jasmine Qian',
        'start_date': days_ago(0),
        'email': ['jasmine.qian@liveramp.com'],
        'retries': 0,
        'retry_delay': timedelta(minutes=2),
    }
    
    dag = DAG(
        'Pass_Metadata_v2',
        default_args=default_args,
        tags=['meta', 'params'],
        schedule_interval=None,
        catchup=False,
    )
    
    
    def run_this_fun(**context):
        received_value = context['ti'].xcom_pull(key='random_value')
        print("Hi, I received the value {}".format(str(received_value)))
    
    
    def push_to_xcom(**context):
        random_value = random.random()
        context['ti'].xcom_push(key='random_value', value=random_value)
        print("I am OK")
    
    
    run_this_task1 = PythonOperator(
        task_id="run_this",
        python_callable=push_to_xcom,
        provide_context=True,
        dag=dag,
    )
    
    run_this_task2 = PythonOperator(
        task_id="run_this2",
        python_callable=run_this_fun,
        provide_context=True,
        dag=dag,
    )
    
    run_this_task1 >> run_this_task2
    
    # run_this_task2
    if __name__ == "__main__":
        dag.cli()
    ------------------------- A little Progress a day makes you a big success... ----------------------------
  • 相关阅读:
    第一阶段意见评论
    《大道至简》阅读笔记03
    大二下第十二周学习笔记
    《大道至简》阅读笔记02
    课上练习—单词统计
    大二下第十一周学习笔记
    《大道至简》阅读笔记01
    《人月神话》阅读笔记03
    大二下第十周学习笔记
    团队项目前期冲刺-10
  • 原文地址:https://www.cnblogs.com/qianjinyan/p/15534110.html
Copyright © 2011-2022 走看看