zoukankan      html  css  js  c++  java
  • airfkow的queue(队列)的使用

    Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

    SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

    LocalExecutor:多进程本地执行任务

    CeleryExecutor:分布式调度,生产常用

    DaskExecutor :动态任务调度,主要用于数据分析

    在当前项目使用CeleryExecutor作为执行器。

    celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,当前项目使用的是rabbitmq,系统整体结构如下所示:

    其中:

    turing为外部系统

    GDags服务帮助拼接成dag

    master节点webui管理dags、日志等信息

    scheduler负责调度,只支持单节点

    worker负责执行具体dag中的task, worker支持多节点

    在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。

    队列的使用前提就是选择CeleryExecutor执行器

    以下如内容来自airflow官网

    使用CeleryExecutor时,可以指定将任务发送到的Celery队列。队列是BaseOperator的属性,因此可以将任何任务分配给任何队列。在airflow.cfg的celery-> default_queue中定义了环境的默认队列。这定义了未指定任务时分配给其的队列,以及Airflow的worker在启动时侦听的队列。

    worker可以听一个或多个任务队列。启动工作程序时(使用命令airflow worker),可以指定一组用逗号分隔的队列名称(例如airflow worker -q spark)。然后,此worker将仅拾取连接到指定队列的任务。

    如果从资源的角度(例如非常轻巧的任务,其中一个worker可以毫无问题地执行数千个任务)或从环境的角度(您需要一个在Spark集群中运行的worker)来需要专门的worker,这可能很有用。本身,因为它需要非常具体的环境和安全权限)。http://airflow.apache.org/docs/stable/concepts.html#queues

    airflow中的队列严格来说不叫Queues,叫"lebal"更为合适。在operator中,可以设置queue参数如queue=spark,然后在启动worker时:airflow worker -q spark,那么该worker只会执行spark任务。相当于节点标签。

    实践测试

    测试地址 http://118.178.129.199:8888/admin/

    测试DAG:cc_ods.hyf_test

    根据官网内容对code进行修改

    # -*- coding: utf8 -*-
    import pprint
    from datetime import timedelta, datetime
    from os import sys, path
    
    import airflow
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    
    reload(sys)
    sys.setdefaultencoding('utf-8')
    sys.path.append(path.dirname(path.abspath(__file__)))
    sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
    sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))
    
    from utils import basic_util, send_email, sms_service, env_conf, alarm_util
    import hallo
    
    pp = pprint.PrettyPrinter(indent=4)
    
    str_today = (datetime.today()).strftime('%Y-%m-%d')
    
    ## Define the DAG object
    default_args = {
        'owner': 'yifan.hu',
        'depends_on_past': True,
        'start_date': airflow.utils.dates.days_ago(2),
        'retries': 4,
        'retry_delay': timedelta(minutes=1),
        'priority_weight': 10,
        'execution_timeout': timedelta(minutes=360),
        'queue': 'test_spark'
    }
    
    dag_id = basic_util.get_dag_id(path.abspath(__file__))
    
    
    dag = DAG(dag_id, default_args=default_args, schedule_interval='0 0 * * *', concurrency=2)
    
    
    # 元数据系统生成创建表的sql文件
    t_meta_create_table_statements = PythonOperator(
        task_id='meta_create_table_statements',
        python_callable=hallo.test_task1,
        default_args=default_args,
        dag=dag
    )

    在py文件中的default_args添加了'queue''test_spark',给这个任务指定了queue

    然后重新调起这个任务

     

    在修改work上的队列标志之前,这个任务一直显示queued,因为没有任何一个work的队列标签是test_spark,所以,没有work可以去执行它,

    然后在启动worker的脚本中添加了 -q test_spark

    修改完成后重启这个节点上的worker(这里我修改的是节点test31)

    重启worker以后,任务立刻执行完毕,可以看到执行的节点就是test31(多次重复实验均是在test31上进行)

    在官网中又一段话是

    worker可以听一个或多个任务队列。启动工作程序时(使用命令airflow worker),可以指定一组用逗号分隔的队列名称(例如airflow worker -q spark)。然后,此worker将仅拾取连接到指定队列的任务。

    也就是说我们在启动这个节点上的worker的时候,加一个参数 -q test_spark,但是在airflow.cfg中不做修改,被标志test_spark的任务将会在这个节点上执行,并且在元数据中,queue列也会显示test_spark,而其他没有定义队列的task,将会按照airflow.cfg中default_queue参数配置进行,也就是说并不是这个worker打了test_spark的标签就只能执行带有test_spark队列标签的任务,只是这样的任务一定会在这个worker执行,其他task会按照airflow.cfg中定义的进行。

    下图是元数据所示

    airflow.cfg中default_queue参数配置


  • 相关阅读:
    结构化系统开发和面向对象开发方法
    十五周总结
    第十四周总结
    第十三周总结
    分答
    第十周总结
    DFD
    判定表
    第八周总结
    开发方法对比
  • 原文地址:https://www.cnblogs.com/xuziyu/p/12566444.html
Copyright © 2011-2022 走看看