zoukankan      html  css  js  c++  java
  • airflow Operators

    airflow Operators

    20190927


    一、 Dag 编写步骤

    1. import DAG类和若干operater类以及必要的Python模块
    2. 设定默认参数,创建DAG对象
    3. 提供必要的参数(比如task_id和dag),创建Task(即Operator对象)
    4. 设定Task的上下游依赖关系

    1. import DAG类

    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import timedelta
    

    2. 设置一些默认参数

    • 所有的 Operator 都是从BaseOperator 派生而来,并通过继承获得更多功能
    • 参考【airflow operators-CSDN
    • default_args设置的是DAG的通用参数,这些通用参数会直接传递给DAG下属的所有Task,这些参数也可以在创建Task时传入
    default_args = {
    	# 常用
        'owner': 'airflow', # 这个DAG的所有者,会在Web UI上显示,主要用于方便管理
        'depends_on_past': False,  # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行
        'start_date': datetime(2015, 6, 1), 
        # DAG的开始时间,比如这里就是从2015年6月1日开始执行第一个DAG。这个参数会影响到部署上线时回填DAG的数
        #量。一般建议写成上线时间的前一天(因为这里的start_date指的是execute_date,而Airflow执行的逻辑是,
        #今天的同一时间执行昨天的任务,比如execute_date=2018-03-01,每天凌晨3点执行,则会在2018-03-02 
        #03:00:00启动这个DAG。特别地,这个参数必须一个datetime对象,不可以用字符串
        'email': ['airflow@example.com'],# 出问题时,发送报警Email的地址,可以填多个,用逗号隔开
        'email_on_failure': False, # 任务失败且重试次数用完时是否发送Email,推荐填True
        'email_on_retry': False, # 任务重试时是否发送Email
        'retries': 1, # 任务失败后的重试次数
        'retry_delay': timedelta(minutes=5), # 重试间隔,必须是timedelta对象
        # 不常用
        'queue': 'bash_queue', # 队列,默认是default,决定实际执行任务会发送到哪个worker
        'pool': 'backfill',  # pool是一个分类限制并发量的设计,目前来说可以忽略,默认所有的Task都在一个pool里。
        'priority_weight': 10, # 优先级权重,在任务需要排队时而你需要优先执行某些任务时会有用
        'end_date': datetime(2016, 1, 1), # 结束时间,一般线上任务都会一直跑下去,所以没必要设置
    }
    

    3. 创建DAG对象

    dag = DAG('tutorial', default_args=default_args,schedule_interval="0 3 * * *") 
    # 第一个参数固定为dag的名字(即这个.py脚本的名称)
    # schedule_interval为执行时间间隔,同crontab的语法
    # 在这个例子中表示每天凌晨3点执行
    

    4. 创建Task

    # 这是一个SSHOperator的task示例
    task_1 = SSHOperator(
        ssh_conn_id='ssh_24', # 指定conn_id
        task_id='task_shopping',
        command='/bin/bash path/my.sh $(date -d "yesterday" +%Y%m%d) ', # 远程机器上的脚本文件
        dag=dag
    )
    

    5. 设定Task依赖关系

    • 设定依赖有两种方式,一种是使用对象的方法
    • set_upstreamset_downstream来设置上下游依赖
    • 另一种是使用运算符,比如下面的 task_1 << task_2,表示task_1task_2的下游对象
     task_1 << task_2
    

    二、完整的例子

    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.contrib.operators.ssh_operator import SSHOperator
    from datetime import datetime, timedelta
    import pendulum
    
    local_tz = pendulum.timezone("Asia/Shanghai")
    dt = local_tz.convert(datetime(2019, 9, 27))
    
    default_args = {
        'owner': 'lion',
        'depend_on_past': False,
        'email': ['my@email','other@email'],
        'email_on_failure': False,
        'email_on_retry': False,
        'start_date':dt,
        'retries': 0,
        'retry_delay': timedelta(minutes=1)
    }
    
    dag = DAG(dag_id="ssh_myssh", default_args=default_args, schedule_interval='0 6 * * *')
    
    task_1 = SSHOperator(
        ssh_conn_id='ssh_24',,
        task_id='task_shopping',
         command='/bin/bash path/my_1.sh $(date -d "yesterday" +%Y%m%d) ',
        dag=dag
    )
    
    task_2 = SSHOperator(
        ssh_conn_id='ssh_24',
        task_id='task_dimming',
         command='/bin/bash path/my_2.sh $(date -d "yesterday" +%Y%m%d) ',
        dag=dag
    )
    
    task_1 >> task_2
    

    参考

    airflow operators-CSDN
    airflow DAG

  • 相关阅读:
    Python——String类型操作符
    NLTK——NLP流程
    NLTK——常用函数
    Java——IO流 对象的序列化和反序列化流ObjectOutputStream和ObjectInputStream
    java——什么是浅表副本
    JavaWeb——<c:forEach varStatus="status">
    kubernetes安装
    [转]Jmeter + Grafana + InfluxDB 性能测试监控
    html转markdown网站
    golang的包管理---vendor/dep等
  • 原文地址:https://www.cnblogs.com/damahuhu/p/11675484.html
Copyright © 2011-2022 走看看