zoukankan      html  css  js  c++  java
  • Netty5 时间服务器 有粘包问题

    Netty官网:http://netty.io/

    本例程使用最新的netty5.x版本编写

    服务器端:

    TimeServer 时间服务器 服务端接收客户端的连接请求和查询当前时间的指令,判断指令正确后响应返回当前服务器的校准时间。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    package c1;
     
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
     
    /**
     * server 有粘包问题
     * @author xwalker
     */
    public class TimeServer {
        public void bind(int port) throws Exception {
            // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
            // 另一个线程组用于处理SocketChannel的网络读写
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // NIO服务器端的辅助启动类 降低服务器开发难度
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel
                        .option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数
                        .childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类
                                                                    // 处理网络IO事件
     
                // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
                ChannelFuture f = serverBootstrap.bind(port).sync();
                System.out.println("timeServer启动");
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
     
            finally {
                // 优雅退出 释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                System.out.println("服务器优雅的释放了线程资源...");
            }
     
        }
     
        /**
         * 网络事件处理器
         */
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new TimeServerHandler());
            }
     
        }
     
        public static void main(String[] args) throws Exception {
            int port = 8000;
            new TimeServer().bind(port);
        }
     
    }

    TimerServer接收到客户端的连接和读写请求后交给处理器handler进行事件的响应处理,服务器定义两组线程组,一组用来处理客户端连接,一组用来处理网络IO事件(SocketChannel)的响应,NioEventLoopGroup是Netty提供的NIO线程组,实际上就是Java NIO中的Reactor线程组。

    ServerBootstrap是Netty提供的用于NIO服务端辅助启动类,降低了NIO服务端的开发复杂度。

    ServerBootstrap需要绑定服务器网络IO事件的处理类ChildChannelHandler ,用于实际处理具体的IO事件,例如记录日志,对消息编解码等。

    TimeServerHandler需要继承Netty提供的适配器ChannelhandlerAdapter重写channelRead等方法完成消息的读写。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package c1;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
     
    import java.util.Date;
    /**
     * server端网络IO事件处理
     * @author xwalker
     *
     */
    public class TimeServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("服务器读取到客户端请求...");
            ByteBuf buf=(ByteBuf) msg;
            byte[] req=new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body=new String(req,"UTF-8");
            System.out.println("the time server receive order:"+body);
            String curentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
            ByteBuf resp=Unpooled.copiedBuffer(curentTime.getBytes());
            ctx.write(resp);
            System.out.println("服务器做出了响应");
        }
         
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println("服务器readComplete 响应完成");
        }
         
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
            System.out.println("服务器异常退出"+cause.getMessage());
        }
    }

    服务器通过handler接收和处理消息请求,channelRead中的msg就是客户端请求的消息,通过解码获取具体信息后根据消息格式和定义完成后续的响应。

    ByteBuf是netty封装和扩展的java NIO中的ByteBuffer类,功能更完善。通过ByteBuf接收和解码msg 转成String类型 然后判断命令是都准确,根据结果做出响应。

    客户端:

    客户端的处理比较简单,启动客户端,链接服务器成功后发送时间查询的指令,等待服务器响应。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package c1;
     
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    /**
     * client 存在TCP粘包问题
     * @author xwlaker
     *
     */
    public class TimeClient {
        /**
         * 连接服务器
         * @param port
         * @param host
         * @throws Exception
         */
        public void connect(int port, String host) throws Exception {
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                //客户端辅助启动类 对客户端配置
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
                //异步链接服务器 同步等待链接成功
                ChannelFuture f = b.connect(host, port).sync();
                //等待链接关闭
                f.channel().closeFuture().sync();
     
            finally {
                group.shutdownGracefully();
                System.out.println("客户端优雅的释放了线程资源...");
            }
     
        }
     
        public static void main(String[] args) throws Exception {
            new TimeClient().connect(8000"127.0.0.1");
        }
     
    }

    客户端定义一组线程组用于处理与服务器的网络IO事件。通过客户端辅助启动类 Bootstrap来配置线程组、TCP参数以及IO事件处理的Handler。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    package c1;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
     
    import java.util.logging.Logger;
    /**
     * Client 网络IO事件处理
     * @author xwalker
     *
     */
    public class TimeClientHandler extends ChannelHandlerAdapter {
        private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName());
        private  ByteBuf firstMessage;
        public TimeClientHandler(){
            byte[] req ="QUERY TIME ORDER".getBytes();
            firstMessage=Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(firstMessage);
            System.out.println("客户端active");
        }
         
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("客户端收到服务器响应数据");
            ByteBuf buf=(ByteBuf) msg;
            byte[] req=new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body=new String(req,"UTF-8");
            System.out.println("Now is:"+body);
             
        }
         
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println("客户端收到服务器响应数据处理完成");
        }
         
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            logger.warning("Unexpected exception from downstream:"+cause.getMessage());
            ctx.close();
            System.out.println("客户端异常退出");
        }
    }

    TimeClienthandler继承Netty提供的Handler适配器,重写channelActive和channelRead方法 前者通道打开active状态时 发送查询指令,后者接收服务器响应的消息并解码输出。

    运行结果:

    客户端启动后首先处理器channelActive被调用发送查询指令,服务器端接收到查询指令后返回了当前时间,客户端接收到服务器响应后解码输出当前时间。

  • 相关阅读:
    五、生产者消费者模型_ThreadLocal
    四、多线程基础-线程池的创建和使用
    spring根据beanName获取bean
    spring容器的功能扩展
    机甲大师S1机器人编程学习,Windows 10 安装Scratch和简单实例学习
    如何建设高可用系统
    详解Condition的await和signal等待/通知机制
    从源码角度彻底理解ReentrantLock(重入锁)
    MySQL 分库分表及其平滑扩容方案
    机甲大师S1机器人编程学习
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13318139.html
Copyright © 2011-2022 走看看