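TCP transmits data as a byte stream. It knows nothing about the message boundaries of the protocol above it; it splits or merges data according to the state of its own buffers. One complete application-level message may therefore be split across several TCP segments, and several small messages may be merged into a single segment. This is the TCP "sticky packet" and "split packet" problem. Because TCP behaves this way by design, framing has to be handled by the application layer. Common approaches are to send fixed-length messages, to include the message length in each message, or to append a marker at the end of each message.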
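To make the "include the message length" approach concrete, here is a minimal plain-Java sketch that does not use Netty. The class name LengthPrefixedFramer and the 4-byte big-endian length header are assumptions chosen for illustration. It shows how a receiver can rebuild whole messages no matter how the byte stream was cut into chunks.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: reassembles length-prefixed messages from arbitrary chunks. */
class LengthPrefixedFramer {

    // Bytes received so far that do not yet form a complete message.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Encodes a message as a 4-byte big-endian length followed by the UTF-8 payload. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Feeds one received chunk and returns every message completed by it. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<String> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (buf.remaining() < length) {
                buf.reset(); // body is incomplete: wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        // Keep only the unconsumed tail for the next call.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return messages;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello");
        byte[] b = encode("netty!");
        byte[] stream = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        LengthPrefixedFramer framer = new LengthPrefixedFramer();
        // The first chunk ends in the middle of the first message.
        System.out.println(framer.feed(Arrays.copyOfRange(stream, 0, 6)));
        // The second chunk completes the first message and carries the second one.
        System.out.println(framer.feed(Arrays.copyOfRange(stream, 6, stream.length)));
    }
}
```

The sketch omits validation of the length field (negative or oversized values), which a real protocol would need.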
Netty provides several decoders for this problem, such as LineBasedFrameDecoder, usually combined with StringDecoder. You only need to add them to the pipeline, for example:
--
new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
        socketChannel.pipeline().addLast(new StringDecoder());
        socketChannel.pipeline().addLast(new TimeClientHandler());
    }
}
--
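StringDecoder converts each received ByteBuf into a String and then passes it on to the next handler.
LineBasedFrameDecoder is a decoder that uses a line break as the end marker. It scans the readable bytes in the ByteBuf in order and treats "\n" or "\r\n" as the end of a frame. The constructor argument is the maximum frame length; if no line break is found within that length, an exception is thrown: Unexpected exception from downstream : frame length (16) exceeds the allowed maximum (10)
DelimiterBasedFrameDecoder uses a custom delimiter as the end marker and strips the delimiter automatically when decoding.
FixedLengthFrameDecoder is a decoder for fixed-length messages.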
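The framing rules described above can be sketched in plain Java without Netty. The class FrameSplitter and its method names are hypothetical. The code only approximates the behavior of the decoders (delimiter stripped, maximum frame length enforced, fixed-size frames) and is not Netty's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of delimiter-based and fixed-length framing (not Netty's code). */
class FrameSplitter {

    // Bytes received so far that have not been emitted as a frame.
    private byte[] pending = new byte[0];

    /** Appends a received chunk to the pending bytes. */
    void append(byte[] chunk) {
        byte[] merged = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);
        pending = merged;
    }

    /** Emits every frame terminated by the delimiter byte; the delimiter itself is dropped. */
    List<String> takeDelimited(byte delimiter, int maxLength) {
        List<String> frames = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < pending.length; i++) {
            if (pending[i] == delimiter) {
                frames.add(new String(pending, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            } else if (i - start + 1 > maxLength) {
                throw new IllegalStateException(
                        "frame length exceeds the allowed maximum (" + maxLength + ")");
            }
        }
        // Keep the incomplete tail until more bytes arrive.
        pending = Arrays.copyOfRange(pending, start, pending.length);
        return frames;
    }

    /** Emits every complete frame of exactly frameLength bytes. */
    List<String> takeFixed(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        List<String> frames = new ArrayList<>();
        int start = 0;
        while (pending.length - start >= frameLength) {
            frames.add(new String(pending, start, frameLength, StandardCharsets.UTF_8));
            start += frameLength;
        }
        pending = Arrays.copyOfRange(pending, start, pending.length);
        return frames;
    }

    public static void main(String[] args) {
        FrameSplitter delimited = new FrameSplitter();
        delimited.append("Hi,my na".getBytes(StandardCharsets.UTF_8));
        System.out.println(delimited.takeDelimited((byte) '$', 1024)); // no delimiter seen yet
        delimited.append("me$Hello$par".getBytes(StandardCharsets.UTF_8));
        System.out.println(delimited.takeDelimited((byte) '$', 1024)); // "par" stays pending

        FrameSplitter fixed = new FrameSplitter();
        fixed.append("abcdefg".getBytes(StandardCharsets.UTF_8));
        System.out.println(fixed.takeFixed(3)); // "g" stays pending
    }
}
```

Passing (byte) '\n' as the delimiter gives a rough equivalent of line-based framing, except that this sketch does not strip a preceding "\r".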
Below is a simple example that demonstrates how these decoders are used:
EchoServer
package com.luangeng.netty.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Created by LG on 2017/11/20.
 */
public class EchoServer {

    public static void main(String[] args) {
        new EchoServer().bind(8080);
    }

    public void bind(int port) {
        // Configure the server thread groups: one accepts client connections,
        // the other handles network reads and writes on each SocketChannel
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is the helper class for starting an NIO server
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
                            // Enable exactly one of the three frame decoders below
                            //ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(25));
                            //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            // Bind the port; sync() blocks until the bind succeeds.
            // ChannelFuture is the notification callback for asynchronous operations
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("server started");

            // Wait until the server's listening port is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("server shutting down");
            // Release the thread resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
---
EchoServerHandler
package com.luangeng.netty.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by LG on 2017/11/20.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // StringDecoder has already converted the frame into a String
        String body = (String) msg;
        System.out.println("The server received(" + (count++) + "): " + body);

        // Append the "$" delimiter again before echoing the message back
        body = body + "$";
        ByteBuf response = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(response); // asynchronous send
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}
---
EchoClient
package com.luangeng.netty.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by LG on 2017/11/20.
 */
public class EchoClient {

    public static void main(String[] args) {
        new EchoClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) {
        // Configure the client NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
                            // Enable the same frame decoder as on the server
                            // (LineBasedFrameDecoder also needs its import if enabled)
                            //ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(25));
                            //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });

            // Start the asynchronous connect and block until it succeeds
            ChannelFuture future = bootstrap.connect(host, port).sync();
            System.out.println("client started");

            // Wait until the client connection is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("client shutting down");
            // Release the NIO thread group
            group.shutdownGracefully();
        }
    }
}
---
EchoClientHandler
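package com.luangeng.netty.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by LG on 2017/11/20.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    int count = 0;

    // Assumption: the posted listing repeated EchoServerHandler. The message text and
    // the send loop below are inferred from the client output at the end of this post.
    // The trailing "$" is the delimiter expected by DelimiterBasedFrameDecoder.
    static final String REQ = "Hi,my name is luangeng$";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send ten requests as soon as the connection is established
        for (int i = 0; i < 10; i++) {
            ByteBuf request = Unpooled.copiedBuffer(REQ.getBytes());
            ctx.writeAndFlush(request); // asynchronous send
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // StringDecoder has already converted the frame into a String
        String body = (String) msg;
        System.out.println((count++) + " client get: " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}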
---
Output:
Server:
2017-11-20 22:21:35,508 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21] REGISTERED
2017-11-20 22:21:35,510 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21] BIND: 0.0.0.0/0.0.0.0:8080
server started
2017-11-20 22:21:35,512 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21, L:/0:0:0:0:0:0:0:0:8080] ACTIVE
2017-11-20 22:21:39,919 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21, L:/0:0:0:0:0:0:0:0:8080] READ: [id: 0x09644a50, L:/127.0.0.1:8080 - R:/127.0.0.1:52039]
2017-11-20 22:21:39,920 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21, L:/0:0:0:0:0:0:0:0:8080] READ COMPLETE
The server received(0): Hi,my name is luangeng
The server received(1): Hi,my name is luangeng
The server received(2): Hi,my name is luangeng
The server received(3): Hi,my name is luangeng
The server received(4): Hi,my name is luangeng
The server received(5): Hi,my name is luangeng
The server received(6): Hi,my name is luangeng
The server received(7): Hi,my name is luangeng
The server received(8): Hi,my name is luangeng
The server received(9): Hi,my name is luangeng
Client:
client started
0 client get: Hi,my name is luangeng
1 client get: Hi,my name is luangeng
2 client get: Hi,my name is luangeng
3 client get: Hi,my name is luangeng
4 client get: Hi,my name is luangeng
5 client get: Hi,my name is luangeng
6 client get: Hi,my name is luangeng
7 client get: Hi,my name is luangeng
8 client get: Hi,my name is luangeng
9 client get: Hi,my name is luangeng
end