zoukankan      html  css  js  c++  java
  • Netty--TCP粘包和拆包

    https://luangeng.space

    TCP协议以流的方式进行数据传输,它无法理解其上层协议的数据意义,而是根据TCP缓冲区的大小对数据进行拆分或组装,即上层一个完整的包可能被拆为几个TCP包来发送,或上层几个包会被组合为一个TCP包发送,这就是TCP的粘包和拆包问题。TCP协议按照自己的工作方式工作,包的拆分和组装问题就需要上层应用来解决,比如每次发送定长的数据包,数据包中包含数据的大小或在包结尾处增加标记等方法。

    Netty提供了多种解码器如LineBasedFrameDecoder和StringDecoder等来解决这个问题,只需要在pipeline中加入这些解码器即可,如:

    --

    new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new TimeClientHandler());
        }
    }

    --

    StringDecoder 作用是将接受到的对象转换为字符串,然后继续调用后面的handler。

    LineBasedFrameDecoder 以换号符为结束标记的解码器,它依次遍历ByteBuf中的可读字节,如果碰到 或 则设置为结束。参数为最大长度,如果读到最大长度还是没有换号则会抛出异常: Unexpected exception from downstream : frame length (16) exceeds the allowed maximum (10)

    DelimiterBasedFrameDecoder 是以分隔符作为结束标志的解码器,并且解码时会自动去掉分隔符

    FixedLengthFrameDecoder 是用于定长消息的解码器。

    下面是一个简单的例子,演示几种decoder的用法:

    EchoServer

    package com.luangeng.netty.echo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * Created by LG on 2017/11/20.
     */
    public class EchoServer {
        public static void main(String[] args) {
            new EchoServer().bind(8080);
        }
    
        public void bind(int port) {
            //配置服务端的线程组,一个用于服务端接收客户端连接,另一个进行SocketChannel的网络读写
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //ServerBootstrap启动NIO服务端的辅助启动类
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
                                //ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new FixedLengthFrameDecoder(25));
                                //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                //绑定端口,sync为同步阻塞方法,等待绑定成功,ChannelFuture用于异步操作的通知回调
                ChannelFuture future = bootstrap.bind(port).sync();
                System.out.println("server started");
                //等待服务端监听端口关闭
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("server shuting down");
                //释放线程资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
    }

    ---

    EchoServerHandler

    package com.luangeng.netty.echo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Created by LG on 2017/11/20.
     */
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
        int count = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String)msg;
            System.out.println("The server received("+count+++"): " + body);
    
            body = body + "$";
            ByteBuf response = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(response);//异步发送
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("Unexpected exception from downstream : " + cause.getMessage());
            ctx.close();
        }
    
    }

    ---

    EchoClient

    package com.luangeng.netty.echo;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    /**
     * Created by LG on 2017/11/20.
     */
    public class EchoClient {
    
        public static void main(String[] args) {
            new EchoClient().connect("127.0.0.1", 8080);
        }
    
        public void connect(String host, int port) {
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes());
                                //ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new FixedLengthFrameDecoder(25));
                                //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
                //发起异步连接操作,同步等待连接成功
                ChannelFuture future = bootstrap.connect(host, port).sync();
                System.out.println("client started");
                //等待客户端链路关闭
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("client shuting down");
                //释放NIO线程组
                group.shutdownGracefully();
            }
        }
    }

    ---

    EchoClientHandler

    package com.luangeng.netty.echo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Created by LG on 2017/11/20.
     */
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
        int count = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String)msg;
            System.out.println("The server received("+count+++"): " + body);
    
            body = body + "";
            ByteBuf response = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(response);//异步发送
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("Unexpected exception from downstream : " + cause.getMessage());
            ctx.close();
        }
    
    }

    ---

    执行结果:

    Server:

    2017-11-20 22:21:35,508 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21] REGISTERED
    2017-11-20 22:21:35,510 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21] BIND: 0.0.0.0/0.0.0.0:8080
    server started
    2017-11-20 22:21:35,512 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21, L:/0:0:0:0:0:0:0:0:8080] ACTIVE
    2017-11-20 22:21:39,919 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21, L:/0:0:0:0:0:0:0:0:8080] READ: [id: 0x09644a50, L:/127.0.0.1:8080 - R:/127.0.0.1:52039]
    2017-11-20 22:21:39,920 INFO [io.netty.handler.logging.LoggingHandler] - [id: 0x9b011f21, L:/0:0:0:0:0:0:0:0:8080] READ COMPLETE
    The server received(0): Hi,my name is luangeng
    The server received(1): Hi,my name is luangeng
    The server received(2): Hi,my name is luangeng
    The server received(3): Hi,my name is luangeng
    The server received(4): Hi,my name is luangeng
    The server received(5): Hi,my name is luangeng
    The server received(6): Hi,my name is luangeng
    The server received(7): Hi,my name is luangeng
    The server received(8): Hi,my name is luangeng
    The server received(9): Hi,my name is luangeng

    Client:

    client started
    0 client get: Hi,my name is luangeng
    1 client get: Hi,my name is luangeng
    2 client get: Hi,my name is luangeng
    3 client get: Hi,my name is luangeng
    4 client get: Hi,my name is luangeng
    5 client get: Hi,my name is luangeng
    6 client get: Hi,my name is luangeng
    7 client get: Hi,my name is luangeng
    8 client get: Hi,my name is luangeng
    9 client get: Hi,my name is luangeng

    end

  • 相关阅读:
    git使用
    Git常用命令梳理
    git fetch 更新远程代码到本地仓库
    理解RESTful架构
    漫谈五种IO模型(主讲IO多路复用)
    python 单下划线/双下划线使用总结
    闰秒导致MySQL服务器的CPU sys过高
    闰秒问题
    Java线上应用故障排查之一:高CPU占用
    ZooKeeper安装与配置
  • 原文地址:https://www.cnblogs.com/luangeng/p/7616713.html
Copyright © 2011-2022 走看看