zoukankan      html  css  js  c++  java
  • airflow xcom 数据传递

     t1 写入参数, t2 获取参数

    #coding=utf-8
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    import airflow.utils
    
    
    # 定义默认参数
    default_args = {
        'owner': 'airflow',  # 拥有者名称
            'start_date': airflow.utils.dates.days_ago(1),#  第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
        'email': ['lshan523@163.com'],  # 接收通知的email列表
        'email_on_failure': True,  # 是否在任务执行失败时接收邮件
        'email_on_retry': True,  # 是否在任务重试时接收邮件
        'retries': 3,  # 失败重试次数
        'retry_delay': timedelta(seconds=5),  # 失败重试间隔
        'provide_context': True,
    }
    
    # 定义DAG
    dag = DAG(
        dag_id='hello_world_args',  # dag_id
        default_args=default_args,  # 指定默认参数
        schedule_interval="@once",
        # schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
    #     schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
    )
    
    
    # 定义要执行的Python函数1
    def hello_world_args_1(**context):
        current_time = str(datetime.today())
        with open('/tmp/hello_world_args_1.txt', 'a') as f:
            f.write('%s
    ' % current_time)
        assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常
        context['task_instance'].xcom_push(key='sea1', value="seaseseseaaa")
        return "t2"
    
    # 定义要执行的Python函数2
    def hello_world_args_2(**context):
        sea = context['task_instance'].xcom_pull(key="sea1",task_ids='hello_world_args_1')#参数id,task id
        current_time = str(datetime.today())
        with open('/tmp/hello_world_args_2.txt', 'a') as f:
            f.write('%s
    ' % sea)
            f.write('%s
    ' % current_time)
    
    # 定义要执行的task 1
    t1 = PythonOperator(
        task_id='hello_world_args_1',  # task_id
        python_callable=hello_world_args_1,  # 指定要执行的函数
        dag=dag,  # 指定归属的dag
        retries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
        provide_context=True,
    )
    
    # 定义要执行的task 2
    t2 = PythonOperator(
        task_id='hello_world_args_2',  # task_id
        python_callable=hello_world_args_2,  # 指定要执行的函数
        dag=dag,  # 指定归属的dag
        provide_context=True,
    )
    
    
    t1>>t2
  • 相关阅读:
    CAS单点登录(一)——初识SSO
    Sql Server 增加字段、修改字段、修改类型、修改默认值
    SQL 聚合函数-非聚合函数
    漫画:什么是中台?
    windows下nginx的安装及使用
    sql优化点
    如何处理sql中的关键字(例如',%)
    Mysql 如何创建一张临时表
    MySQL中information_schema是什么
    mysql查看表结构命令,如下:
  • 原文地址:https://www.cnblogs.com/lshan/p/11721148.html
Copyright © 2011-2022 走看看