zoukankan      html  css  js  c++  java
  • airflow sample to pass metadata to task. XCOM

    1. At first, let us take a look at one sample which  one we do not need to pass the metadata

    import random
    from datetime import datetime, timedelta
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from airflow import DAG
    
    default_args = {
        'owner': 'Jasmine Qian',
        'start_date': days_ago(0),
        'email': ['jasmine.qian@liveramp.com'],
        'retries': 0,
        'retry_delay': timedelta(minutes=2),
    }
    
    dag = DAG(
        'Pass_Metadata',
        default_args=default_args,
        tags=['meta', 'params'],
        schedule_interval=None,
        catchup=False,
    )
    
    
    def run_this_fun(**context):
        print("Hi")
    
    
    def check_state(**context):
        if random.random() > 0.7:
            raise Exception('Exception')
        print("I am OK")
    
    
    run_this_task1 = PythonOperator(
        task_id="run_this",
        python_callable=check_state,
        provide_context=True,
        dag=dag,
    )
    
    
    run_this_task2 = PythonOperator(
        task_id="run_this2",
        python_callable=run_this_fun,
        provide_context=True,
        dag=dag,
    )
    
    
    run_this_task1 >> run_this_task2
    
    
    # run_this_task2
    if __name__ == "__main__":
        dag.cli()

    2. how about task1 passing any metadata to task2?

    import random
    from datetime import datetime, timedelta
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from airflow import DAG
    
    default_args = {
        'owner': 'Jasmine Qian',
        'start_date': days_ago(0),
        'email': ['jasmine.qian@liveramp.com'],
        'retries': 0,
        'retry_delay': timedelta(minutes=2),
    }
    
    dag = DAG(
        'Pass_Metadata_v2',
        default_args=default_args,
        tags=['meta', 'params'],
        schedule_interval=None,
        catchup=False,
    )
    
    
    def run_this_fun(**context):
        received_value = context['ti'].xcom_pull(key='random_value')
        print("Hi, I received the value {}".format(str(received_value)))
    
    
    def push_to_xcom(**context):
        random_value = random.random()
        context['ti'].xcom_push(key='random_value', value=random_value)
        print("I am OK")
    
    
    run_this_task1 = PythonOperator(
        task_id="run_this",
        python_callable=push_to_xcom,
        provide_context=True,
        dag=dag,
    )
    
    run_this_task2 = PythonOperator(
        task_id="run_this2",
        python_callable=run_this_fun,
        provide_context=True,
        dag=dag,
    )
    
    run_this_task1 >> run_this_task2
    
    # run_this_task2
    if __name__ == "__main__":
        dag.cli()
    ------------------------- A little Progress a day makes you a big success... ----------------------------
  • 相关阅读:
    Charles抓包工具
    JQuery 实现表单验证,所有验证通过方可提交
    卡巴斯基注册信息清除
    Nginx 404 Not Found 解决办法
    php mysql 多表查询之子查询语句
    搜狗拼音、QQ拼音输入法、2345拼音输入法、百度输入法 、手心输入法对比。(个人体会)
    Notepad++使用-如何导出/导入配置文件
    深蓝词库转换2.4版发布,支持最新的搜狗用户词库备份bin格式
    网站更换服务器,百度站长后台抓取诊断时间
    阿里云代金券领取
  • 原文地址:https://www.cnblogs.com/qianjinyan/p/15534110.html
Copyright © 2011-2022 走看看