zoukankan      html  css  js  c++  java
  • java 从零开始手写 RPC (04) -序列化

    序列化

    java 从零开始手写 RPC (01) 基于 socket 实现

    java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

    java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

    前面几节我们实现了最基础的客户端调用服务端,这一节来学习一下通讯中的对象序列化。

    fastjson

    为什么需要序列化

    netty 底层都是基于 ByteBuf 进行通讯的。

    前面我们通过编码器/解码器专门为计算的入参/出参进行处理,这样方便我们直接使用 pojo。

    但是有一个问题,如果想把我们的项目抽象为框架,那就需要为所有的对象编写编码器/解码器。

    显然,直接通过每一个对象写一对的方式是不现实的,而且用户如何使用,也是未知的。

    序列化的方式

    基于字节的实现,性能好,可读性不高。

    基于字符串的实现,比如 json 序列化,可读性好,性能相对较差。

    ps: 可以根据个人还好选择,相关序列化可参考下文,此处不做展开。

    json 序列化框架简介

    实现思路

    可以将我们的 Pojo 全部转化为 byte,然后 Byte 转换为 ByteBuf 即可。

    反之亦然。

    代码实现

    maven

    引入序列化包:

    <dependency>
        <groupId>com.github.houbb</groupId>
        <artifactId>json</artifactId>
        <version>0.1.1</version>
    </dependency>
    

    服务端

    核心

    服务端的代码可以大大简化:

    serverBootstrap.group(workerGroup, bossGroup)
        .channel(NioServerSocketChannel.class)
        // 打印日志
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline()
                        .addLast(new RpcServerHandler());
            }
        })
        // 这个参数影响的是还没有被accept 取出的连接
        .option(ChannelOption.SO_BACKLOG, 128)
        // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
        .childOption(ChannelOption.SO_KEEPALIVE, true);
    

    这里只需要一个实现类即可。

    RpcServerHandler

    服务端的序列化/反序列化调整为直接使用 JsonBs 实现。

    package com.github.houbb.rpc.server.handler;
    
    import com.github.houbb.json.bs.JsonBs;
    import com.github.houbb.log.integration.core.Log;
    import com.github.houbb.log.integration.core.LogFactory;
    import com.github.houbb.rpc.common.model.CalculateRequest;
    import com.github.houbb.rpc.common.model.CalculateResponse;
    import com.github.houbb.rpc.common.service.Calculator;
    import com.github.houbb.rpc.server.service.CalculatorService;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * @author binbin.hou
     * @since 0.0.1
     */
    public class RpcServerHandler extends SimpleChannelInboundHandler {
    
        private static final Log log = LogFactory.getLog(RpcServerHandler.class);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String id = ctx.channel().id().asLongText();
            log.info("[Server] channel {} connected " + id);
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            final String id = ctx.channel().id().asLongText();
    
            ByteBuf byteBuf = (ByteBuf)msg;
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);
            log.info("[Server] receive channel {} request: {} from ", id, request);
    
            Calculator calculator = new CalculatorService();
            CalculateResponse response = calculator.sum(request);
    
            // 回写到 client 端
            byte[] responseBytes = JsonBs.serializeBytes(response);
            ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
            ctx.writeAndFlush(responseBuffer);
            log.info("[Server] channel {} response {}", id, response);
        }
    
    }
    

    客户端

    核心

    客户端可以简化如下:

    channelFuture = bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new ChannelInitializer<Channel>(){
            @Override
            protected void initChannel(Channel ch) throws Exception {
                channelHandler = new RpcClientHandler();
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        .addLast(channelHandler);
            }
        })
        .connect(RpcConstant.ADDRESS, port)
        .syncUninterruptibly();
    

    RpcClientHandler

    客户端的序列化/反序列化调整为直接使用 JsonBs 实现。

    package com.github.houbb.rpc.client.handler;
    
    import com.github.houbb.json.bs.JsonBs;
    import com.github.houbb.log.integration.core.Log;
    import com.github.houbb.log.integration.core.LogFactory;
    import com.github.houbb.rpc.client.core.RpcClient;
    import com.github.houbb.rpc.common.model.CalculateResponse;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * <p> 客户端处理类 </p>
     *
     * <pre> Created: 2019/10/16 11:30 下午  </pre>
     * <pre> Project: rpc  </pre>
     *
     * @author houbinbin
     * @since 0.0.2
     */
    public class RpcClientHandler extends SimpleChannelInboundHandler {
    
        private static final Log log = LogFactory.getLog(RpcClient.class);
    
        /**
         * 响应信息
         * @since 0.0.4
         */
        private CalculateResponse response;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf)msg;
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
    
            this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);
            log.info("[Client] response is :{}", response);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行)
            // 个人理解:如果不关闭,则永远会被阻塞。
            ctx.flush();
            ctx.close();
        }
    
        public CalculateResponse getResponse() {
            return response;
        }
    
    }
    

    小结

    为了便于大家学习,以上源码已经开源:

    https://github.com/houbb/rpc

    希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

    我是老马,期待与你的下次相遇。

    在这里插入图片描述

  • 相关阅读:
    toodifficult 题解
    Code Chef February Challenge 2019题解
    LOJ#3085. 「GXOI / GZOI2019」特技飞行(KDtree+坐标系变换)
    LOJ#3084. 「GXOI / GZOI2019」宝牌一大堆(递推)
    LOJ#3083. 「GXOI / GZOI2019」与或和(单调栈)
    LOJ#6046. 「雅礼集训 2017 Day8」爷(分块)
    LOJ#6045. 「雅礼集训 2017 Day8」价(最小割)
    LOJ#6044. 「雅礼集训 2017 Day8」共(Prufer序列)
    BZOJ4766: 文艺计算姬(Prufer序列)
    BZOJ3729: Gty的游戏(伪ETT)
  • 原文地址:https://www.cnblogs.com/houbbBlogs/p/15388956.html
Copyright © 2011-2022 走看看