zoukankan      html  css  js  c++  java
  • airflow原理

    官网:

    http://airflow.apache.org/installation.html

    原理:

    https://www.cnblogs.com/cord/p/9450910.html

    原理介绍:

    DAG:有向无环图,有方向但没有循环

    airflow 的守护进程
    airflow 系统在运行时有许多守护进程,它们提供了 airflow 的全部功能。守护进程包括 Web服务器-webserver、调度程序-scheduler、执行单元-worker、消息队列监控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守护进程。

    webserver
    webserver 是一个守护进程,它接受 HTTP 请求,允许你通过 Python Flask Web 应用程序与 airflow 进行交互,webserver 提供以下功能: 中止、恢复、触发任务。 监控正在运行的任务,断点续跑任务。 执行 ad-hoc 命令或 SQL 语句来查询任务的状态,日志等详细信息。 配置连接,包括不限于数据库、ssh 的连接等。

    webserver 守护进程使用 gunicorn 服务器(相当于 java 中的 tomcat )处理并发请求,可通过修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值来控制处理并发请求的进程数。 例如:

    workers = 4 #表示开启4个gunicorn worker(进程)处理web请求
    启动 webserver 守护进程:

    $ airfow webserver -D
    scheduler
    scheduler 是一个守护进程,它周期性地轮询任务的调度计划,以确定是否触发任务执行。 启动的 scheduler 守护进程:

    $ airfow scheduler -D
    worker
    worker 是一个守护进程,它启动 1 个或多个 Celery 的任务队列,负责执行具体 的 DAG 任务。

    当设置 airflow 的 executors 设置为 CeleryExecutor 时才需要开启 worker 守护进程。推荐你在生产环境使用 CeleryExecutor :

    executor = CeleryExecutor
    启动一个 worker守护进程,默认的队列名为 default:

    $ airfow worker -D
    flower
    flower 是一个守护进程,用于是监控 celery 消息队列。启动守护进程命令如下:

    $ airflow flower -D
    默认的端口为 5555,您可以在浏览器地址栏中输入 "http://hostip:5555" 来访问 flower ,对 celery 消息队列进行监控。

    这个监控队列里有多少任务被schudle吊启, worker去消费队列里的任务

    并发控制参数解释:

    parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。

    concurrency :这个用来控制 每个dag运行过程中最大可同时运行的task实例数。如果你没有设置这个值的话,scheduler 会从airflow.cfg里面读取默认值 dag_concurrency

    max_active_runs : 这个是用来控制在同一时间可以运行的最多的dag runs 数量。这里需要解释一下dag runs ,比如你的dag设置的每天运行,那么在天的时间段内运行某个dag就算是一个dag runs 。按道理每天只会执行一次,但是保不齐,你前天和大前天的dag都没运行,那么就需要补跑,或者你在某一次定时dag触发了之后,又手动触发了,那么就存在,同一个时间点有多个dag runs 。这个参数就是控制这个最大的dag runs

    sql_alchemy_pool_size:  默认使用连接数据库的最大连接,设置为0表示没有限制

    worker_concurrency = 80  #所有works的总task数
    parallelism = 160               #所有DAG的总task数

    #同时运行的schedule的线程: 

    当定义的dag文件过多的时候,airflow的scheduler节点运行效率缓慢

    [scheduler]
    # The scheduler can run multiple threads in parallel to schedule dags.
    # This defines how many threads will run.
    #默认是2这里改为100
    max_threads = 20

    查看:

     ps -ef |grep schedu

  • 相关阅读:
    LeetCode Best Time to Buy and Sell Stock
    LeetCode Scramble String
    LeetCode Search in Rotated Sorted Array II
    LeetCode Gas Station
    LeetCode Insertion Sort List
    LeetCode Maximal Rectangle
    Oracle procedure
    浏览器下载代码
    Shell check IP
    KVM- 存储池配置
  • 原文地址:https://www.cnblogs.com/hongfeng2019/p/11847400.html
Copyright © 2011-2022 走看看