1、什么是粘包/拆包
一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。
2、解决办法
2.1、消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
2.2、包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
2.3、将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段
3、自定义协议,来实现TCP的粘包/拆包问题
3.0 自定义协议,开始标记
3.1 自定义协议的介绍
3.2 自定义协议的类的封装
3.3 自定义协议的编码器
3.4 自定义协议的解码器
4、协议相关的实现
4.1 协议的封装
- import java.util.Arrays;
- /**
- * <pre>
- * 自己定义的协议
- * 数据包格式
- * +——----——+——-----——+——----——+
- * |协议开始标志| 长度 | 数据 |
- * +——----——+——-----——+——----——+
- * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76
- * 2.传输数据的长度contentLength,int类型
- * 3.要传输的数据
- * </pre>
- */
- public class SmartCarProtocol {
- /**
- * 消息的开头的信息标志
- */
- private int head_data = ConstantValue.HEAD_DATA;
- /**
- * 消息的长度
- */
- private int contentLength;
- /**
- * 消息的内容
- */
- private byte[] content;
- /**
- * 用于初始化,SmartCarProtocol
- *
- * @param contentLength
- * 协议里面,消息数据的长度
- * @param content
- * 协议里面,消息的数据
- */
- public SmartCarProtocol(int contentLength, byte[] content) {
- this.contentLength = contentLength;
- this.content = content;
- }
- public int getHead_data() {
- return head_data;
- }
- public int getContentLength() {
- return contentLength;
- }
- public void setContentLength(int contentLength) {
- this.contentLength = contentLength;
- }
- public byte[] getContent() {
- return content;
- }
- public void setContent(byte[] content) {
- this.content = content;
- }
- @Override
- public String toString() {
- return "SmartCarProtocol [head_data=" + head_data + ", contentLength="
- + contentLength + ", content=" + Arrays.toString(content) + "]";
- }
- }
4.2 协议的编码器
- /**
- * <pre>
- * 自己定义的协议
- * 数据包格式
- * +——----——+——-----——+——----——+
- * |协议开始标志| 长度 | 数据 |
- * +——----——+——-----——+——----——+
- * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76
- * 2.传输数据的长度contentLength,int类型
- * 3.要传输的数据
- * </pre>
- */
- public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {
- @Override
- protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,
- ByteBuf out) throws Exception {
- // 写入消息SmartCar的具体内容
- // 1.写入消息的开头的信息标志(int类型)
- out.writeInt(msg.getHead_data());
- // 2.写入消息的长度(int 类型)
- out.writeInt(msg.getContentLength());
- // 3.写入消息的内容(byte[]类型)
- out.writeBytes(msg.getContent());
- }
- }
4.3 协议的解码器
- import java.util.List;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- /**
- * <pre>
- * 自己定义的协议
- * 数据包格式
- * +——----——+——-----——+——----——+
- * |协议开始标志| 长度 | 数据 |
- * +——----——+——-----——+——----——+
- * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76
- * 2.传输数据的长度contentLength,int类型
- * 3.要传输的数据,长度不应该超过2048,防止socket流的攻击
- * </pre>
- */
- public class SmartCarDecoder extends ByteToMessageDecoder {
- /**
- * <pre>
- * 协议开始的标准head_data,int类型,占据4个字节.
- * 表示数据的长度contentLength,int类型,占据4个字节.
- * </pre>
- */
- public final int BASE_LENGTH = 4 + 4;
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
- List<Object> out) throws Exception {
- // 可读长度必须大于基本长度
- if (buffer.readableBytes() >= BASE_LENGTH) {
- // 防止socket字节流攻击
- // 防止,客户端传来的数据过大
- // 因为,太大的数据,是不合理的
- if (buffer.readableBytes() > 2048) {
- buffer.skipBytes(buffer.readableBytes());
- }
- // 记录包头开始的index
- int beginReader;
- while (true) {
- // 获取包头开始的index
- beginReader = buffer.readerIndex();
- // 标记包头开始的index
- buffer.markReaderIndex();
- // 读到了协议的开始标志,结束while循环
- if (buffer.readInt() == ConstantValue.HEAD_DATA) {
- break;
- }
- // 未读到包头,略过一个字节
- // 每次略过,一个字节,去读取,包头信息的开始标记
- buffer.resetReaderIndex();
- buffer.readByte();
- // 当略过,一个字节之后,
- // 数据包的长度,又变得不满足
- // 此时,应该结束。等待后面的数据到达
- if (buffer.readableBytes() < BASE_LENGTH) {
- return;
- }
- }
- // 消息的长度
- int length = buffer.readInt();
- // 判断请求数据包数据是否到齐
- if (buffer.readableBytes() < length) {
- // 还原读指针
- buffer.readerIndex(beginReader);
- return;
- }
- // 读取data数据
- byte[] data = new byte[length];
- buffer.readBytes(data);
- SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);
- out.add(protocol);
- }
- }
- }
4.4 服务端加入协议的编/解码器
4.5 客户端加入协议的编/解码器
5、服务端的实现
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- public class Server {
- public Server() {
- }
- public void bind(int port) throws Exception {
- // 配置NIO线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- // 服务器辅助启动类配置
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChildChannelHandler())//
- .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区 // (5)
- .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
- // 绑定端口 同步等待绑定成功
- ChannelFuture f = b.bind(port).sync(); // (7)
- // 等到服务端监听端口关闭
- f.channel().closeFuture().sync();
- } finally {
- // 优雅释放线程资源
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- /**
- * 网络事件处理器
- */
- private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 添加自定义协议的编解码工具
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
- // 处理网络IO
- ch.pipeline().addLast(new ServerHandler());
- }
- }
- public static void main(String[] args) throws Exception {
- new Server().bind(9999);
- }
- }
6、服务端Handler的实现
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- public class ServerHandler extends ChannelHandlerAdapter {
- // 用于获取客户端发送的信息
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- // 用于获取客户端发来的数据信息
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Server接受的客户端的信息 :" + body.toString());
- // 会写数据给客户端
- String str = "Hi I am Server ...";
- SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,
- str.getBytes());
- // 当服务端完成写操作后,关闭与客户端的连接
- ctx.writeAndFlush(response);
- // .addListener(ChannelFutureListener.CLOSE);
- // 当有写操作时,不需要手动释放msg的引用
- // 当只有读操作时,才需要手动释放msg的引用
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- // cause.printStackTrace();
- ctx.close();
- }
- }
7、客户端的实现
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class Client {
- /**
- * 连接服务器
- *
- * @param port
- * @param host
- * @throws Exception
- */
- public void connect(int port, String host) throws Exception {
- // 配置客户端NIO线程组
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- // 客户端辅助启动类 对客户端配置
- Bootstrap b = new Bootstrap();
- b.group(group)//
- .channel(NioSocketChannel.class)//
- .option(ChannelOption.TCP_NODELAY, true)//
- .handler(new MyChannelHandler());//
- // 异步链接服务器 同步等待链接成功
- ChannelFuture f = b.connect(host, port).sync();
- // 等待链接关闭
- f.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- System.out.println("客户端优雅的释放了线程资源...");
- }
- }
- /**
- * 网络事件处理器
- */
- private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 添加自定义协议的编解码工具
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
- // 处理网络IO
- ch.pipeline().addLast(new ClientHandler());
- }
- }
- public static void main(String[] args) throws Exception {
- new Client().connect(9999, "127.0.0.1");
- }
- }
8、客户端Handler的实现
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- //用于读取客户端发来的信息
- public class ClientHandler extends ChannelHandlerAdapter {
- // 客户端与服务端,连接成功的售后
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 发送SmartCar协议的消息
- // 要发送的信息
- String data = "I am client ...";
- // 获得要发送信息的字节数组
- byte[] content = data.getBytes();
- // 要发送信息的长度
- int contentLength = content.length;
- SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);
- ctx.writeAndFlush(protocol);
- }
- // 只是读数据,没有写数据的话
- // 需要自己手动的释放的消息
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- try {
- // 用于获取客户端发来的数据信息
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Client接受的客户端的信息 :" + body.toString());
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- ctx.close();
- }
- }
9、参考的博客地址
- http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html
- http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html