zoukankan      html  css  js  c++  java
  • AirFlow简介

    1, 简介

    ​ Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

    2,执行器(Executor)

    ​ Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

    SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

    LocalExecutor:多进程本地执行任务

    CeleryExecutor:分布式调度,生产常用

    DaskExecutor :动态任务调度,主要用于数据分析

    在当前项目使用CeleryExecutor作为执行器。

    celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,当前项目使用的是rabbitmq,系统整体结构如下所示:

    其中:

    turing为外部系统

    GDags服务帮助拼接成dag

    master节点webui管理dags、日志等信息

    scheduler负责调度,只支持单节点

    worker负责执行具体dag中的task, worker支持多节点

    在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。

    3,任务处理器

    airflow内置了丰富的任务处理器,用于实现不同类型的任务:

    BashOperator : 执行bash命令

    PythonOperator : 调用python代码

    EmailOperator : 发送邮件

    HTTPOperator : 发送 HTTP 请求

    SqlOperator : 执行 SQL 命令

    除了这些基本的构建块之外,还有更多的特定处理器:DockerOperatorHiveOperatorS3FileTransferOperatorPrestoToMysqlOperatorSlackOperator ...

    在当前项目使用了HTTPOperator 作为执行器,用于调用JAVA服务,整体结构图如下:

    关于airflow的环境搭建可以参考另外一篇博客: https://www.cnblogs.com/cord/p/9226608.html

    4,基本使用

    4.1,常用命令
    $ airflow webserver -D		 守护进程运行webserver
    
    $ airflow scheduler -D		 守护进程运行调度器
    
    $ airflow worker -D		     守护进程运行调度器
    
    $ airflow worker -c 1 -D 	 守护进程运行celery worker并指定任务并发数为1
    
    $ airflow pause dag_id  	暂停任务
    
    $ airflow unpause dag_id 	 取消暂停,等同于在管理界面打开off按钮
    
    $ airflow list_tasks dag_id  查看task列表
    
    $ airflow clear dag_id       清空任务实例
    
    $ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE  运行整个dag文件
    
    $ airflow run dag_id task_id execution_date       运行task
    
    4.2,web管控界面的使用

    启动web管控界面需要执行airflow webserver -D命令,默认访问端口是8080

    http://110.55.63.51:8080/admin/

    (1) 任务启动暂停开关

    (2) 任务运行状态

    (3) 待执行,未分发的任务

    (4) 手动触发执行任务

    (5) 任务管控界面

    选择对应dag栏目,点击(5) Graph View即可进入任务管控界面

    点击对应的任务,会弹出一个任务管控台,主要几个功能如下:

    View Log : 查看任务日志

    Run : 运行选中任务

    Clear:清空任务队列

    Mark Success : 标记任务为成功状态

    4.3 通过定义DAG文件实现创建定时任务
    1) 普通任务
    from datetime import timedelta, datetime
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    
    default_args = { #默认参数
        'owner': 'jifeng.si', #dag拥有者,用于权限管控
        'depends_on_past': False,  #是否依赖上游任务
        'start_date': datetime(2018, 5, 2), #任务开始时间,默认utc时间
        'email': ['123456789@qq.com'], #告警通知邮箱地址
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'example_hello_world_dag',  #dag的id
        default_args=default_args,
        description='my first DAG', #描述
        schedule_interval='*/25 * * * *', # crontab
        start_date=datetime(2018, 5, 28) #开始时间,覆盖默认参数
    )
    
    def print_hello():
        return 'Hello world!'
    
    dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)
    
    hello_operator = BashOperator(   #通过BashOperator定义执行bash命令的任务
        task_id='sleep_task',
        depends_on_past=False,
        bash_command='echo `date` >> /home/py/test.txt',
        dag=dag
    )
    
    dummy_operator >> hello_operator #设置任务依赖关系
    #dummy_operator.set_downstream(hello_operator)
    
    2) 定义http任务并使用本地时间
    import os
    from datetime import timedelta, datetime
    import pytz
    from airflow.operators.http_operator import SimpleHttpOperator
    from airflow.models import DAG
    
    default_args = {
        'owner': 'cord',
        # 'depends_on_past': False,
        'depends_on_past': True,
        'wait_for_downstream': True,
        'execution_timeout': timedelta(minutes=3),
        'email': ['123456789@qq.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    #将本地时间转换为utc时间,再设置为start_date
    tz = pytz.timezone('Asia/Shanghai')
    dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
    utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
    
    os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090'
    
    dag = DAG(
        'bm01',
        default_args=default_args,
        description='my DAG',
        schedule_interval='*/2 * * * *',
        start_date=utc_dt
    )
    
    #通过SimpleHttpOperator定义http任务
    task1 = SimpleHttpOperator(
        task_id='get_op1',
        http_conn_id='http_test',
        method='GET',
        endpoint='test1',
        data={},
        headers={},
        dag=dag)
    
    task2 = SimpleHttpOperator(
        task_id='get_op2',
        http_conn_id='http_test',
        method='GET',
        endpoint='test2',
        data={},
        headers={},
        dag=dag)
    
    task1 >> task2
    
    
    4.4 crontab语法

    crontab格式如下所示:

    # ┌───────────── minute (0 - 59)
    # │ ┌───────────── hour (0 - 23)
    # │ │ ┌───────────── day of month (1 - 31)
    # │ │ │ ┌───────────── month (1 - 12)
    # │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;
    # │ │ │ │ │                                       7 is also Sunday on some systems)
    # │ │ │ │ │
    # │ │ │ │ │
    # * * * * *  command to execute
    
    是否必须 取值范围 可用特殊符号 备注
    Minutes Yes 0–59 * , -
    Hours Yes 0–23 * , -
    Day of month Yes 1–31 * , - ? L W ? L W部分实现可用
    Month Yes 1–12 or JAN–DEC * , -
    Day of week Yes 0–6 or SUN–SAT * , - ? L # ? L W 部分实现可用
    Year No 1970–2099 * , - 标准实现里无这一项

    特殊符号功能说明:

    逗号(,)
    ​ 逗号用于分隔一个列表里的元素,比如 "MON,WED,FRI" 在第五域(day of week)表示Mondays, Wednesdays and Fridays。

    连字符(-)
    ​ 连字符用于表示范围,比如2000–2010表示2000到2010之间的每年,包括这两年(闭区间)。

    百分号(%)
    ​ 用于命令(command)中的格式化

    L
    ​ 表示last,最后一个,比如第五域,5L表示当月最后一个星期五

    W
    W表示weekday(Monday-Friday),指离指定日期附近的工作日,比如第三域设置为15L ,这表示临近当月15附近的工作日,假如15号是星期六,那么定时器会在14号执行,如果15号是星期天,那么定时器会在16号执行,也就是说只会在离指定日期最近的那天执行。

    井号#
    #用于第五域(day of week),#后面跟着一个1~5之间的数字,这个用于表示第几个星期,比如5#3表示第三个星期五

    ?
    ​ 在有些实现里面,*的功能相同,还有一些实现里面?表示cron的启动时间,比如 当cron服务在8:25am启动,则? ? * * * *会更新为25 8 * * * *, 直到下一次cron服务重新启动,定时器会再次更新。

    /
    /一般与*组合使用,后面跟着一个数字,表示频率,比如在第一域(Minutes)中*/5表示每5分钟,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的缩写

    参考链接:
    https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral

    https://en.wikipedia.org/wiki/Cron

  • 相关阅读:
    JavaScript实现常用的排序算法
    jQuery学习之路(8)- 表单验证插件-Validation
    jQuery学习之路(7)- 用原生JavaScript实现jQuery的某些简单功能
    jQuery学习之路(6)- 简单的表格应用
    jQuery学习之路(5)- 简单的表单应用
    jQuery学习之路(4)- 动画
    JavaScript常见的五种数组去重的方式
    jQuery学习之路(3)- 事件
    jQuery学习之路(2)-DOM操作
    Docker使用非root用户
  • 原文地址:https://www.cnblogs.com/cord/p/9450910.html
Copyright © 2011-2022 走看看