zoukankan      html  css  js  c++  java
  • TCP拆包粘包之分隔符解码器

    TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式。

    (1)消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息;将计数器置位,重新开始读取下一个数据报;

    (2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;

    (3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符;

    (4)通过在消息头中定义长度字段来标识消息的总长度。

    Netty对上面四种应用做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便。有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。

    两种实用的解码器——DelimiterBasedFrameDecoderFixedLengthFrameDecoder,前者可以自动完成以分隔符做结束标志的消息的解码,后者可以自动完成对定长消息的解码,它们都能解决TCP粘包/拆包导致的读半包问题。

    DelimiterBasedFrameDecoder应用开发

    演示程序以经典的Echo服务为例。EchoServer接收到EchoClient的请求消息后,将其打印出来,然后将原始消息返回给客户端,消息以“$_”作为分隔符。

    服务端示例:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class EchoServer {
    
        public void bind(int port) throws Exception {
    // 配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChildChannelHandler());
                // 绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
    
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer {
            @Override
            protected void initChannel(Channel arg0) throws Exception {
                //首先创建分隔符缓冲对象ByteBuf,本例程中使用“$_”作为分隔符。
                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                //创建DelimiterBasedFrameDecoder对象,将其加入到ChannelPipeline中。
                //DelimiterBasedFrameDecoder有多个构造方法,这里我们传递两个参数,
                //第一个1024表示单条消息的最大长度,当达到该长度后仍然没有查找到分隔符,
                //就抛出TooLongFrame Exception异常,防止由于异常码流缺失分隔符导致的内存溢出,
                //这是Netty解码器的可靠性保护;第二个参数就是分隔符缓冲对象。
                arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                arg0.pipeline().addLast(new StringDecoder());
                arg0.pipeline().addLast(new EchoServerHandler());
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // 采用默认值
                }
            }
            new EchoServer().bind(port);
        }
    }
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @ChannelHandler.Sharable
    public class EchoServerHandler extends ChannelHandlerAdapter {
    
        int counter = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            //直接将接收的消息打印出来,由于DelimiterBasedFrameDecoder自动对请求消息进行了解码
            //后续的ChannelHandler接收到的msg对象就是个完整的消息包;
            //第二个ChannelHandler是StringDecoder,它将ByteBuf解码成字符串对象
            //第三个EchoServerHandler接收到的msg消息就是解码后的字符串对象。
            String body = (String) msg;
            System.out.println("This is " + ++counter + " times receive client : ["+ body + "]");
            body += "$_";
            //由于我们设置DelimiterBasedFrameDecoder过滤掉了分隔符,
            //所以,返回给客户端时需要在请求消息尾部拼接分隔符“$_”,
            //最后创建ByteBuf,将原始消息重新返回给客户端。
            ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(echo);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();// 发生异常,关闭链路
        }
    }

    客户端示例:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class EchoClient {
    
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer() {
                            @Override
                            public void initChannel(Channel ch)
                                    throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
    
                // 发起异步连接操作
                ChannelFuture f = b.connect(host, port).sync();
    
                // 等待客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // 采用默认值
                }
            }
            new EchoClient().connect(port, "127.0.0.1");
        }
    }
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class EchoClientHandler extends ChannelHandlerAdapter {
    
        private int counter;
    
        static final String ECHO_REQ = "Hi, Netty. Welcome to Netty.$_";
    
        public EchoClientHandler() {
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            for (int i = 0; i < 10; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("This is " + ++counter + " times receive server : ["
                    + msg + "]");
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

    运行结果:

    服务端

    This is 1 times receive client : [Hi, Netty. Welcome to Netty.]
    ...............................
    This is 10 times receive client : [Hi, Netty. Welcome to Netty.]

    客户端

    This is 1 times receive client : [Hi, Netty. Welcome to Netty.]
    ...............................
    This is 10 times receive client : [Hi, Netty. Welcome to Netty.]

    FixedLengthFrameDecoder应用开发

    FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常实用。

    利用FixedLengthFrameDecoder解码器,无论一次接收到多少数据报,它都会按照构造函数中设置的固定长度进行解码,如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。

    服务端示例:

    在服务端的ChannelPipeline中新增FixedLengthFrameDecoder,长度设置为20,然后再依次增加字符串解码器和EchoServerHandler

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class EchoServer {
        public void bind(int port) throws Exception {
            // 配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer() {
                            @Override
                            public void initChannel(Channel ch)throws Exception {
                                ch.pipeline().addLast( new FixedLengthFrameDecoder(20));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
    
                // 绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
    
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // 采用默认值
                }
            }
            new EchoServer().bind(port);
        }
    }
    
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @ChannelHandler.Sharable
    public class EchoServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Receive client : [" + msg + "]");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();// 发生异常,关闭链路
        }
    }

    客户端示例:

    1. telnet localhost 8080
    2. 通过set localecho命令打开本地回显功能
    ➜  zcy-fixed git:(feature/transaction) ✗ telnet localhost 8080
    Trying ::1...
    Connected to localhost.
    Escape character is '^]'.
    1234567890123456789012
    123456789012345678
    
    //连接日志 10:10:59.755 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x979fc7a5, /0:0:0:0:0:0:0:0:8080] RECEIVED: [id: 0x79dc7a78, /0:0:0:0:0:0:0:1:50749 => /0:0:0:0:0:0:0:1:8080] Receive client : [12345678901234567890] Receive client : [12 1234567890123456]
  • 相关阅读:
    UVA 11174 Stand in a Line,UVA 1436 Counting heaps —— (组合数的好题)
    UVA 1393 Highways,UVA 12075 Counting Triangles —— (组合数,dp)
    【Same Tree】cpp
    【Recover Binary Search Tree】cpp
    【Binary Tree Zigzag Level Order Traversal】cpp
    【Binary Tree Level Order Traversal II 】cpp
    【Binary Tree Level Order Traversal】cpp
    【Binary Tree Post order Traversal】cpp
    【Binary Tree Inorder Traversal】cpp
    【Binary Tree Preorder Traversal】cpp
  • 原文地址:https://www.cnblogs.com/wade-luffy/p/6168712.html
Copyright © 2011-2022 走看看