zoukankan      html  css  js  c++  java
  • Flink中发送端反压以及Credit机制(源码分析)

    上一篇《Flink接收端反压机制》说到因为Flink每个Task的接收端和发送端是共享一个bufferPool的,形成了天然的反压机制,当Task接收数据的时候,接收端会根据积压的数据量以及可用的buffer数量(可用的memorySegment数)来决定是否向上游发送Credit(简而言之就是当我还有空间的时候,我向上游也就是上一个Task的发送端发送一个ack消息,表明我还有空间你可以发送数据过来,如果下游没有给你Credit就证明下游已经堵了,没有空间了也就不能继续往下游发送了)

    现在从源码来看一下Task的数据发送端,也就是Netty的Server端的实现

    先看Task初始化的时候TaskManagerRunner.java中startTaskManager()方法中

     这个connectionManager其实分为两种,Netty,local一看就知道netty这种肯定是对应需要通过网络传输,本地模式这里就不讲了

     

    这个地方看到Flink的client和server都初始化了,需要注意的是其实这个地方client端只是初始化了一些配置,并没有调用bind()方法启动起来,这里看过上一篇文章的同学就会知道,client只有当第一次需要拉取上游subpatition数据的时候才会启动起来也就是bind(),

    而server端在这里也就是task启动的时候就启动起来了,继续看server端如何启动的server.init()方法

     init方法中,这里可以看到,这是Flink1.6以前只有基于netty的tcp网络层反压,这里是通过bootstrap的两个参数

        ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK  大小为两倍的memorySegment大小

        ChannelOption.WRITE_BUFFER_LOW_WATER_MARK   大小为memorySegment + 1

    接着

      

           

    1处2处常规的Netty定长编解码器,没有什么好说的

    看看3处,4这里先不讲后面会提到

     看到3是一个inboundHandler,反压机制时他的用处是用来接收来自下游响应的Credit,来看他的ChannelRead0方法

     当接收到的消息是一个Credit信任的时候

    先是

    增加了这个reader的可用的Credit可用数

    然后

     其实了解了接收端的反压,发送端接收到了下游的credit,那发送数据的时候肯定有一个地方会先判断是否有可用的Credit才决定是否往下发数据

    其实就是这个带星号的地方判断,然后下面就是常规的从queue中拉取reader往netty下游writeAndFlash()数据了,没什么好讲的

    来看一下他判断Credit是否满足的地方

    可以看到只有当

        有数据且可用的Credit数量大于0

        或者有数据且数据是一个事件而不是record的时候,才返回true往下游发送

    可以看到这个 enqueueAvailableReader()方法比较重要,里面包含了判断credit以及往后下游发送数据的逻辑

    那这个enqueueAvailableReader()方法除了会在接收到下游的Credit的时候触发一次,还有哪会被触发呢

    既然是往下游发送数据那我们task处理完数据以后应该也会调用这个方法

    于是来看一下Task发送数据,以前的文章讲过,这里就不赘述了,直接看到RecordWriterOutput的emit()

    会先将record写入到这个Serializer里面去

    然后copyFromSerializerToTargetChannel()方法中

     先去localBufferPool中请求buffer,这里就是反压了

    请求到buffer了以后

    这个调用链有点长不全列出来了

    最后

     这个requestQueue其实是前面Netty初始化时具体逻辑中的4,是一个ChannelInboundHandlerAdapter

     这个Inbound一开始我也很疑惑,这个Inbound没有重写他的channelRead()方法,那这个不就直接转发数据了吗,那他的作用是干嘛的呢

    继续往下看

    原来发送数据的时候会触发这个inbound的eventTrigger

    看下userEventTriggered()具体触发了什么

    这个方法就很眼熟了,就是前面到接收到下游发送过来的Credit时会触发一次的方法,用来判断是否有Credit以及通过netty往下游发送数据

    这里在发送数据的时候果然又触发了,后面就是判断是否有Credit满足往下游发送数据的条件,然后往下游发送数据

    也就是说

        当接收到下游返回的Credit的时候会触发一次,是否能往下游写数据的判断并拉queue数据写数据

        每次Task处理完数据以后emit,也会触发一次判断并拉queue数据写数据

  • 相关阅读:
    qt中qmake的详解
    教程:从零开始 使用Python进行深度学习!
    win10系统下搭建Python开发环境和TensorFlow深度学习环境(CPU版)
    怎么选择视觉光源颜色
    pycharm安装及设置中文
    新建DataSet和DataTable,并从中提取数据到文本
    网站服务基础面试
    TCP、UDP数据包大小的限制
    TCP的三次握手与四次挥手理解及面试题(很全面)
    zabbix服务深入
  • 原文地址:https://www.cnblogs.com/ljygz/p/11837033.html
Copyright © 2011-2022 走看看