zoukankan      html  css  js  c++  java
  • airflow的定时任务

    代码展示 : 

    import os
    from datetime import datetime
    
    import pytz
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.http_operator import SimpleHttpOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.bash_operator import BashOperator
    from airflow.hooks.postgres_hook import PostgresHook
    
    # 设置第一次触发任务时间 及 设置任务执行的时区
    local_tz = pendulum.timezone("Asia/Shanghai")
    start_date = datetime.datetime(2016, 1, 1, tzinfo=local_tz)
    
    # dag默认参数
    args = {
        "owner": "Rgc",  # 任务拥有人
        "depends_on_past": False,  # 是否依赖过去执行此任务的结果,如果为True,则过去任务必须成功,才能执行此次任务
        "start_date": start_date,  # 任务开始执行时间
    }
    
    # 定义一个DAG
    # 参数catchup指 是否填充执行 start_date到现在 未执行的缺少任务;如:start_date定义为2019-10-10,现在是2019-10-29,任务是每天定时执行一次,
    # 如果此参数设置为True,则 会生成 10号到29号之间的19点执行此任务;如果设置为False,则不会补充执行任务;
    # schedule_interval:定时执行方式,推荐使用如下字符串方式, 方便写出定时规则的网址:https://crontab.guru/
    # 也可以使用  :  schedule_interval= datetime.timedelta(minutes=5)  5分钟执行一次
    dag = DAG(
        dag_id="HttpSendDag",
        catchup=False,
        default_args=args,
        schedule_interval="0 19 * * *"
    )
    
    
    # 触发任务的时候的某些参数 , ds=today
    def func(ds, next_ds, run_id, **kwargs):
        pass
    
    
    # 调动Python相关函数
    t1 = PythonOperator(
        task_id='deviation_new',
        python_callable=func,
        provide_context=True,
        dag=dag
    )

    注意:当您不想安排DAG时,请使用schedule_interval=None而不是schedule_interval='None'

    # 每5分钟执行一次 
    schedule_interval="*/5 * * * *"
    
    # 每小时执行一次 
    schedule_interval="0 1 * * *"
    
    # 每天8-20点之间,每小时执行一次
    schedule_interval="0 8-20 * * *"
  • 相关阅读:
    LabelImg 图像图像标注工具
    周杰伦的2000w个故事
    ROS 订阅图像节点(1)
    ROS 订阅图像节点
    ROS 双目标定
    书籍
    Z30云台PC控制问题
    大疆M600组装和试飞
    M100 X3云台安装
    M100 组装教程
  • 原文地址:https://www.cnblogs.com/zmc940317/p/12598261.html
Copyright © 2011-2022 走看看