zoukankan      html  css  js  c++  java
  • SparkRPC源码分析之RPC管道与消息类型

    SparkRPC源码分析之RPC管道与消息类型
    我们前面看过了netty基础知识扫盲,那我们应该明白,ChannelHandler这个组件内为channel的各种事件提供了处理逻辑,也就是主要业务逻辑写在该组建内。Spark的RPC也不会例外,因此我们看一下Spark的Handler怎么调用的。在TransPortClientFactory初始化客户端之前有一条代码为TransportChannelHandler clientHandler = context.initializePipeline(ch);这里的context定义的地方为private final TransportContext context;也就时我们接下来看TransoprtContext类的方法,代码如下

    public TransportChannelHandler initializePipeline(SocketChannel channel) {
    return initializePipeline(channel, rpcHandler);
    }
    1
    2
    3
    可以看到这里的initializePipeline调用了另一个initializePipeline方法,它的代码如下

    public TransportChannelHandler initializePipeline(
    SocketChannel channel,
    RpcHandler channelRpcHandler) {
    try {
    TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
    channel.pipeline()
    .addLast("encoder", ENCODER)
    .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
    .addLast("decoder", DECODER)
    .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
    // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
    // would require more logic to guarantee if this were not part of the same event loop.
    .addLast("handler", channelHandler);
    return channelHandler;
    } catch (RuntimeException e) {
    logger.error("Error while initializing Netty pipeline", e);
    throw e;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    这里面和我们前面netty基础知识扫盲里面做的内容很类似,就是给pipeline启动添加了一些Handler处理逻辑
    通过addLast添加的Handler会被依次执行顺序或倒序,那么我们就来依次看一些他的Handler都干了什么。

    addLast(“encoder”, ENCODER)
    服务器端用来编码服务器到客户端响应的编码器。通过调用消息的encode()方法对其进行编码。对于非数据消息,将添加一个ByteBuf到“out”,其中包含总帧长度、消息类型和消息本身。在ChunkFetchSuccess的情况下,我们还将与数据对应的ManagedBuffer添加到“Out”,以便启用零拷贝传输。一般会出现的消息类型如下

    0 ChunkFetchRequest;
    1 ChunkFetchSuccess;
    2 ChunkFetchFailure;
    3 RpcRequest;
    4 RpcResponse;
    5 RpcFailure;
    6 StreamRequest;
    7 StreamResponse;
    8 StreamFailure;
    9 OneWayMessage;
    1 UploadStream;
    -1 User
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
    - 一种允许截取原始数据的定制帧解码器。
    - 类似于Netty的帧解码器(具有符合此库需要的硬编码参数)但是不同的是它在封装成帧之前允许去装拦截器直接去读取数据
    - 与Netty的帧解码器不同,每个帧在解码后立即被发送给子处理程序,而不是尽可能多地放入当前缓冲区一次性发出去。这允许子处理程序在需要时安装拦截器。
    - 如果安装了拦截器,则停止封装成帧,数据将直接输入拦截器,当拦截器指示它不需要读取任何更多数据时,封装恢复

    .addLast(“decoder”, DECODER)
    客户端用来解码服务器到客户端响应的解码器。消息类型和加密端一样不再重复写了

    .addLast(“idleStateHandler”, new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
    在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle【空闲】 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互,确保TCP连接有效

    .addLast(“handler”, channelHandler);

    channelHandler的创建代码如下

    TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
    1
    createChannelHandler代码如下

    private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred());
    return new TransportChannelHandler(client, responseHandler, requestHandler,
    conf.connectionTimeoutMs(), closeIdleConnections);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    值得注意的一点,我们可以看到这里面有客户端的初始化new TransportClient(channel, responseHandler);也许大家会有疑惑,我们前面才看了代码TransportClientFactory中有初始化TransportClient的代码,怎么这里也有呢?

    这里分析一下TransportClientFactory中创建TransportClient时的情况,可以看到代码如下

    final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
    final AtomicReference<Channel> channelRef = new AtomicReference<>();
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
    TransportChannelHandler clientHandler = context.initializePipeline(ch);
    clientRef.set(clientHandler.getClient());
    channelRef.set(ch);
    }
    });
    …… 省略掉一部分代码
    TransportClient client = clientRef.get();
    …… 省略掉一部分代码
    return client;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    可以看到这里的客户端与其说时创建不如说是获取,从clientHandler中获取,这么看来,客户端真正的创建的地方是在new关键字出现的地方,也就是这里,而TransportClientFactory中的创建不过是从这边取到的而已。

    接着看TransportChannelHandler这个类到底为何方神圣?

    从类图上可以看出来,这个类实现了ChannelInboundHandler接口,那么这个接口是干什么的呢?

    ChannelInboundHandler是一个netty的组件,它是一个常用的Handler。这个Handler的作用就是处理接收到数据时的事件,我们的业务逻辑一般就是写在这个Handler里面。

    这个TransportChannelHandler的处理业务逻辑是什么呢?看下面代码可知它重写了channelRead方法

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
    } else {
    ctx.fireChannelRead(request);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    这里它主要判断请求是什么类型的数据,根据类型交给TransportResponseHandler或者TransportRequestHandler的对象去处理。

    这里可以看出无论是TransportRequestHandler还是TransportResponseHandler都是继承于MessageHandler抽象类。
    那么我们就来看一下MessageHandler,看一下他的方法发现上面调用的handle方法都是来自于重写该类的方法.

    public abstract class MessageHandler<T extends Message> {
    //处理单个消息的接收。
    public abstract void handle(T message) throws Exception;
    //当MessageHandler所在的通道处于活动状态时调用
    public abstract void channelActive();
    //在通道上捕获异常时调用
    public abstract void exceptionCaught(Throwable cause);
    //当MessageHandler所在的通道处于非活动状态时调用
    public abstract void channelInactive();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    那么我们先看TransportRequestHandler重写的handle方法

    @Override
    public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
    processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
    processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
    processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
    processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
    processStreamUpload((UploadStream) request);
    } else {
    throw new IllegalArgumentException("Unknown request type: " + request);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    可以看出,在这里用了if-else逻辑判断消息的类型,然后再交给相应的方法去处理。那么一共有多少消息呢?它都可以处理什么消息呢?请看类图


    那么现在再看一下TransportResponseHandler它复写的handle方法逻辑如下

    @Override
    public void handle(ResponseMessage message) throws Exception {
    if (message instanceof ChunkFetchSuccess) {
    ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
    ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
    if (listener == null) {
    logger.warn("Ignoring response for block {} from {} since it is not outstanding",
    resp.streamChunkId, getRemoteAddress(channel));
    resp.body().release();
    } else {
    outstandingFetches.remove(resp.streamChunkId);
    listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
    resp.body().release();
    }
    } else if (message instanceof ChunkFetchFailure) {
    ChunkFetchFailure resp = (ChunkFetchFailure) message;
    ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
    if (listener == null) {
    logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
    resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
    } else {
    outstandingFetches.remove(resp.streamChunkId);
    listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
    "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
    }
    } else if (message instanceof RpcResponse) {
    RpcResponse resp = (RpcResponse) message;
    RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
    if (listener == null) {
    logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
    resp.requestId, getRemoteAddress(channel), resp.body().size());
    } else {
    outstandingRpcs.remove(resp.requestId);
    try {
    listener.onSuccess(resp.body().nioByteBuffer());
    } finally {
    resp.body().release();
    }
    }
    } else if (message instanceof RpcFailure) {
    RpcFailure resp = (RpcFailure) message;
    RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
    if (listener == null) {
    logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
    resp.requestId, getRemoteAddress(channel), resp.errorString);
    } else {
    outstandingRpcs.remove(resp.requestId);
    listener.onFailure(new RuntimeException(resp.errorString));
    }
    } else if (message instanceof StreamResponse) {
    StreamResponse resp = (StreamResponse) message;
    Pair<String, StreamCallback> entry = streamCallbacks.poll();
    if (entry != null) {
    StreamCallback callback = entry.getValue();
    if (resp.byteCount > 0) {
    StreamInterceptor<ResponseMessage> interceptor = new StreamInterceptor<>(
    this, resp.streamId, resp.byteCount, callback);
    try {
    TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
    channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
    frameDecoder.setInterceptor(interceptor);
    streamActive = true;
    } catch (Exception e) {
    logger.error("Error installing stream handler.", e);
    deactivateStream();
    }
    } else {
    try {
    callback.onComplete(resp.streamId);
    } catch (Exception e) {
    logger.warn("Error in stream handler onComplete().", e);
    }
    }
    } else {
    logger.error("Could not find callback for StreamResponse.");
    }
    } else if (message instanceof StreamFailure) {
    StreamFailure resp = (StreamFailure) message;
    Pair<String, StreamCallback> entry = streamCallbacks.poll();
    if (entry != null) {
    StreamCallback callback = entry.getValue();
    try {
    callback.onFailure(resp.streamId, new RuntimeException(resp.error));
    } catch (IOException ioe) {
    logger.warn("Error in stream failure handler.", ioe);
    }
    } else {
    logger.warn("Stream failure with unknown callback: {}", resp.error);
    }
    } else {
    throw new IllegalStateException("Unknown response type: " + message.type());
    }
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    详细代码暂且不管,大体可以看出也是使用if-else逻辑判断消息的类型,然后分别进行处理。那么我么来看一些这里的消息。
    --------------------- 

  • 相关阅读:
    Java实现 蓝桥杯 历届试题 网络寻路
    Joda-Time 简介
    Eclipse自动生成返回值对象的快捷键是什么?
    eclipse中使用Maven管理java工程设置jdk版本为jdk1.8
    Windows10系统下,彻底删除卸载MySQL
    win10 安装 mysql解压版安装步骤
    使用MySQL Workbench建立数据库,建立新的表,向表中添加数据
    ubuntu安装mysql可视化工具MySQL-workbench及简单操作
    MySQL Linux压缩版安装方法
    【Linux】MySQL解压版安装及允许远程访问
  • 原文地址:https://www.cnblogs.com/hyhy904/p/11007502.html
Copyright © 2011-2022 走看看