首先《Netty权威指南》私有协议开发那一章的样例代码是编译不通过的(但是这丝毫不影响本书的价值)
处理方案可以参考:http://www.itnose.net/detail/6112870.html
另外,这一章的私有协议开发案例也过于理想化。为什么这么说呢?如果服务端或客户端有一方由于历史原因是用其他语言(比如C)实现的呢?Java的int固定为4字节,而C的int长度取决于平台,两者未必一致,
那么书中依赖int长度字段的LengthFieldBasedFrameDecoder就用不上了。让我们看一个实际的私有协议案例:
这个协议在C/C++程序员看来是再正常不过的了,尤其要注意协议的长度字段是采用字符串方式描述的(最多支持9999)。如果用Netty来实现,又该如何处理呢?
public class NettyMessage { private Header header; private Object body; //检验和 private byte crcCode; public byte getCrcCode() { return crcCode; } public void setCrcCode(byte crcCode) { this.crcCode = crcCode; } public Header getHeader() { return header; } public void setHeader(Header header) { this.header = header; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } public String toString(){ return ToStringBuilder.reflectionToString(this); } }
public class Header { //固定头 private byte startTag; //命令码,4位 private byte[] cmdCode; //版本 2位 private byte[] version; private int length; public byte[] getVersion() { return version; } public void setVersion(byte[] version) { this.version = version; } public byte[] getCmdCode() { return cmdCode; } public void setCmdCode(byte[] cmdCode) { this.cmdCode = cmdCode; } public byte getStartTag() { return startTag; } public void setStartTag(byte startTag) { this.startTag = startTag; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public String toString(){ return ToStringBuilder.reflectionToString(this); } }
public class MessageEncoder extends MessageToByteEncoder<NettyMessage>{ @Override protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception { try{ if(msg == null || msg.getHeader() == null){ throw new Exception("The encode message is null"); } out.writeByte(msg.getHeader().getStartTag()); out.writeBytes(msg.getHeader().getCmdCode()); //占位 byte[] lengthBytes = new byte[]{0, 0, 0, 0}; out.writeBytes(lengthBytes); out.writeBytes(msg.getHeader().getVersion()); String body = (String)msg.getBody(); int length = 0; if(body != null){ byte[] bodyBytes = body.getBytes(); out.writeBytes(bodyBytes); length = bodyBytes.length; if(Constants.CRCCODE_DEFAULT != msg.getCrcCode()){ msg.setCrcCode(CRC8.calcCrc8(bodyBytes)); } } //长度从int转换为byte[4] byte l1 = getIndexToByte(length, 3); byte l2 = getIndexToByte(length, 2); byte l3 = getIndexToByte(length, 1); byte l4 = getIndexToByte(length, 0); lengthBytes = new byte[]{l1, l2, l3, l4}; out.setBytes(5, lengthBytes); out.writeByte(msg.getCrcCode()); }catch(Exception e){ e.printStackTrace(); throw e; } } public static byte getIndexToByte(int i, int index){ if(index == 0){ return (byte)(i % 10); }else{ int num = (int)Math.pow(10, index); return (byte)((i / num) % 10); } } }
public class MessageDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try{ if(in.readableBytes() < 12){ return; } in.markReaderIndex(); NettyMessage message = new NettyMessage(); Header header = new Header(); header.setStartTag(in.readByte()); byte[] cmdCode = new byte[4]; in.readBytes(cmdCode); header.setCmdCode(cmdCode); //长度从byte[4]转int byte[] lengthBytes = new byte[4]; in.readBytes(lengthBytes); int length = toInt(lengthBytes); header.setLength(length); if(length < 0 || length > 10240){//过长消息或不合法消息 throw new IllegalArgumentException("wrong message length"); } byte[] version = new byte[2]; in.readBytes(version); header.setVersion(version); if(header.getLength() > 0){ if(in.readableBytes() < length + 1){ in.resetReaderIndex(); return; } byte[] bodyBytes = new byte[header.getLength()]; in.readBytes(bodyBytes); message.setBody(new String(bodyBytes)); } message.setCrcCode(in.readByte()); message.setHeader(header); out.add(message); }catch(Exception e){ e.printStackTrace(); throw e; } } public static int toInt(byte[] bytes){ int value = 0; for(int i=0; i<bytes.length; i++){ int num = (int)Math.pow(10, bytes.length - 1 - i); value += num * bytes[i]; } return value; } }
服务端代码:
public class NettyServer implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); @Autowired Config config; public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); ChannelFuture f = b.bind(port).sync(); logger.info("Push server started on port " + port); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MessageDecoder()) .addLast(new MessageEncoder()) } } @Override public void run() { try { this.bind(config.getServerPort()); } catch (Exception e) { logger.error(e.getMessage()); System.exit(1); } } }
客户端代码:
/** * 客户端 * @author peng */ public class NettyClient { public void connect(String remoteServer, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChildChannelHandler()); ChannelFuture f = b.connect(remoteServer,port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()) .addLast(new MessageEncoder()) } } public static void main(String[] args){ try { new NettyClient().connect("127.0.0.1", 9080); } catch (Exception e) { e.printStackTrace(); } } }
总结:关键在于Encoder和Decoder的编码实现