zoukankan      html  css  js  c++  java
  • 简易RPC框架-私有协议栈

    HTTP协议

    客户机与服务端之间的数据交互需要遵守一定的约定,比如协议版本,数据类型,是否有缓存,是否有压缩等,只有在这些约定的基础上才能相互之间愉快的工作。

    Netty通信过程中的编解码

    这时说的是基于TCP/IP的Netty之间的通信。TCP/IP协议下客户端与服务端之间要进行数据交互,一般需要将数据转换成二进制格式,直接传java bean是不能支持的。在RPC模式下客户端在向服务端发起请求前需要将数据做编码,服务端在接收客户端发的数据后需要做解码之后才能正常工作。

    • 解码流程

    • 编码流程

    Netty 私有协议栈

    为了更好的控制RPC客户端与服务端之间的通信,也可以编写私有的协议栈来支撑。

    定义消息体

    类似HTTP协议,包含头信息以及内容信息。

    
    public class RpcMessage implements Serializable {
    
        private RpcMessageHeader messageHeader;
    
        private Object messageBody;
    
    }
    

    头信息,包含内容体长度,消息类型等信息。可以根据消息类型来做不同的业务,比如区分是心跳信息还是业务或者是监控之类的信息。

    
    public class RpcMessageHeader implements Serializable {
        private int length;
    
        private int type;
       
    }
    

    定义解码器

    因为TCP/IP协议容易出现粘包拆包现象,这里为了简单直接选择继承组件提供的LengthFieldBasedFrameDecoder,只需要重写下面的方法即可:

    
     public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame=(ByteBuf)super.decode(ctx,in);
            if(null==frame){
                return null;
            }
    
            RpcMessage message=new RpcMessage();
            RpcMessageHeader messageHeader=new RpcMessageHeader();
            messageHeader.setLength(frame.readInt());
            message.setMessageHeader(messageHeader);
    
            byte[] data = new byte[message.getMessageHeader().getLength()];
            frame.readBytes(data);
    
            Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass);
            message.setMessageBody(obj);
            return message;
        }
    

    定义编码器

    编码器继承MessageToByteEncoder,将对象转换成字节的编码器

    
    public class RpcEncoder extends MessageToByteEncoder<RpcMessage>
    

    重点是下面的编码函数,在ByteBuf中输出数据长度以及数据体,如有其它需要可以补充其它的字段,比如消息类型。

    
     public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception {
            if(null==in){
                throw new RpcException("RpcMessage is null");
            }
            if (genericClass.isInstance(in.getMessageBody())) {
                byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody());
                out.writeInt(data.length);
                out.writeBytes(data);
            }
        }
    

    ServerHandle

    • 修改服务端执行器消息实体类型为新定义的RpcMessage
    
    public class RpcServerInvoker extends AbstractInvoker<RpcMessage> 
    
    • 修改服务端回调

    从服务端方法获取到返回的结果后,重新封装成消息对象(RpcMessage)发送给客户端。

    
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) {
    
            this.executor.execute(new Runnable() {
                @Override
                public void run() {
                    RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
                    RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody()));
                    RpcMessage responseMessage=new RpcMessage();
                    byte[] data = ProtoStuffSerializeUtil.serialize(response);
                    RpcMessageHeader messageHeader=new RpcMessageHeader();
                    messageHeader.setLength(data.length);
                    responseMessage.setMessageHeader(messageHeader);
                    responseMessage.setMessageBody(response);
                    channelHandlerContext.writeAndFlush(responseMessage);
                }
            });
    
        }

    ClientHandle

    • 修改客户端执行器消息实体类型为新定义的RpcMessage
    
    public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
    
    • 修改客户端回调方法

    接收的返回结果修改为RpcMessage,从body属性中获取原来的RpcResponse对象

    
    public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) {
            RpcResponse response=(RpcResponse) message.getMessageBody();
            String requestId = response.getRequestId();
            ResponseFuture responseFuture = pendingRPC.get(requestId);
            if (responseFuture != null) {
                pendingRPC.remove(requestId);
                responseFuture.done(response);
            }
        }
    
    • 修改发送请求的消息对象,组装成RpcMessage发送
    
    public ResponseFuture invoke(RpcInvocation invocation) {
            RpcRequest request=this.getRpcRequest();
            ResponseFuture responseFuture = new ResponseFuture(request);
            pendingRPC.put(request.getRequestId(), responseFuture);
            RpcMessage message=new RpcMessage();
            byte[] data = ProtoStuffSerializeUtil.serialize(request);
            RpcMessageHeader messageHeader=new RpcMessageHeader();
            messageHeader.setLength(data.length);
            message.setMessageHeader(messageHeader);
            message.setMessageBody(request);
            channel.writeAndFlush(message);
            return responseFuture;
        }

    本文源码

    https://github.com/jiangmin168168/jim-framework

    文中代码是依赖上述项目的,如果有不明白的可下载源码

    引用

    • 文中插图来自来网络
    • 文中的思路参考了Netty权威指南

  • 相关阅读:
    java中Executor、ExecutorService、ThreadPoolExecutor介绍
    JAVA多线程实现的四种方式
    JVM内存结构
    Synchronized修饰静态变量和普通变量的区别
    tcpkill工作原理分析
    数据库路由中间件MyCat
    数据库路由中间件MyCat
    数据库路由中间件MyCat
    数据库路由中间件MyCat
    数据库路由中间件MyCat
  • 原文地址:https://www.cnblogs.com/ASPNET2008/p/7588822.html
Copyright © 2011-2022 走看看