zoukankan      html  css  js  c++  java
  • Netty实现心跳机制

    netty心跳机制示例,使用Netty实现心跳机制,使用netty4,IdleStateHandler 实现。Netty心跳机制,netty心跳检测,netty,心跳

    本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用。我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的。Netty中自带了一个IdleStateHandler 可以用来实现心跳检测。

    心跳检测的逻辑

    本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服务端发送消息。如果客户端在“干活”那么服务端必定会收到数据,如果客户端“闲下来了”那么服务端就接收不到这个客户端的消息,既然客户端闲下来了,不干事,那么何必浪费连接资源呢?所以服务端检测到一定时间内客户端不活跃的时候,将客户端连接关闭。本文要实现的逻辑步骤为:

    1. 启动服务端,启动客户端
    2. 客户端向服务端发送"I am alive",并sleep随机时间,用来模拟空闲。
    3. 服务端接收客户端消息,并返回"copy that",客户端空闲时 计数+1.
    4. 服务端客户端继续通信
    5. 服务端检测客户端空闲太多,关闭连接。客户端发现连接关闭了,就退出了。

    有了这个思路,我们先来编写服务端。

    心跳检测服务端代码

    public class HeartBeatServer {
    
        int port ;
        public HeartBeatServer(int port){
            this.port = port;
        }
    
        public void start(){
            ServerBootstrap bootstrap = new ServerBootstrap();
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try{
                bootstrap.group(boss,worker)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new HeartBeatInitializer());
    
                ChannelFuture future = bootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally {
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
        public static void main(String[] args) throws Exception {
            HeartBeatServer server = new HeartBeatServer(8090);
            server.start();
        }
    }
    

    熟悉netty的同志,对于上面的模板一样的代码一定是在熟悉不过了。啥都不用看,只需要看childHandler(new HeartBeatInitializer()) 这一句。HeartBeatInitializer就是一个ChannelInitializer顾名思义,他就是在初始化channel的时做一些事情。我们所需要开发的业务逻辑Handler就是在这里添加的。其代码如下:

    public class HeartBeatInitializer extends ChannelInitializer<Channel> {
    
        @Override
        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS));
            pipeline.addLast(new HeartBeatHandler());
        }
    }
    

    代码很简单,我们先添加了StringDecoder,和StringEncoder。这两个其实就是编解码用的,下面的IdleStateHandler才是本次心跳的核心组件。我们可以看到IdleStateHandler的构造函数中接收了4个参数,其定义如下:

    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);
    

    三个空闲时间参数,以及时间参数的格式。我们的例子中设置的是2,2,2,意思就是客户端2秒没有读/写,这个超时时间就会被触发。超时事件触发就需要我们来处理了,这就是上的HeartBeatInitializer中最后一行的HeartBeatHandler所做的事情。代码如下:

    public class HeartBeatHandler extends SimpleChannelInboundHandler<String> {
    
        int readIdleTimes = 0;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            System.out.println(" ====== > [server] message received : " + s);
           if("I am alive".equals(s)){
                ctx.channel().writeAndFlush("copy that");
            }else {
               System.out.println(" 其他信息处理 ... ");
           }
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            IdleStateEvent event = (IdleStateEvent)evt;
    
            String eventType = null;
            switch (event.state()){
                case READER_IDLE:
                    eventType = "读空闲";
                    readIdleTimes ++; // 读空闲的计数加1
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    // 不处理
                    break;
                case ALL_IDLE:
                    eventType ="读写空闲";
                    // 不处理
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType);
            if(readIdleTimes > 3){
                System.out.println(" [server]读空闲超过3次,关闭连接");
                ctx.channel().writeAndFlush("you are out");
                ctx.channel().close();
            }
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
        }
    
    }
    

    至此,我们的服务端写好了。

    心跳检测客户端代码

    netty的api设计使得编码的模式非常具有通用性,所以客户端代码和服务端的代码几乎一样:启动client端的代码几乎一样,也需要一个ChannelInitializer,也需要Handler。改动的地方很少,因此本文不对客户端代码进行详细解释。下面给出client端的完整代码:

    public class HeartBeatClient  {
    
        int port;
        Channel channel;
        Random random ;
    
        public HeartBeatClient(int port){
            this.port = port;
            random = new Random();
        }
        public static void main(String[] args) throws Exception{
            HeartBeatClient client = new HeartBeatClient(8090);
            client.start();
        }
    
        public void start() {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                        .handler(new HeartBeatClientInitializer());
    
                connect(bootstrap,port);
                String  text = "I am alive";
                while (channel.isActive()){
                    sendMsg(text);
                }
            }catch(Exception e){
                // do something
            }finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    
        public void connect(Bootstrap bootstrap,int port) throws Exception{
            channel = bootstrap.connect("localhost",8090).sync().channel();
        }
    
        public void sendMsg(String text) throws Exception{
            int num = random.nextInt(10);
            Thread.sleep(num * 1000);
            channel.writeAndFlush(text);
        }
    
        static class HeartBeatClientInitializer extends ChannelInitializer<Channel> {
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast(new HeartBeatClientHandler());
            }
        }
    
        static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                System.out.println(" client received :" +msg);
                if(msg!= null && msg.equals("you are out")) {
                    System.out.println(" server closed connection , so client will close too");
                    ctx.channel().closeFuture();
                }
            }
        }
    }
    

    运行代码

    在上面的代码写好之后,我们先启动服务端,然后在启动客户端。运行日志如下:

    server端:

    === /127.0.0.1:57700 is active ===
     ====== > [server] message received : I am alive
     ====== > [server] message received : I am alive
    /127.0.0.1:57700超时事件:写空闲
    /127.0.0.1:57700超时事件:读空闲
    /127.0.0.1:57700超时事件:读写空闲
    /127.0.0.1:57700超时事件:写空闲
    /127.0.0.1:57700超时事件:读空闲
    /127.0.0.1:57700超时事件:读写空闲
    /127.0.0.1:57700超时事件:写空闲
     ====== > [server] message received : I am alive
    /127.0.0.1:57700超时事件:写空闲
    /127.0.0.1:57700超时事件:读写空闲
    /127.0.0.1:57700超时事件:读空闲
    /127.0.0.1:57700超时事件:写空闲
    /127.0.0.1:57700超时事件:读写空闲
    /127.0.0.1:57700超时事件:读空闲
     [server]读空闲超过3次,关闭连接
    

    client端:

     client sent msg and sleep 2
     client received :copy that
     client received :copy that
     client sent msg and sleep 6
     client sent msg and sleep 6
     client received :copy that
     client received :you are out
     server closed connection , so client will close too
    
    Process finished with exit code 0
    

    通过上面的运行日志,我们可以看到:

    1.客户端在与服务器成功建立之后,发送了3次'I am alive',服务端也回应了3次:'copy that'

    2.由于客户端消极怠工,超时了多次,服务端关闭了链接。

    3.客户端知道服务端抛弃自己之后,也关闭了连接,程序退出。

    以上简单了演示了一下,netty的心跳机制,其实主要就是使用了IdleStateHandler源码下载


    使用Netty实现HTTP服务器
    Netty实现心跳机制
    Netty开发redis客户端,Netty发送redis命令,netty解析redis消息
    Netty系列

    spring如何启动的?这里结合spring源码描述了启动过程
    SpringMVC是怎么工作的,SpringMVC的工作原理
    spring 异常处理。结合spring源码分析400异常处理流程及解决方法
    Mybatis Mapper接口是如何找到实现类的-源码分析
    alt 我的公众号

  • 相关阅读:
    charles使用
    断言
    JDBC Request
    HTTP请求建立一个测试计划
    利用badboy进行脚本录制
    接口测试用例
    Monkey常用命令
    charles安装与使用
    celery配置与基本使用
    图片验证码接口
  • 原文地址:https://www.cnblogs.com/demingblog/p/9957143.html
Copyright © 2011-2022 走看看