zoukankan      html  css  js  c++  java
  • 如何实现一个RPC框架2 ——用Netty实现协议通信

    前言:

    ​ 大家好,我是秋雨清笛,一个在读学生。这两个月里我初步实现了一个简单的RPC框架。做这个RPC框架的主要目的是为了学习,让自己能在平时的CRUD之余学习到一些不一样的东西,了解更多造轮子过程中的细节。实现并不是很复杂,主要目的还是学习。

    接下来我将以几篇博文来谈谈我是怎么实现它的,遇到了一些什么样的技术细节。大家可以参照以下链接的源码:

    https://github.com/PanYuDi/qyqd-rpc

    本篇主要内容是用Netty实现通信

    一、实现服务端

    1. 定义接口
    public interface RpcServer {
        /**
         * 启动Rpc服务端
         */
        void start() throws InterruptedException;
    }
    
    
    1. 写NettyServer
    @Slf4j
    public class NettyServer implements RpcServer {
        EventLoopGroup bossGroup;
        EventLoopGroup workerGroup;
        @Override
        public void start() {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new NettyChannelHandlerInitializer());
            try {
                ChannelFuture cf = bootstrap.bind(RpcConfig.PORT).sync();
                log.info("rpc server started at port " + RpcConfig.PORT);
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("occur exception when start server:", e);
            } finally {
                log.info("netty server closed");
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
    
        }
    }
    
    

    需要说的不多,基本的Netty服务端模板,值得一提的是

    .childOption(ChannelOption.SO_KEEPALIVE, true)
    

    这一句启动了TCP长连接,为我们以后实现心跳机制和链接缓存埋下了伏笔

    二、实现客户端

    大家可以参考NettyClient客户端的代码,这里涉及到心跳机制的实现,具体实现我们下一篇再仔细讨论,我们要知道的是客户端唯一目的就是发送数据

    public interface RpcClient {
        /**
         * 传输信息
         * @param message
         */
        public Object send(ProtocolRequestEndpointWrapper message);
    }
    
    

    三、编码和解码

    这节我们这篇聊的重点

    我们在上一篇定义了协议,现在我们要在Netty通信中实现这个协议,我们将在Netty handler调用链中实现协议的编码和解码。

    Netty调用链机制分为了InboundHandler和OutboundHandler,这里不做赘诉。我们要实现的是把一个ProtocolMessage塞进Netty客户端然后经过客户端和服务端的handler能原封不动的由服务端拿到这个类

    编码的实现

    public class ChannelMessageEncoder extends MessageToByteEncoder<ProtocolMessage> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, ProtocolMessage msg, ByteBuf out) throws Exception {
            // 写入魔数
            out.writeBytes(ProtocolConstant.MAGIC);
            if(msg.getContent().length >= ProtocolConstant.MAX_FRAME_LENGTH) {
                throw new MessageCodecException("message size exceed");
            }
            // 长度包含了content长度加上其他字段的长度
            out.writeInt(msg.getContent().length + 14);
            // 写入版本号
            out.writeByte(ProtocolConstant.CURRENT_VERSION);
            // 写入消息类型
            out.writeByte(msg.getMessageType().getCode());
            // 写入requestId
            out.writeInt(msg.getRequestId());
            // 写入消息内容
            out.writeBytes(msg.getContent());
        }
    }
    
    

    我们使用Netty的MessageToByteEncoder,它可以将一个类写成Byte流。我们仅需按照我们定义的协议顺序依次写入

    解码的实现

    @Slf4j
    public class ChannelMessageDecoder extends LengthFieldBasedFrameDecoder {
        public ChannelMessageDecoder() {
            super(ProtocolConstant.MAX_FRAME_LENGTH, 4,4, -8, 0);
    
        }
        public ChannelMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ProtocolMessage protocolMessage = new ProtocolMessage();
            Object decoded = super.decode(ctx, in);
            if (decoded instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) decoded;
                return decodeFrame(buf);
            }
            return decoded;
        }
    
        private ProtocolMessage decodeFrame(ByteBuf buf) {
            ProtocolMessage protocolMessage = new ProtocolMessage();
            checkMagic(buf);
            protocolMessage.setLen(buf.readInt() - 14);
            checkVersion(buf);
            protocolMessage.setMessageType(ProtocolMessageTypeEnum.getEnum(buf.readByte()));
            protocolMessage.setRequestId(buf.readInt());
            byte[] content = new byte[protocolMessage.getLen()];
            buf.readBytes(content);
            protocolMessage.setContent(content);
            return protocolMessage;
        }
        private void checkMagic(ByteBuf buf) {
            byte[] magicData = new byte[ProtocolConstant.MAGIC_LEN];
            buf.readBytes(magicData);
            for(int i = 0; i < magicData.length; i++) {
                if(magicData[i] != ProtocolConstant.MAGIC[i]) {
                    throw new MessageCodecException("magic check failed");
                }
            }
        }
        private void checkVersion(ByteBuf buf) {
            byte b = buf.readByte();
            Integer version = Integer.valueOf(b);
            if(!version.equals(ProtocolConstant.CURRENT_VERSION)) {
                throw new MessageCodecException("version check failed");
            }
        }
    }
    
    

    Netty提供的LengthFieldBasedFrameDecoder为我们提供了方便的解码方式

    它的作用主要是按照偏移量先读取报文长度,然后自动读取到整个报文

        public ChannelMessageDecoder() {
            super(ProtocolConstant.MAX_FRAME_LENGTH, 4,4, -8, 0);
    
        }
    

    构造函数的后四个参数决定了怎样读取这个报文

    1. lengthFieldOffset 长度域偏移量,因为我们的长度域之前有一个4B的魔数,所以这里我们传4
    2. lengthFieldLength 长度与的大小,我们协议中定义的是4B,所以传4
    3. lengthAdjustment 决定了读取完长度后,我们从哪里开始读?因为我们还是需要读取魔数和长度,所以我们回到开始的地方,也就是-8
    4. initialBytesToStrip 指定这个字段可以忽略到请求头的一些数据,我们设置为0不去跳过

    了解这个解码器的原理后就简单了,接下来我们要做的就是依次从ByteBuf中读取数据就行了,也不用担心读多了,读多了也会抛出异常的,很容易debug

    四、后续的处理

    下一步就是在handler中继续处理ProtocolMessage了

    public class NettyRpcServerChannelHandler extends ChannelInboundHandlerAdapter {
        MessageHandler messageHandler = new ServerMessageHandlerContextFactory().create();
        public NettyRpcServerChannelHandler() {
            super();
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(msg instanceof ProtocolMessage) {
                ProtocolMessageUtils.setRequestId(((ProtocolMessage) msg).getRequestId());
                if(messageHandler.canHandle(msg)) {
                    messageHandler.handle((RequestMessage) msg, ctx);
                }
            }
        }
    }
    

    这里的MessageHandler是我自己定义的抽象接口,这个handler会按照类型依次处理请求。

    接下来就是具体的业务处理了,通过Netty的协议通信也基本实现了,下一篇将介绍心跳机制的实现

  • 相关阅读:
    区间DP入门
    Prime Permutation(思维好题 )
    小字辈 (bfs好题)
    博弈论小结之尼姆博弈
    Hometask
    Lucky Sum (dfs打表)
    对称博弈
    尼姆博弈
    莫队算法 ( MO's algorithm )
    Codeforces 988D Points and Powers of Two ( 思维 || 二的幂特点 )
  • 原文地址:https://www.cnblogs.com/PanYuDi/p/15795481.html
Copyright © 2011-2022 走看看