zoukankan      html  css  js  c++  java
  • spark2.1源码分析4:spark-network-common模块的设计原理

    spark-network-common模块底层使用netty作为通讯框架,可以实现rpc消息、数据块和数据流的传输。



    Message类图

    这里写图片描述
    所有request消息都是RequestMessage的子类
    所有response消息都是ResponseMessage的子类



    TransportClient主要提供了三个方法:

    //通过给定的streamId,获取远端数据流
    public void stream(final String streamId, final StreamCallback callback);
    //发送一份不透明的消息到远端
    public long sendRpc(ByteBuffer message, final RpcResponseCallback callback);
    //通过给定的streamId,获取远端的数据块
    public void fetchChunk(long streamId,final int chunkIndex,final ChunkReceivedCallback callback);



    Channel Pipeline:

    通过TransportClientFactory的createClient方法追踪ChannelInitializer设置,最后在TransportContext的initializePipeline方法中可以看到具体的Handler的配置:

    channel.pipeline()
            .addLast("encoder", encoder)  
            .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())  
            .addLast("decoder", decoder)
            .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
            .addLast("handler", channelHandler);

    此处得到的handler链为:

    MessageEncoder-->TransportFrameDecoder-->MessageDecoder-->IdleStateHandler-->TransportChannelHandler

    MessageEncoder:负责将消息转换为netty框架中的ByteBuf
    MessageDecoder:负责网络传输的的ByteBuf转换为具体的消息
    TransportFrameDecoder:负责接收网络传输的ByteBuf,解析为一个指定大小的ByteBuf交予MessageDecoder,或者交给StreamInterceptor处理
    IdleStateHandler:心跳检测
    TransportChannelHandler:负责消息的具体处理



    发送端发送消息的流程:

    1. 通过TransportClient的实例发送RequestMessage消息
    2. MessageEncoder把消息转换为ByteBuf
      所有RequestMessage的子类都继承了AbstractMessage,而AbstractMessage有一个叫body的filed,该字段在RpcRequest中被用来存储具体的请求内容(不止RpcRequest)。当body为空时该消息直接转换为ByteBuf;不为null时,MessageEncoder将消息转换为MessageWithHeader,MessageWithHeader继承了AbstractReferenceCounted ,实现了FileRegion ,最后消息仍会转换为ByteBuf。注意:MessageWithHeader提供了发送文件的能力。

      MessageWithHeader类:
      class MessageWithHeader extends AbstractReferenceCounted implements FileRegion

    3. ByteBuf被发送到网络(ByteBuf中包含这个消息的总长度、字段长度、具体内容等信息)



    接收端接收消息并响应:

    1. TransportFrameDecoder负责拼接一个RequestMessage所需的完整ByteBuf
    2. MessageDecoder将消息解析为一个RequestMessage消息
    3. TransportChannelHandler将消息交给TransportRequestHandler具体处理
    4. TransportRequestHandler将ResponseMessage消息传递给MessageEncoder
    5. MessageEncoder把消息转换为ByteBuf(同上)



    发送端接收响应消息:

    1. TransportFrameDecoder负责拼接一个消息所需的完整ByteBuf,如果是StreamResponse消息并且body是一个FileRegion,那么先拼接这个消息的ByteBuf(注意:StreamResponse消息最终传输到网络上时本身不包含FileRegion的ByteBuf)
    2. MessageDecoder将消息解析为一个ResponseMessage消息
    3. TransportChannelHandler将消息交给TransportResponseHandler具体处理
    4. 如果StreamResponse是一个包含FileRegion的消息,TransportResponseHandler在Channel Pipeline中添加一个handler:StreamInterceptor。TransportFrameDecoder将使用StreamInterceptor处理后续的FileRegion的ByteBuf。
  • 相关阅读:
    Source InSight context 窗口丢失的解决办法
    [EffectiveC++]item41:了解隐式接口和编译器多态
    [EffectiveC++]item04:Make sure the objects are initialized before they're used
    [EffectiveC++]item3:尽可能使用const
    linux man指令问题
    解读ARM成功秘诀:薄利多销推广产品
    source insight设置问题 [问题点数:20分,结帖人leecapacity]
    totalcommander
    firefox
    处理SecureCRT中使用vim出现中文乱码问题
  • 原文地址:https://www.cnblogs.com/ggzone/p/10121119.html
Copyright © 2011-2022 走看看