继上篇博客写了一篇自定义协议之后,总结了下,补充下LengthFieldBasedFrameDecoder 的netty的使用
我们从它的源码解释中可以知道,这种netty 封装的主键,使用的文本格式是
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
简单点总结就是,header + length + content ,至于header可能会包含标识头,或者命令字,或者些其他的。length一般是指代content的字节长度,当然这个非必然,有可能length
是整个消息的长度。所以在使用LengthFieldBasedFrameDecoder 的时候,需要注意的是指针偏移量。知道了自己的自定义协议之后,那我们就可以定解码器跟编码器了。直接上代码
自定义协议
public class MyProtocolBean { private byte type; private byte flag; private int length; private String content; public byte getType() { return this.type; } public void setType(byte type) { this.type = type; } public byte getFlag() { return this.flag; } public void setFlag(byte flag) { this.flag = flag; } public int getLength() { return this.length; } public void setLength(int length) { this.length = length; } public String getContent() { return this.content; } public void setContent(String content) { this.content = content; } public MyProtocolBean() { } public MyProtocolBean(byte flag, byte type, int length, String content) { this.flag = flag; this.type = type; this.length = length; this.content = content; } }
解码器MyProtocolDecoder继承LengthFieldBasedFrameDecoder
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder { private static final InternalLogger log = InternalLoggerFactory.getInstance(MyProtocolDecoder.class); //private static final int HEADER_SIZE = 6; /** * * @param maxFrameLength 帧的最大长度 * @param lengthFieldOffset length字段偏移的地址 * @param lengthFieldLength length字段所占的字节长 * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段 * @param initialBytesToStrip 解析时候跳过多少个长度 * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异 */ public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); } protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { in = (ByteBuf) super.decode(ctx, in); if (in == null) { return null; } int i = in.readableBytes(); System.out.println(i); if (i < 6) { log.error("字节数不足"); throw new Exception("字节数不足"); } byte type = in.readByte(); byte flag = in.readByte(); int length = in.readInt(); int j = in.readableBytes(); if (j != length) { log.error("标记的长度不符合实际长度"); throw new Exception("标记的长度不符合实际长度"); } byte[] bytes = new byte[in.readableBytes()]; log.info("数据包长度:"+bytes.length); in.readBytes(bytes); String s = new String(bytes, "UTF-8"); log.info("字节转UTF-8字符串:"+s); s = URLDecoder.decode(s, "UTF-8"); log.info("字符串URLDecoder.decode UTF-8字符串:"+s); //JSONObject parse = JSON.parseObject(s); //String msg = (String) parse.get("msg"); return new MyProtocolBean(type, flag, length, s); } }
编码器MyProtocolEncoder继承MessageToByteEncoder
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> { private static final InternalLogger log = InternalLoggerFactory.getInstance(MyProtocolEncoder.class); protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception { if (msg == null) { log.error("msg is null "); throw new Exception("msg is null"); } out.writeByte(msg.getType()); out.writeByte(msg.getFlag()); out.writeInt(msg.getLength()); out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8"))); } }
netty启动,这边需要注意的就是传值了
maxFrameLength: 帧的最大长度
lengthFieldOffset length: 字段偏移的地址
lengthFieldLength length;字段所占的字节长
lengthAdjustment: 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
initialBytesToStrip: 解析时候跳过多少个长度
failFast; 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
lengthFieldOffset length: 字段偏移的地址
lengthFieldLength length;字段所占的字节长
lengthAdjustment: 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
initialBytesToStrip: 解析时候跳过多少个长度
failFast; 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
public class NettyServer { private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyServer.class); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); private ChannelFuture sync; public void startServer(final int port) { Thread thread = new Thread(new Runnable() { public void run() { try { ServerBootstrap serverBootstrap = new ServerBootstrap(); ((ServerBootstrap) ((ServerBootstrap) serverBootstrap .group(NettyServer.this.bossGroup, NettyServer.this.workerGroup) .channel(NioServerSocketChannel.class)).localAddress(port)) .childHandler(new ChannelInitializer() { protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ChannelHandler[] { new IdleStateHandler(60, 50, 50) }); pipeline.addLast(new ChannelHandler[] {
//最大的数据长度,长度字段所占的长度,长度偏移量, new MyProtocolDecoder(1048576, 2, 4, 0, 0, false) }); pipeline.addLast(new ChannelHandler[] { new MyProtocolEncoder() }); pipeline.addLast(new ChannelHandler[] { new Server3Handler() }); } }); log.info("==================NettyServer port :"+port+"=========="); NettyServer.this.sync = serverBootstrap.bind().sync(); NettyServer.this.sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { NettyServer.this.bossGroup.shutdownGracefully(); NettyServer.this.workerGroup.shutdownGracefully(); } } }); thread.start(); } public void stopServer() throws Exception { try { this.sync.channel().close(); this.sync.channel().closeFuture().sync(); log.info("==================stopServer =========="); this.bossGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); } finally { this.bossGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); } } }