zoukankan      html  css  js  c++  java
  • Netty5 时间服务器 有粘包问题

    Netty官网:http://netty.io/

    本例程使用最新的netty5.x版本编写

    服务器端:

    TimeServer 时间服务器 服务端接收客户端的连接请求和查询当前时间的指令,判断指令正确后响应返回当前服务器的校准时间。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    package c1;
     
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
     
    /**
     * server 有粘包问题
     * @author xwalker
     */
    public class TimeServer {
        public void bind(int port) throws Exception {
            // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
            // 另一个线程组用于处理SocketChannel的网络读写
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // NIO服务器端的辅助启动类 降低服务器开发难度
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel
                        .option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数
                        .childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类
                                                                    // 处理网络IO事件
     
                // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
                ChannelFuture f = serverBootstrap.bind(port).sync();
                System.out.println("timeServer启动");
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
     
            finally {
                // 优雅退出 释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                System.out.println("服务器优雅的释放了线程资源...");
            }
     
        }
     
        /**
         * 网络事件处理器
         */
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new TimeServerHandler());
            }
     
        }
     
        public static void main(String[] args) throws Exception {
            int port = 8000;
            new TimeServer().bind(port);
        }
     
    }

    TimerServer接收到客户端的连接和读写请求后交给处理器handler进行事件的响应处理,服务器定义两组线程组,一组用来处理客户端连接,一组用来处理网络IO事件(SocketChannel)的响应,NioEventLoopGroup是Netty提供的NIO线程组,实际上就是Java NIO中的Reactor线程组。

    ServerBootstrap是Netty提供的用于NIO服务端辅助启动类,降低了NIO服务端的开发复杂度。

    ServerBootstrap需要绑定服务器网络IO事件的处理类ChildChannelHandler ,用于实际处理具体的IO事件,例如记录日志,对消息编解码等。

    TimeServerHandler需要继承Netty提供的适配器ChannelhandlerAdapter重写channelRead等方法完成消息的读写。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package c1;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
     
    import java.util.Date;
    /**
     * server端网络IO事件处理
     * @author xwalker
     *
     */
    public class TimeServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("服务器读取到客户端请求...");
            ByteBuf buf=(ByteBuf) msg;
            byte[] req=new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body=new String(req,"UTF-8");
            System.out.println("the time server receive order:"+body);
            String curentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
            ByteBuf resp=Unpooled.copiedBuffer(curentTime.getBytes());
            ctx.write(resp);
            System.out.println("服务器做出了响应");
        }
         
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println("服务器readComplete 响应完成");
        }
         
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
            System.out.println("服务器异常退出"+cause.getMessage());
        }
    }

    服务器通过handler接收和处理消息请求,channelRead中的msg就是客户端请求的消息,通过解码获取具体信息后根据消息格式和定义完成后续的响应。

    ByteBuf是netty封装和扩展的java NIO中的ByteBuffer类,功能更完善。通过ByteBuf接收和解码msg 转成String类型 然后判断命令是都准确,根据结果做出响应。

    客户端:

    客户端的处理比较简单,启动客户端,链接服务器成功后发送时间查询的指令,等待服务器响应。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package c1;
     
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    /**
     * client 存在TCP粘包问题
     * @author xwlaker
     *
     */
    public class TimeClient {
        /**
         * 连接服务器
         * @param port
         * @param host
         * @throws Exception
         */
        public void connect(int port, String host) throws Exception {
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                //客户端辅助启动类 对客户端配置
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
                //异步链接服务器 同步等待链接成功
                ChannelFuture f = b.connect(host, port).sync();
                //等待链接关闭
                f.channel().closeFuture().sync();
     
            finally {
                group.shutdownGracefully();
                System.out.println("客户端优雅的释放了线程资源...");
            }
     
        }
     
        public static void main(String[] args) throws Exception {
            new TimeClient().connect(8000"127.0.0.1");
        }
     
    }

    客户端定义一组线程组用于处理与服务器的网络IO事件。通过客户端辅助启动类 Bootstrap来配置线程组、TCP参数以及IO事件处理的Handler。

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    package c1;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
     
    import java.util.logging.Logger;
    /**
     * Client 网络IO事件处理
     * @author xwalker
     *
     */
    public class TimeClientHandler extends ChannelHandlerAdapter {
        private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName());
        private  ByteBuf firstMessage;
        public TimeClientHandler(){
            byte[] req ="QUERY TIME ORDER".getBytes();
            firstMessage=Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(firstMessage);
            System.out.println("客户端active");
        }
         
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("客户端收到服务器响应数据");
            ByteBuf buf=(ByteBuf) msg;
            byte[] req=new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body=new String(req,"UTF-8");
            System.out.println("Now is:"+body);
             
        }
         
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println("客户端收到服务器响应数据处理完成");
        }
         
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            logger.warning("Unexpected exception from downstream:"+cause.getMessage());
            ctx.close();
            System.out.println("客户端异常退出");
        }
    }

    TimeClienthandler继承Netty提供的Handler适配器,重写channelActive和channelRead方法 前者通道打开active状态时 发送查询指令,后者接收服务器响应的消息并解码输出。

    运行结果:

    客户端启动后首先处理器channelActive被调用发送查询指令,服务器端接收到查询指令后返回了当前时间,客户端接收到服务器响应后解码输出当前时间。

  • 相关阅读:
    【转】CUDA5/CentOS6.4
    【转】centos 6.4 samba 安装配置
    【转】Install MATLAB 2013a on CentOS 6.4 x64 with mode silent
    【转】Getting xrdp to work on CentOS 6.4
    【VLFeat】使用matlab版本计算HOG
    Unofficial Windows Binaries for Python Extension Packages
    March 06th, 2018 Week 10th Tuesday
    March 05th, 2018 Week 10th Monday
    March 04th, 2018 Week 10th Sunday
    March 03rd, 2018 Week 9th Saturday
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13318139.html
Copyright © 2011-2022 走看看