zoukankan      html  css  js  c++  java
  • 学习netty遇到的关于 LineBasedFrameDecoder 的问题

    最近在看《Netty权威指南》这本书,关于TCP粘包/拆包,书中使用的是 LineBasedFrameDecoder 来解决的,但是我在实践的过程中出现了问题,上代码吧。

    这个是 server 的代码

    package com.cd.netty4.zhc.demo.ex01;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * Time server example based on chapter 4 of "Netty权威指南 (2nd edition)".
     *
     * <p>Run {@code TimeServerExc02} first and {@code TimeClientExc02} afterwards. The child
     * pipeline pairs {@code LineBasedFrameDecoder} with {@code StringDecoder}, the combination
     * the book uses against TCP sticky/split packets.
     */
    public class TimeServerExc02 {

        /** Runs the server on 127.0.0.1:1234, printing a banner before and after. */
        public static void main(String[] args) {
            System.out.println("---------------------- server 测试开始 ---------------------");
            TimeServerExc02 server = new TimeServerExc02();
            server.bind("127.0.0.1", 1234);
            System.out.println("---------------------- server 测试end ---------------------");
        }

        /**
         * Binds the server socket and blocks until the listening channel is closed.
         *
         * @param host local address to bind to
         * @param port local port to bind to
         */
        public void bind(String host, int port) {
            EventLoopGroup acceptorGroup = new NioEventLoopGroup();
            EventLoopGroup ioGroup = new NioEventLoopGroup();

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    // Order matters: frame by line, decode to String, then the business handler.
                    channel.pipeline()
                            .addLast(new LineBasedFrameDecoder(1024))
                            .addLast("decoder", new StringDecoder())
                            .addLast("handler", new TimeServerHandlerExc02());
                }
            });

            try {
                // Wait for the bind to complete, then wait for the listening channel to close.
                bootstrap.bind(host, port).sync().channel().closeFuture().sync();
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            } finally {
                acceptorGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        }
    }
    
    /**
     * Server-side handler: greets each new connection and answers every inbound message with
     * the current time.
     *
     * <p>Neither outbound message is followed by a line terminator.
     */
    class TimeServerHandlerExc02 extends ChannelInboundHandlerAdapter {

        /** Sends a greeting as soon as a client connection becomes active. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channelActive(有client连接上了)..");
            ByteBuf greeting = Unpooled.copiedBuffer("您已经成功连接上了 server!", CharsetUtil.UTF_8);
            // writeAndFlush, not write: the data must actually be flushed to the socket.
            ctx.writeAndFlush(greeting);
        }

        /** Logs the inbound message and replies with the time formatted as yyyy-MM-dd HH:mm:ss. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead..");
            System.out.println("读入client消息:" + msg.toString());

            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String now = formatter.format(new Date());
            ctx.writeAndFlush(Unpooled.copiedBuffer(now, CharsetUtil.UTF_8));
            System.out.println("向client发送消息:" + now);
        }

        /** Prints the failure and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

    这个是client的代码:

    package com.cd.netty4.zhc.demo.ex01;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * Time client example based on section 4.2 of "Netty权威指南 (2nd edition)".
     */
    public class TimeClientExc02 {

        /** Connects to 127.0.0.1:1234, printing a banner before and after. */
        public static void main(String[] args) {
            try {
                System.out.println("---------------------- client 测试开始 ---------------------");
                TimeClientExc02 client = new TimeClientExc02();
                client.connect("127.0.0.1", 1234);
                System.out.println("---------------------- client 测试end ---------------------");
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }

        /** Number of messages received from the server so far. */
        private int count;

        /**
         * Starts a connection to the server and blocks until the client channel is closed.
         *
         * @param host server address
         * @param port server port
         * @throws InterruptedException if interrupted while waiting for the channel to close
         */
        public void connect(String host, int port) throws InterruptedException {
            EventLoopGroup workers = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workers);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.TCP_NODELAY, true);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                                .addLast(new LineBasedFrameDecoder(1024))
                                .addLast("decoder", new StringDecoder())
                                .addLast(new TimeClientHandlerExc02());
                    }
                });
                // Block until the client channel is closed.
                bootstrap.connect(host, port).channel().closeFuture().sync();
            } finally {
                // Release the NIO threads.
                workers.shutdownGracefully();
            }
        }

        /**
         * Client-side handler: sends 50 requests once connected and prints every reply.
         *
         * <p>The requests are written without a trailing line terminator.
         */
        class TimeClientHandlerExc02 extends ChannelInboundHandlerAdapter {

            /** Fires the 50 requests as soon as the connection is active. */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("client channelActive(client 连接成功)..");
                int sent = 0;
                while (sent < 50) {
                    System.out.print(sent + ",");
                    String request = "It's a good day , I want to know time--" + sent;
                    // writeAndFlush, not write: each request must be flushed to the socket.
                    ctx.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8));
                    sent++;
                }
                ctx.flush();
            }

            /** Counts and prints each message received from the server. */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                count++;
                System.out.println("client channelRead.." + count);
                System.out.println("读入 server 消息:" + msg.toString());
            }

            /** Prints the failure and closes the connection. */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                cause.printStackTrace();
                ctx.close();
            }

        }

    }

    我先运行的是 server,然后是 client,发现 server 的 channelActive(..) 以及 client 的 channelActive(..) 都有运行到,但是后续的 channelRead(..) 方法却迟迟没有运行到。我把 LineBasedFrameDecoder 和 StringDecoder 这两个解码器去掉,则代码正常,但是会有 TCP 粘包/拆包问题。

    在网上查了问题原因,无果,认真看了两遍书,发现 LineBasedFrameDecoder 的工作原理是“它依次遍历 ByteBuf 中的可读字节,判断看是否有‘\n’或者‘\r\n’,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行”。

    所以,我的问题就出在消息结尾处没有加上换行符,修改代码后,可运行。修改后代码如下:

    package com.cd.netty4.zhc.demo.ex01;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * Time server example based on chapter 4 of "Netty权威指南 (2nd edition)".
     *
     * <p>Run {@code TimeServerExc02} first and {@code TimeClientExc02} afterwards. The child
     * pipeline pairs {@code LineBasedFrameDecoder} with {@code StringDecoder} to deal with TCP
     * sticky/split packets.
     *
     * <p>Note: with {@code LineBasedFrameDecoder}, every message sent must end with
     * {@code "\n"} (the book uses {@code System.getProperty("line.separator")}); this applies
     * to both server and client. The decoder scans the readable bytes of the {@code ByteBuf}
     * for {@code "\n"} or {@code "\r\n"} and ends the frame at that position.
     */
    public class TimeServerExc02 {

        /** Runs the server on 127.0.0.1:1234, printing a banner before and after. */
        public static void main(String[] args) {
            System.out.println("---------------------- server 测试开始 ---------------------");
            new TimeServerExc02().bind("127.0.0.1", 1234);
            System.out.println("---------------------- server 测试end ---------------------");
        }

        /**
         * Binds the server socket and blocks until the listening channel is closed.
         *
         * @param host local address to bind to
         * @param port local port to bind to
         */
        public void bind(String host, int port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serboot = new ServerBootstrap().group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0) throws Exception {
                            arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            // Decode explicitly as UTF-8: both peers encode with UTF-8, while the
                            // no-arg StringDecoder falls back to the platform default charset.
                            arg0.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            arg0.pipeline().addLast("handler", new TimeServerHandlerExc02());
                        }
                    });

            try {
                // Bind the port and wait for the bind to complete.
                ChannelFuture future = serboot.bind(host, port).sync();
                // Wait until the listening channel is closed.
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    /**
     * Server-side handler: greets each new connection and answers every inbound line with the
     * current time.
     *
     * <p>Every outbound message is terminated with the line separator, because the client frames
     * inbound data with {@code LineBasedFrameDecoder} and only emits a message once it sees a
     * line ending.
     */
    class TimeServerHandlerExc02 extends ChannelInboundHandlerAdapter {

        /** Terminator appended to every outbound message. */
        private static final String LINE_SEPARATOR = System.getProperty("line.separator");

        /** Sends a line-terminated greeting as soon as a client connection becomes active. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channelActive(有client连接上了)..");
            // Without the terminator the greeting would be glued to the first time reply
            // and delivered to the client as a single line.
            ctx.writeAndFlush(
                    Unpooled.copiedBuffer("您已经成功连接上了 server!" + LINE_SEPARATOR, CharsetUtil.UTF_8));
        }

        /** Logs the inbound message and replies with the time formatted as yyyy-MM-dd HH:mm:ss. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead..");
            String msgStr = msg.toString();
            System.out.println("读入client消息:" + msgStr);
            String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            ByteBuf resp = Unpooled.copiedBuffer(currentTime + LINE_SEPARATOR, CharsetUtil.UTF_8);
            ctx.writeAndFlush(resp);
            System.out.println("向client发送消息:" + currentTime);
        }

        /** Prints the failure and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
    package com.cd.netty4.zhc.demo.ex01;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;
    
    /** 
     * 本例子参考《Netty权威指南(第2版)》第4.2章 
     * 注意:使用 LineBasedFrameDecoder时,发送的消息结尾一定要是
    (官方是 System.getProperty("line.separator")),server端 和 client 都必须如此
     * 因为LineBasedFrameDecoder 的工作原理是,依次遍历Bytebuf中的可读字节,判断是否有“
    ”或者“
    ”,如果有则在此位置结束
     * */
    public class TimeClientExc02 {
    
        public static void main(String[] args) {
            try {
                System.out.println("---------------------- client 测试开始 ---------------------");
                new TimeClientExc02().connect("127.0.0.1", 1234);
                System.out.println("---------------------- client 测试end ---------------------");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        private int count;
    
        public void connect(String host, int port) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap boot = new Bootstrap().group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast("decoder", new StringDecoder());
                                ch.pipeline().addLast(new TimeClientHandlerExc02());
                            }
                        });
                ChannelFuture future = boot.connect(host, port);
                // 等待客户端链路关闭
                future.channel().closeFuture().sync();
            } finally {
                // 优雅的退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        class TimeClientHandlerExc02 extends ChannelInboundHandlerAdapter {
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("client channelActive(client 连接成功)..");
                for (int i = 0; i < 50; i++) {
                    System.out.print(i + ",");
                    ctx.writeAndFlush(
                            Unpooled.copiedBuffer("It's a good day , I want to know time--" + i + "
    ", CharsetUtil.UTF_8)); // 必须有flush
                }
                ctx.flush();
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("client channelRead.." + ++count);
                String msgStr = msg.toString();
                System.out.println("读入 server 消息:" + msgStr);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                cause.printStackTrace();
                ctx.close();
            }
    
        }
    
    }

    查看了 LineBasedFrameDecoder 的部分源码,确实是以换行作为分割符的。

  • 相关阅读:
    div+css 遮罩层
    高可用开源方案Heartbeat vs Keepalived
    nginx+keepalive 实现高可用负载均衡方案
    KeepAlive详解
    (转)高可用可伸缩架构实用经验谈 ---- 重要
    OpenStack与KVM的区别与联系
    架构师于小波:魅族实时消息推送架构
    抛开flash,自己开发实现C++ RTMP直播流播放器
    (转)C++实现RTMP协议发送H.264编码及AAC编码的音视频,摄像头直播
    (转)OC学习笔记 @property的属性 strong 和 weak 理解
  • 原文地址:https://www.cnblogs.com/klbc/p/10887072.html
Copyright © 2011-2022 走看看