zoukankan      html  css  js  c++  java
  • airflow原理

    官网:

    http://airflow.apache.org/installation.html

    原理:

    https://www.cnblogs.com/cord/p/9450910.html

    原理介绍:

    DAG:有向无环图,有方向但没有循环

    airflow 的守护进程
    airflow 系统在运行时有许多守护进程,它们提供了 airflow 的全部功能。守护进程包括 Web服务器-webserver、调度程序-scheduler、执行单元-worker、消息队列监控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守护进程。

    webserver
    webserver 是一个守护进程,它接受 HTTP 请求,允许你通过 Python Flask Web 应用程序与 airflow 进行交互,webserver 提供以下功能: 中止、恢复、触发任务。 监控正在运行的任务,断点续跑任务。 执行 ad-hoc 命令或 SQL 语句来查询任务的状态,日志等详细信息。 配置连接,包括不限于数据库、ssh 的连接等。

    webserver 守护进程使用 gunicorn 服务器(相当于 java 中的 tomcat )处理并发请求,可通过修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值来控制处理并发请求的进程数。 例如:

    workers = 4 #表示开启4个gunicorn worker(进程)处理web请求
    启动 webserver 守护进程:

    $ airfow webserver -D
    scheduler
    scheduler 是一个守护进程,它周期性地轮询任务的调度计划,以确定是否触发任务执行。 启动的 scheduler 守护进程:

    $ airfow scheduler -D
    worker
    worker 是一个守护进程,它启动 1 个或多个 Celery 的任务队列,负责执行具体 的 DAG 任务。

    当设置 airflow 的 executors 设置为 CeleryExecutor 时才需要开启 worker 守护进程。推荐你在生产环境使用 CeleryExecutor :

    executor = CeleryExecutor
    启动一个 worker守护进程,默认的队列名为 default:

    $ airfow worker -D
    flower
    flower 是一个守护进程,用于是监控 celery 消息队列。启动守护进程命令如下:

    $ airflow flower -D
    默认的端口为 5555,您可以在浏览器地址栏中输入 "http://hostip:5555" 来访问 flower ,对 celery 消息队列进行监控。

    这个监控队列里有多少任务被schudle吊启, worker去消费队列里的任务

    并发控制参数解释:

    parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。

    concurrency :这个用来控制 每个dag运行过程中最大可同时运行的task实例数。如果你没有设置这个值的话,scheduler 会从airflow.cfg里面读取默认值 dag_concurrency

    max_active_runs : 这个是用来控制在同一时间可以运行的最多的dag runs 数量。这里需要解释一下dag runs ,比如你的dag设置的每天运行,那么在天的时间段内运行某个dag就算是一个dag runs 。按道理每天只会执行一次,但是保不齐,你前天和大前天的dag都没运行,那么就需要补跑,或者你在某一次定时dag触发了之后,又手动触发了,那么就存在,同一个时间点有多个dag runs 。这个参数就是控制这个最大的dag runs

    sql_alchemy_pool_size:  默认使用连接数据库的最大连接,设置为0表示没有限制

    worker_concurrency = 80  #所有works的总task数
    parallelism = 160               #所有DAG的总task数

    #同时运行的schedule的线程: 

    当定义的dag文件过多的时候,airflow的scheduler节点运行效率缓慢

    [scheduler]
    # The scheduler can run multiple threads in parallel to schedule dags.
    # This defines how many threads will run.
    #默认是2这里改为100
    max_threads = 20

    查看:

     ps -ef |grep schedu

  • 相关阅读:
    c# 自定义事件和委托
    C#委托之个人理解(转)
    invokeRequired属性和 invoke()方法
    .NET(C#)连接各类数据库
    Mobile Web Development with ASP.NET 2.0
    移动飞信WEB发送服务接口
    4行C#代码打造专业数据库连接配置界面
    domino 中 UniversalID 和NoteID的区别
    VB中preserve的用法
    LOTUS Note ID 剖析
  • 原文地址:https://www.cnblogs.com/hongfeng2019/p/11847400.html
Copyright © 2011-2022 走看看