zoukankan      html  css  js  c++  java
  • Netty解决TCP粘包/拆包问题

    服务端

    package org.zln.netty.five.timer;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 时间服务器服务端
     * Created by sherry on 16/11/5.
     */
    public class TimerServer {
        /**
         * 服务端绑定端口号
         */
        private int PORT;
    
        public TimerServer(int PORT) {
            this.PORT = PORT;
        }
    
        /**
         * 日志
         */
        private static Logger logger = LoggerFactory.getLogger(TimerServer.class);
    
        public void bind() {
            /*
            NioEventLoopGroup是线程池组
            包含了一组NIO线程,专门用于网络事件的处理
            bossGroup:服务端,接收客户端连接
            workGroup:进行SocketChannel的网络读写
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                /*
                ServerBootstrap:用于启动NIO服务的辅助类,目的是降低服务端的开发复杂度
                 */
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)//配置TCP参数,能够设置很多,这里就只设置了backlog=1024,
                        .childHandler(new TimerServerInitializer());//绑定I/O事件处理类
                logger.debug("绑定端口号:" + PORT + ",等待同步成功");
                /*
                bind:绑定端口
                sync:同步阻塞方法,等待绑定完成,完成后返回 ChannelFuture ,主要用于通知回调
                 */
                ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
                logger.debug("等待服务端监听窗口关闭");
                /*
                 closeFuture().sync():为了阻塞,服务端链路关闭后才退出.也是一个同步阻塞方法
                 */
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            } finally {
                logger.debug("优雅退出,释放线程池资源");
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    TimerServer
     1 package org.zln.netty.five.timer;
     2 
     3 import io.netty.channel.ChannelInitializer;
     4 import io.netty.channel.ChannelPipeline;
     5 import io.netty.channel.socket.SocketChannel;
     6 import io.netty.handler.codec.LineBasedFrameDecoder;
     7 import io.netty.handler.codec.string.StringDecoder;
     8 
     9 /**
    10  * Created by sherry on 16/11/5.
    11  */
    12 public class TimerServerInitializer extends ChannelInitializer<SocketChannel> {
    13     @Override
    14     protected void initChannel(SocketChannel socketChannel) throws Exception {
    15 
    16         ChannelPipeline pipeline = socketChannel.pipeline();
    17 
    18         pipeline.addLast(new LineBasedFrameDecoder(1024));
    19         pipeline.addLast(new StringDecoder());
    20         pipeline.addLast(new TimerServerHandler());
    21 
    22 
    23 
    24     }
    25 }
    TimerServerInitializer
     1 package org.zln.netty.five.timer;
     2 
     3 import io.netty.buffer.ByteBuf;
     4 import io.netty.buffer.Unpooled;
     5 import io.netty.channel.ChannelHandlerAdapter;
     6 import io.netty.channel.ChannelHandlerContext;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 
    10 import java.text.SimpleDateFormat;
    11 import java.util.Date;
    12 
    13 /**
    14  * Handler主要用于对网络事件进行读写操作,是真正的业务类
    15  * 通常只需要关注 channelRead 和 exceptionCaught 方法
    16  * Created by sherry on 16/11/5.
    17  */
    18 public class TimerServerHandler extends ChannelHandlerAdapter {
    19 
    20     /**
    21      * 日志
    22      */
    23     private Logger logger = LoggerFactory.getLogger(TimerServerHandler.class);
    24 
    25     private static int count = 0;
    26 
    27     @Override
    28     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    29 
    30         String body = (String) msg;
    31         logger.debug("第  "+(++count)+"  次收到请求  -  "+body);
    32 
    33         String timeNow = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date())+System.lineSeparator();
    34 
    35         //获取发送给客户端的数据
    36         ByteBuf resBuf = Unpooled.copiedBuffer(timeNow.getBytes("UTF-8"));
    37 
    38         ctx.writeAndFlush(resBuf);
    39     }
    40 
    41 
    42     @Override
    43     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    44         //将消息发送队列中的消息写入到SocketChannel中发送给对方
    45         logger.debug("channelReadComplete");
    46         ctx.flush();
    47     }
    48 
    49     @Override
    50     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    51         //发生异常时,关闭 ChannelHandlerContext,释放ChannelHandlerContext 相关的句柄等资源
    52         logger.error(cause.getMessage(),cause);
    53         ctx.close();
    54     }
    55 }
    TimerServerHandler

    客户端

     1 package org.zln.netty.five.timer;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelOption;
     6 import io.netty.channel.EventLoopGroup;
     7 import io.netty.channel.nio.NioEventLoopGroup;
     8 import io.netty.channel.socket.nio.NioSocketChannel;
     9 import org.slf4j.Logger;
    10 import org.slf4j.LoggerFactory;
    11 
    12 /**
    13  * 时间服务器客户端
    14  * Created by sherry on 16/11/5.
    15  */
    16 public class TimerClient {
    17     /**
    18      * 日志
    19      */
    20     private Logger logger = LoggerFactory.getLogger(TimerServer.class);
    21 
    22     private String HOST;
    23     private int PORT;
    24 
    25     public TimerClient(String HOST, int PORT) {
    26         this.HOST = HOST;
    27         this.PORT = PORT;
    28     }
    29 
    30     public void connect(){
    31         //配置客户端NIO线程组
    32         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    33         try {
    34             Bootstrap bootstrap = new Bootstrap();
    35             bootstrap.group(eventLoopGroup)
    36                     .channel(NioSocketChannel.class)
    37                     .option(ChannelOption.TCP_NODELAY,true)
    38                     .handler(new TimerClientInitializer());
    39             //发起异步连接操作
    40             logger.debug("发起异步连接操作 - start");
    41             ChannelFuture channelFuture = bootstrap.connect(HOST,PORT).sync();
    42             logger.debug("发起异步连接操作 - end");
    43             //等待客户端链路关闭
    44             logger.debug("等待客户端链路关闭 - start");
    45             channelFuture.channel().closeFuture().sync();
    46             logger.debug("等待客户端链路关闭 - end");
    47         } catch (InterruptedException e) {
    48             logger.error(e.getMessage(),e);
    49         }finally {
    50             //优雅的关闭
    51             eventLoopGroup.shutdownGracefully();
    52         }
    53     }
    54 }
    TimerClient
     1 package org.zln.netty.five.timer;
     2 
     3 import io.netty.channel.ChannelInitializer;
     4 import io.netty.channel.ChannelPipeline;
     5 import io.netty.channel.socket.SocketChannel;
     6 import io.netty.handler.codec.LineBasedFrameDecoder;
     7 import io.netty.handler.codec.string.StringDecoder;
     8 
     9 /**
    10  * Created by sherry on 16/11/5.
    11  */
    12 public class TimerClientInitializer extends ChannelInitializer<SocketChannel> {
    13     @Override
    14     protected void initChannel(SocketChannel socketChannel) throws Exception {
    15         ChannelPipeline pipeline = socketChannel.pipeline();
    16         pipeline.addLast(new LineBasedFrameDecoder(1024));
    17         pipeline.addLast(new StringDecoder());
    18         pipeline.addLast(new TimerClientHandler());
    19     }
    20 }
    TimerClientInitializer
     1 package org.zln.netty.five.timer;
     2 
     3 import io.netty.buffer.ByteBuf;
     4 import io.netty.buffer.Unpooled;
     5 import io.netty.channel.ChannelHandlerAdapter;
     6 import io.netty.channel.ChannelHandlerContext;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 
    10 import java.io.UnsupportedEncodingException;
    11 
    12 /**
    13  * Created by sherry on 16/11/5.
    14  */
    15 public class TimerClientHandler extends ChannelHandlerAdapter {
    16 
    17     /**
    18      * 日志
    19      */
    20     private Logger logger = LoggerFactory.getLogger(TimerClientHandler.class);
    21 
    22     private static int count = 0;
    23 
    24     @Override
    25     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    26         logger.debug("客户端连接上了服务端");
    27 
    28         //发送请求
    29         ByteBuf reqBuf = null;
    30         for (int i = 0; i < 100; i++) {
    31             reqBuf = getReq("GET TIME"+System.lineSeparator());
    32             ctx.writeAndFlush(reqBuf);
    33         }
    34 
    35 
    36     }
    37 
    38     /**
    39      * 将字符串包装成ByteBuf
    40      * @param s
    41      * @return
    42      */
    43     private ByteBuf getReq(String s) throws UnsupportedEncodingException {
    44         byte[] data = s.getBytes("UTF-8");
    45         ByteBuf reqBuf = Unpooled.buffer(data.length);
    46         reqBuf.writeBytes(data);
    47         return reqBuf;
    48     }
    49 
    50     @Override
    51     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    52         String body = (String) msg;
    53         logger.debug("这是收到的第 "+(++count)+" 笔响应 -- "+body);
    54     }
    55 
    56     @Override
    57     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    58         ctx.close();
    59     }
    60 }
    TimerClientHandler

    这里主要使用 LineBasedFrameDecoder 和 StringDecoder 来实现解决粘包问题

    原理如下:

      LineBasedFrameDecoder 依次遍历 ByteBuf 中的可读字节,判断是否有 或 ,如果有,就作为结束位置。从可读索引到结束位置区间的字节组成一行。它是以换行符为结束标志的解码器。支持携带结束符或者不懈怠结束符两种解码方式。同时支持配置单行的最大长度。如果读取到了最大长度仍旧没有发现换行符,就会抛出异常,同时忽略掉之前读到的数据。

      StringDecoder 的作用就是讲接收到的对象转化成字符串,然后继续调用handler。这样就不需要再handler中手动将对象转化成字符串了,直接强制转化就行。

      LineBasedFrameDecoder+StringDecoder组合就是按行切割的文本解码器,用来解决TCP的粘包和拆包问题。

      

  • 相关阅读:
    [VirtaulBox]网络连接设置
    LeetCode
    LeetCode
    LeetCode
    LeetCode-37.Sudok Solver
    LeetCode-36.Valid Sudoku
    LeetCode-52.N-Queen II
    LeetCode-51.N-Queens
    LeetCode-22.Generate Parentheses
    LeetCode-111.Mininum Depth of Binary Tree
  • 原文地址:https://www.cnblogs.com/sherrykid/p/6036381.html
Copyright © 2011-2022 走看看