粘包和分包出现的原因是: 没有一个稳定的数据结构(接收方无法确定每条消息的边界)
解决办法一: 分隔符
解决办法二: 长度 + 数据
* <pre>
* 数据包格式
* +--------+--------+--------+--------+--------+
* |  包头  | 模块号 | 命令号 |  长度  |  数据  |
* +--------+--------+--------+--------+--------+
* </pre>
* 包头4字节
* 模块号2字节short
* 命令号2字节short
* 长度4字节(描述数据部分字节长度)
创建 encoder 和 decoder, 分别加入 pipeline 中
/**
 * Decodes length-prefixed frames: a 4-byte length followed by that many
 * payload bytes, which are deserialized into an instance of the configured class.
 *
 * <p>If a full frame has not been buffered yet, the reader index is restored and
 * decoding is retried when more bytes arrive.
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Size in bytes of the length prefix read with {@code readInt()}. */
    private static final int LENGTH_FIELD_SIZE = 4;

    /** Target type handed to the deserializer. */
    private final Class<?> genericClass;

    /**
     * @param genericClass the type each decoded payload is deserialized into
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Reads one frame from {@code in} and appends the deserialized object to {@code out}.
     *
     * @param ctx the handler context; closed when a negative length is read
     * @param in  the accumulated inbound bytes
     * @param out receives the decoded object, if a complete frame was available
     * @throws Exception propagated from deserialization
     */
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not enough bytes for the length prefix yet.
        if (in.readableBytes() < LENGTH_FIELD_SIZE) {
            return;
        }

        // Remember where the frame starts so a partial frame can be retried later.
        in.markReaderIndex();
        int dataLength = in.readInt();

        if (dataLength < 0) {
            // A negative length cannot be a valid frame. Drop whatever is buffered so it
            // is not re-interpreted as further frames, close the connection, and stop.
            // Without the return, execution would reach new byte[dataLength] and throw
            // NegativeArraySizeException.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }

        if (in.readableBytes() < dataLength) {
            // Payload is incomplete: rewind to the start of the frame and wait for more.
            in.resetReaderIndex();
            return;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }
}
/**
 * Encodes outbound objects as length-prefixed frames: a 4-byte length followed
 * by the serialized bytes. This is the counterpart of the length + data layout
 * that {@code RpcDecoder} reads.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    /** Only instances of this type are encoded. */
    private final Class<?> genericClass;

    /**
     * @param genericClass the type of message this encoder serializes
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Serializes {@code in} and writes its length and bytes to {@code out}.
     * Messages that are not instances of the configured class write nothing.
     *
     * @param ctx the handler context (unused here)
     * @param in  the outbound message
     * @param out the buffer that receives the encoded frame
     * @throws Exception propagated from serialization
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (!genericClass.isInstance(in)) {
            // Same as the original: unsupported messages are ignored without an error.
            return;
        }
        byte[] data = SerializationUtil.serialize(in);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= BASE_LENGTH) { //防止socket字节流攻击 if (in.readableBytes() > 2048) { in.skipBytes(in.readableBytes()); } //记录包头开始的index int beginReader; //读取包头 while (true) { beginReader = in.readerIndex(); in.markReaderIndex(); if (in.readInt() == Constantvalue.FLAG) { break; } //未读到包头, 略过一个字节 in.resetReaderIndex(); in.readByte(); //长度又变得不满足 if (in.readableBytes() < BASE_LENGTH) { return; } } } // 模块号 short module = in.readShort(); //命令好 short cmd = in.readShort(); // 长度 int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { //还原读指针 in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Request request = new Request(); request.setModule(module); request.setCmd(cmd); request.setData(data); //继续往下传递 out.add(request); }
buffer 里面的数据未被读取完怎么办? (callDecode 会在 buffer 仍可读时循环调用 decode, 直到某次 decode 既没有产出消息也没有读取任何字节为止)
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { // 这里会对照长度 先判断读到东西了没有, 没有跳出 if (oldInputLength == in.readableBytes()) { // 读取位置变化没 break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } }
数据缓存在 cumulation中
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; //第一次请求 cumulation 为 null true if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // 第二次请求时进入 将新的信息追加到cumulation后面 } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } }