zoukankan      html  css  js  c++  java
  • Flink 源码(二十六):Flink 内存管理(二)内存数据结构 、管理器

    2 内存数据结构

    ⚫ 内存段
      内存段在 Flink 内部叫 MemorySegment,是 Flink 中最小的内存分配单元,默认大小 32KB。它即可以是堆上内存(Java 的 byte 数组),也可以是堆外内存(基于 Netty 的
    DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。
      HeapMemorySegment:用来分配堆上内存
      HybridMemorySegment:用来分配堆外内存和堆上内存,2017 年以后的版本实际上只使用了 HybridMemo`rySegment。
      如下图展示一个内嵌型的 Tuple3<Integer,Double,Person> 对象的序列化过程:
      可以看出这种序列化方式存储密度是相当紧凑的。其中 int 占 4 字节,double 占 8 字节,POJO 多个一个字节的 header,PojoSerializer 只负责将 header 序列化进去,并委托每个字段对应的 serializer 对字段进行序列化。
    ⚫ 内存页
      内存页是 MemorySegment 之上的数据访问视图,数据读取抽象为 DataInputView,数据写入抽象为 DataOutputView。使用时就无需关心 MemorySegment 的细节,会自
    动处理跨 MemorySegment 的读取和写入。
    ⚫ Buffer
      Task 算子之间在网络层面上传输数据,使用的是 Buffer,申请和释放由 Flink自行管理,实现类为 NetworkBuffer。1 个 NetworkBuffer 包装了 1 个MemorySegment。同时继承了 AbstractReferenceCountedByteBuf,是 Netty 中的抽象类。
    ⚫ Buffer 资源池
      BufferPool 用来管理 Buffer,包含 Buffer 的申请、释放、销毁、可用 Buffer 通知等,实现类是 LocalBufferPool,每个 Task 拥有自己的 LocalBufferPool。
      BufferPoolFactory 用 来 提 供 BufferPool 的 创 建 和 销 毁 , 唯 一 的 实 现 类 是NetworkBufferPool , 每 个 TaskManager 只 有 一 个 NetworkBufferPool 。 同一个TaskManager 上的 Task 共享 NetworkBufferPool,在 TaskManager 启动的时候创建并分配内存。 

    3 内存管理器

      MemoryManager 用来管理 Flink 中用于排序、Hash 表、中间结果的缓存或使用堆外内存的状态后端(RocksDB)的内存。
    1.10 之前版本,负责 TaskManager 所有内存。
    1.10 版本开始,管理范围是 Slot 级别。
    ⚫ 堆外内存资源申请:
    MemoryManager.java 
    MemorySegmentFactory.java
    ⚫ RocksDB 自己负责内存申请和释放
    RocksDBOperationUtils.java

     

    MemoryManager.java 

    4 网络传输中的内存管理

      网络上传输的数据会写到 Task 的 InputGate(IG)中,经过 Task 的处理后,再由 Task写到 ResultPartition(RS) 中。每个 Task 都包括了输入和输入,输入和输出的数据存在Buffer 中(都是字节数据)。Buffer 是 MemorySegment 的包装类。
      1)TaskManager(TM)在启动时,会先初始化 NetworkEnvironment 对象,TM 中所有与网络相关的东西都由该类来管理(如 Netty 连接),其中就包括 NetworkBufferPool。根据配置, Flink 会 在 NetworkBufferPool 中 生 成 一 定 数 量 ( 默 认 2048 ) 的 内 存 块MemorySegment(关于 Flink 的内存管理,后续文章会详细谈到),内存块的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之间共享的,每个 TM 只会实例化一个。
      2)Task 线程启动时,会向 NetworkEnvironment 注册,NetworkEnvironment 会为 Task的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 LocalBufferPool(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中InputChannel 数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 ResultSubpartition数量一致。不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。注意,这个过程只是指定了缓冲池所能使用的内存块数量,并没有真正分配内存块,只有当需要时才分配。为什么要动态地为缓冲池扩容呢?因为内存越多,意味着系统可以更轻松地应对瞬时压力(如 GC),不会频繁地进入反压状态,所以我们要利用起那部分闲置的内存块。
      3)在 Task 线程执行过程中,当 Netty 接收端收到数据时,为了将 Netty 中的数据拷贝到 Task 中,InputChannel(实际是 RemoteInputChannel)会向其对应的缓冲池申请内存块(上图中的①)。如果缓冲池中也没有可用的内存块且已申请的数量还没到池子上限,则会向 NetworkBufferPool 申请内存块(上图中的②)并交给 InputChannel 填上数据(上图中的③和④)。如果缓冲池已申请的数量达到上限了呢?或者 NetworkBufferPool 也没有可用内存块了呢?这时候,Task 的 Netty Channel 会暂停读取,上游的发送端会立即响应停止发送,拓扑会进入反压状态。当 Task 线程写数据到 ResultPartition 时,也会向缓冲池请求内存块,如果没有可用内存块时,会阻塞在请求内存块的地方,达到暂停写入的目的。 
      4)当一个内存块被消费完成之后(在输入端是指内存块中的字节被反序列化成对象了,在输出端是指内存块中的字节写入到 Netty Channel 了),会调用 Buffer.recycle() 方法,会将内存块还给 LocalBufferPool (上图中的⑤)。如果 LocalBufferPool 中当前申请的数量超过了池子容量(由于上文提到的动态容量,由于新注册的 Task 导致该池子容量变小),则LocalBufferPool 会将该内存块回收给 NetworkBufferPool(上图中的⑥)。如果没超过池子容量,则会继续留在池子中,减少反复申请的开销。
     
    ⚫ 反压的过程 
      1)记录“A”进入了 Flink 并且被 Task 1 处理。(这里省略了 Netty 接收、反序列化等过程)
      2)记录被序列化到 buffer 中。
      3)该 buffer 被发送到 Task 2,然后 Task 2 从这个 buffer 中读出记录。
      记录能被 Flink 处理的前提是:必须有空闲可用的 Buffer。
      结合上面两张图看:Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池 1),Task 2 在输入端也有一个相关联的 LocalBufferPool(称缓冲池 2)。如果缓冲池 1 中有空闲可用的 buffer 来序列化记录 “A”,我们就序列化并发送该 buffer。
      注意两个场景:
      1)本地传输:如果 Task 1 和 Task 2 运行在同一个 worker 节点(TaskManager),该buffer 可以直接交给下一个 Task。一旦 Task 2 消费了该 buffer,则该 buffer 会被缓冲池 1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就会赶不上 Task 1 取 buffer的速度,导致缓冲池 1 无可用的 buffer,Task 1 等待在可用的 buffer 上。最终形成 Task 1的降速。
      2)远程传输:如果 Task 1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据(后面会说)。如果网络中的数据(Netty 输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求buffer,阻塞了 writer 往 ResultSubPartition 写数据。 
      这种固定大小缓冲池就像阻塞队列一样,保证了 Flink 有一套健壮的反压机制,使得Task 生产数据的速度不会快于消费的速度。我们上面描述的这个方案可以从两个 Task 之间的数据传输自然地扩展到更复杂的 pipeline 中,保证反压机制可以扩散到整个 pipeline。
     
     
     
  • 相关阅读:
    证券创新之翼 阿里金融云
    通过改变计算机策略来解决“只能通过Chrome网上应用商店安装该程序”的方法及模版文件下载
    VMware Workstation pro 12下载以及序列号
    Centos7下配置Tomcat7以指定(非root)身份运行
    apache2: Could not reliably determine the server's fully qualified domain name
    apache使用ssl数字证书
    苹果远程控制
    excel 组及分级显示制作教程
    Apache 性能优化
    Pigs and chickens
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14527204.html
Copyright © 2011-2022 走看看