zoukankan      html  css  js  c++  java
  • flink spark storm的反压机制(backpressure)

    简单介绍下flink、spark和storm的backpressure机制。

    1、storm 反压

    实现原理

    Storm 是通过监控 Bolt 中的接收队列负载情况来实现反压:

    • 如果一个executor发现recv queue负载超过高水位值(high watermark)就会通知反压线程(backpressure thread)。
    • 反压线程将反压信息写到 Zookeeper。
    • Zookeeper 上的 watch 会通知该拓扑(topo)的所有 Worker,该拓扑出现反压。
    • Spout 减缓发送 tuple的速率。

    问题:停止发送,等待系统回复,再次高速生产,然后再次停止发送,造成往数据流颠簸

    2、spark 反压

    spark streaming 是以微批次模拟流式处理,设置batch interval时间:

    val ssc = new StreamingContext(sparkConf, Seconds(3))

    如果batch process time大于batch interval time=3s,程序的处理能力不足,积累的数据越来越多,最终会造成Executor的OOM。

    解决办法

    静态限速 (spark 1.5 以前)

    sparkConf.set("spark.streaming.kafka.maxRatePerPartition","10")

    动态反压(spark 1.5 以后)

    sparkConf.set("spark.streaming.backpressure.enabled","true")

    实现原理

    通过在Driver端进行速率估算,并将速率更新到Executor端的各个Receiver,从而实现反压。

    速率控制:

    整个背压机制的核心,就是Drvier端的RateContoller,它作为控制核心,继承自StreamingListener,监听Batch的完成情况,记录下它们的关键延迟,然后传递给computeAndPublish方法,遍历Executor并进行速率估算和更新。

    速率估算:

    PIDRateEstimator是目前RateEstimator的唯一官方实现。PID(Proportional Integral Derivative,比例积分差分控制算法)是工控领域中,经过多次的验证是一种非常有效的工业控制器算法。Spark Streaming将它引入,作为根据最新的Rate,以及比例(Proportional) 积分(Integral)微分(Derivative)这3个变量,来确定最新的Rate。

    速率更新:

    计算完新Rate,就该把它发布出去了。

    RateController通过ReceiverTracker,利用RPC消息,发布Rate到Receiver所在的Executor节点上,该节点上的ReceiverSupervisorImpl会接收消息,并把速率更新到BlockGenerator上,从而以控制每个批次的数据生成。

    3、flink 反压

    每个子任务都有自己的本地缓存池,收到的数据以及发出的数据,都会序列化之后,放入到缓冲池里。然后,两个TaskManager之间,只会建立一条物理链路(底层使用Netty通讯),所有子任务之间的通讯,都由这条链路承担。

    当任何一个子任务的发送缓存(不管是子任务自己的本地缓存,还是底层传输时Netty的发送缓存)耗尽时,发送方就会被阻塞,产生背压;同样,任何任务接收数据时,如果本地缓存用完了,都会停止从底层Netty那里读取数据,这样很快上游的数据很快就会占满下游的底层接收缓存,从而背压到发送端,形成对上游所有的任务的背压。

    很显然,这种思路有个明显的问题,任务一个下游子任务的产生背压,都会影响整条TaskManager之间的链路,导致全链路所有子任务背压。比如上图的B.3子任务,此时还有处理能力,但也无法收到数据。

    为了解决上节的单任务背压影响全链路的问题,在Flink 1.5之后,引入了Credit-based Flow Control,基于信用点的流量控制。

    这种方法,首先把每个子任务的本地缓存分为两个部分,独占缓存(Exclusive Buffers)和浮动缓存(Floating Buffers);

    然后,独占缓存的大小作为信用点发给数据发送方,发送方会按照不同的子任务分别记录信用点,并发送尽可能多数据给接收方,发送后则降低对应信用点的大小;

    当信用点为0时,则不再发送,起到背压的作用。在发送数据的同时,发送方还会把队列中暂存排队的数据量发给接收方,接收方收到后,根据本地缓存的大小,决定是否去浮动缓存里请求更多的缓存来加速队列的处理,起到动态控制流量的作用。整个过程参考上图。

    通过这样的设计,就实现了任务级别的背压:任意一个任务产生背压,只会影响这个任务,并不会对TaskManger上的其它任务造成影响。

     
  • 相关阅读:
    终端创建scrapy项目时报错(转)
    redis的一些命令
    pom.xml中build标签
    spring与mybatis四种整合方法
    linux lsof/netstat查看进程和端口号相关命令:
    ps -ef |grep 输出的具体含义
    java web项目在linux部署、启动,查看系统配置常用的linux命令总结
    linux mysql操作命令大全
    mysql中between...and..的使用,及时间范围的查询
    mysql中if()函数使用
  • 原文地址:https://www.cnblogs.com/luxiaoxun/p/12987438.html
Copyright © 2011-2022 走看看