zoukankan      html  css  js  c++  java
  • airflow使用本地时区

    ​ 在airflow中使用的时间是utc时间,而更多时候我们希望的是使用本地时间,于是在定义airflow定时任务的时候,涉及到了时间的转换。

    1.python中本地时间和utc时间的转换

    查看国内可用时区:

    >>> import pytz
    >>> pytz.country_timezones('cn')
    ['Asia/Shanghai', 'Asia/Urumqi']
    

    方式一:

    修改配置文件airflow.conf使用操作系统时间

    [core]
    default_timezone=system
    

    方式二:

    tz = pytz.timezone('Asia/Shanghai')
    naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
    local_dt = tz.localize(naive, is_dst=None)
    utc_dt = local_dt.astimezone(pytz.utc)
    

    参考链接: https://stackoverflow.com/questions/79797/how-do-i-convert-local-time-to-utc-in-python

    方式三:

    tz = pytz.timezone('Asia/Shanghai')
    dt = datetime(2018, 6, 13, 17, 40, tzinfo=tz)
    utc_dt = dt.astimezone(pytz.utc)
    

    2.Airflow中通过时间转换使用本地时间

    这里涉及到一个问题,如果只是将本地时间转换成了utc时间,那么在运行过程中airflow会抛出以下错误:

    Can't subtract offset-naive and offset-aware datetimes
    

    解决办法是当将时间转换为utc时间之后将其时区属性设为None:

     dt.replace(tzinfo=None)
    

    参考链接: https://stackoverflow.com/questions/796008/cant-subtract-offset-naive-and-offset-aware-datetimes

    完整示例DAG如下:

    import time
    import airflow
    import pytz
    from airflow.operators.python_operator import PythonOperator
    from airflow.models import DAG
    from datetime import timedelta, datetime
    
    default_args = {
        'owner': 'cord',
        'depends_on_past': True,
        'email': ['123456@qq.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    tz = pytz.timezone('Asia/Shanghai')
    # naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
    # local_dt = tz.localize(naive, is_dst=None)
    # utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)
    
    dt = datetime(2018, 6, 13, 17, 40, tzinfo=tz)
    utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
    
    dag = DAG(
        'demo',
        default_args=default_args,
        description='my DAG',
        schedule_interval='* */3 * * *',
        start_date=utc_dt
    )
    
    def test_func(str):
        print(str)
    
    task = PythonOperator(
        task_id='hello',
        python_callable=test_func,
        op_kwargs={'str': 'hello world'},
        dag=dag
    )
    
  • 相关阅读:
    pycharm路径
    git常用命令
    分页
    router
    视图集
    Leanring TypeScript 中文版
    RXJS 系列 04
    RXJS 系列 03
    RXJS 系列 02
    RXJS 系列 01
  • 原文地址:https://www.cnblogs.com/cord/p/9226517.html
Copyright © 2011-2022 走看看