zoukankan      html  css  js  c++  java
  • Netty自定义数据包协议

    粘包和分包出现的原因是:没有一个稳定数据结构

    解决办法: 分割符

         长度 + 数据

    * <pre>  
    * 数据包格式
    * +——----——+——-----——+——----——+——----——+——-----——+
    * | 包头 | 模块号 | 命令号 | 长度 | 数据 |
    * +——----——+——-----——+——----——+——----——+——-----——+
    * </pre>
    * 包头4字节
    * 模块号2字节short
    * 命令号2字节short
    * 长度4字节(描述数据部分字节长度)

    创建encoder  和 decoder  分别 加入pipeline 中

    public class RpcDecoder extends ByteToMessageDecoder {
    
        private Class<?> genericClass;
    
        public RpcDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 4) {
                return;
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            if (dataLength < 0) {
                ctx.close();
            }
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];  
            in.readBytes(data);
    
            Object obj = SerializationUtil.deserialize(data, genericClass);
            out.add(obj);
        }
    }
    public class RpcEncoder extends MessageToByteEncoder {
    
        private Class<?> genericClass;
    
        public RpcEncoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
            if (genericClass.isInstance(in)) {
                byte[] data = SerializationUtil.serialize(in);
                out.writeInt(data.length);
                out.writeBytes(data);
            }
        }
    }
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() >= BASE_LENGTH) {
                
                //防止socket字节流攻击
                if (in.readableBytes() > 2048) {
                    in.skipBytes(in.readableBytes());
                }
                
                //记录包头开始的index
                int beginReader;
                
                //读取包头
                while (true) {
                    beginReader = in.readerIndex();
                    in.markReaderIndex();
                    if (in.readInt() == Constantvalue.FLAG) {
                        break;
                    }
                    
                    //未读到包头, 略过一个字节
                    in.resetReaderIndex();
                    in.readByte();
                    
                    //长度又变得不满足
                    if (in.readableBytes() < BASE_LENGTH) {
                        return;
                    }
                }
            }
            
            
                
                // 模块号
                short module  = in.readShort();
                //命令好
                short cmd = in.readShort();
                // 长度
                int dataLength = in.readInt();
                
                if (in.readableBytes() < dataLength) {
                     //还原读指针
                       in.resetReaderIndex();
                       return;
                }
                byte[] data = new byte[dataLength];
                in.readBytes(data);
                Request request = new Request();
                request.setModule(module);
                request.setCmd(cmd);
                request.setData(data);
                //继续往下传递
                out.add(request);
            
        }

    buffer里面数据未被读取完怎么办

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while (in.isReadable()) {
                    int outSize = out.size();
                    int oldInputLength = in.readableBytes();
                    decode(ctx, in, out);
    
                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    if (ctx.isRemoved()) {
                        break;
                    }
    
                    if (outSize == out.size()) {  // 这里会对照长度  先判断读到东西了没有, 没有跳出 
                        if (oldInputLength == in.readableBytes()) {   // 读取位置变化没
                            break;
                        } else {
                            continue;
                        }
                    }
    
                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {
                        break;
                    }
                }

    数据缓存在 cumulation中

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                RecyclableArrayList out = RecyclableArrayList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    first = cumulation == null;    //第一次请求  cumulation 为 null    true  
                    if (first) {
                        cumulation = data;
                    } else {
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // 第二次请求时进入 将新的信息追加到cumulation后面
                    }
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DecoderException(t);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        cumulation.release();
                        cumulation = null;
                    }
                    int size = out.size();
    
                    for (int i = 0; i < size; i ++) {
                        ctx.fireChannelRead(out.get(i));
                    }
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }
  • 相关阅读:
    进入全屏 nodejs+express+mysql实现restful风格的增删改查示例
    WebAPI 实现前后端分离
    android 集成支付宝app支付(原生态)-包括android前端与java后台
    Windows 64 位系统下 Python 环境的搭建
    Es6主要特征详解
    js上传图片
    Python socket
    设置windows开机自启某个软件
    oracle导入导出数据
    mysql触发器,答题记录表同步教学跟踪(用户列表)
  • 原文地址:https://www.cnblogs.com/mxz1994/p/9512364.html
Copyright © 2011-2022 走看看