zoukankan      html  css  js  c++  java
  • Netty心跳简单Demo

    前面简单地了解了一下IdleStateHandler,我们现在写一个简单的心跳demo:

    1)服务器端每隔5秒检测服务器端的读超时,如果5秒没有接受到客户端的写请求,也就说服务器端5秒没有收到读事件,则视为一次超时

    2)如果超时二次则说明连接处于不活跃的状态,关闭ServerChannel

    3)客户端每隔4秒发送一些写请求,这个请求相当于一次心跳包,告之服务器端:客户端仍旧活着

    我们开始先开始写服务器端的handler,继承ChannelInboundHandlerAdapter,我们先重写userEventTriggered方法,这个方法我们前面讲过,如果超时则会触发相应的超时事件

    HeartBeatServerHandler.java

    [java] view plain copy
     
    1. package com.lyncc.netty.heartbeats;  
    2.   
    3. import io.netty.channel.ChannelHandlerContext;  
    4. import io.netty.channel.ChannelInboundHandlerAdapter;  
    5. import io.netty.handler.timeout.IdleState;  
    6. import io.netty.handler.timeout.IdleStateEvent;  
    7.   
    8. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {  
    9.   
    10.     private int loss_connect_time = 0;  
    11.   
    12.     @Override  
    13.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
    14.         if (evt instanceof IdleStateEvent) {  
    15.             IdleStateEvent event = (IdleStateEvent) evt;  
    16.             if (event.state() == IdleState.READER_IDLE) {  
    17.                 loss_connect_time++;  
    18.                 System.out.println("5 秒没有接收到客户端的信息了");  
    19.                 if (loss_connect_time > 2) {  
    20.                     System.out.println("关闭这个不活跃的channel");  
    21.                     ctx.channel().close();  
    22.                 }  
    23.             }  
    24.         } else {  
    25.             super.userEventTriggered(ctx, evt);  
    26.         }  
    27.     }  
    28.   
    29.     @Override  
    30.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    31.         System.out.println("server channelRead..");  
    32.         System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());  
    33.     }  
    34.   
    35.     @Override  
    36.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
    37.         cause.printStackTrace();  
    38.         ctx.close();  
    39.     }  
    40.   
    41. }  



    再写一下服务器端,我们要注意的是,我们要在channelPipeline中加入IdleStateHandler,我们在handler中提示的是5秒读,所以我们配置的是:


    这样就可以每隔5秒检测一下服务端的读超时。完整代码清单如下:

    [java] view plain copy
     
    1. package com.lyncc.netty.heartbeats;  
    2.   
    3. import io.netty.bootstrap.ServerBootstrap;  
    4. import io.netty.channel.ChannelFuture;  
    5. import io.netty.channel.ChannelInitializer;  
    6. import io.netty.channel.ChannelOption;  
    7. import io.netty.channel.EventLoopGroup;  
    8. import io.netty.channel.nio.NioEventLoopGroup;  
    9. import io.netty.channel.socket.SocketChannel;  
    10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
    11. import io.netty.handler.codec.string.StringDecoder;  
    12. import io.netty.handler.codec.string.StringEncoder;  
    13. import io.netty.handler.logging.LogLevel;  
    14. import io.netty.handler.logging.LoggingHandler;  
    15. import io.netty.handler.timeout.IdleStateHandler;  
    16.   
    17. import java.net.InetSocketAddress;  
    18. import java.util.concurrent.TimeUnit;  
    19.   
    20. public class HeartBeatServer {  
    21.       
    22. private int port;  
    23.       
    24.     public HeartBeatServer(int port) {  
    25.         this.port = port;  
    26.     }  
    27.       
    28.     public void start(){  
    29.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);  
    30.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    31.         try {  
    32.             ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).localAddress(new InetSocketAddress(port))  
    33.                     .childHandler(new ChannelInitializer<SocketChannel>() {  
    34.                         protected void initChannel(SocketChannel ch) throws Exception {  
    35.                             ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));  
    36.                             ch.pipeline().addLast("decoder", new StringDecoder());  
    37.                             ch.pipeline().addLast("encoder", new StringEncoder());  
    38.                             ch.pipeline().addLast(new HeartBeatServerHandler());  
    39.                         };  
    40.                           
    41.                     }).option(ChannelOption.SO_BACKLOG, 128)     
    42.                     .childOption(ChannelOption.SO_KEEPALIVE, true);  
    43.              // 绑定端口,开始接收进来的连接  
    44.              ChannelFuture future = sbs.bind(port).sync();    
    45.                
    46.              System.out.println("Server start listen at " + port );  
    47.              future.channel().closeFuture().sync();  
    48.         } catch (Exception e) {  
    49.             bossGroup.shutdownGracefully();  
    50.             workerGroup.shutdownGracefully();  
    51.         }  
    52.     }  
    53.       
    54.     public static void main(String[] args) throws Exception {  
    55.         int port;  
    56.         if (args.length > 0) {  
    57.             port = Integer.parseInt(args[0]);  
    58.         } else {  
    59.             port = 8080;  
    60.         }  
    61.         new HeartBeatServer(port).start();  
    62.     }  
    63.   
    64. }  

    HeartBeatClientHandler.java方法也重写userEventTriggered方法,因为客户端没有任何写的情况,所以我们可以每次都能进行写超时:

    也就说这个方法每隔4秒都能触发:

    红色边框代码在客户端没有写事件的时候,一超时就会触发写请求:

    完整代码如下:

    HeartBeatClientHandler.java

    [java] view plain copy
     
    1. package com.lyncc.netty.heartbeats;  
    2.   
    3. import java.util.Date;  
    4.   
    5. import io.netty.buffer.ByteBuf;  
    6. import io.netty.buffer.Unpooled;  
    7. import io.netty.channel.ChannelHandlerContext;  
    8. import io.netty.channel.ChannelInboundHandlerAdapter;  
    9. import io.netty.handler.timeout.IdleState;  
    10. import io.netty.handler.timeout.IdleStateEvent;  
    11. import io.netty.util.CharsetUtil;  
    12. import io.netty.util.ReferenceCountUtil;  
    13.   
    14. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {  
    15.   
    16.       
    17.     private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",  
    18.             CharsetUtil.UTF_8));  
    19.       
    20.     private static final int TRY_TIMES = 3;  
    21.       
    22.     private int currentTime = 0;  
    23.       
    24.     @Override  
    25.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
    26.         System.out.println("激活时间是:"+new Date());  
    27.         System.out.println("HeartBeatClientHandler channelActive");  
    28.         ctx.fireChannelActive();  
    29.     }  
    30.   
    31.     @Override  
    32.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
    33.         System.out.println("停止时间是:"+new Date());  
    34.         System.out.println("HeartBeatClientHandler channelInactive");  
    35.     }  
    36.   
    37.     @Override  
    38.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
    39.         System.out.println("循环触发时间:"+new Date());  
    40.         if (evt instanceof IdleStateEvent) {  
    41.             IdleStateEvent event = (IdleStateEvent) evt;  
    42.             if (event.state() == IdleState.WRITER_IDLE) {  
    43.                 if(currentTime <= TRY_TIMES){  
    44.                     System.out.println("currentTime:"+currentTime);  
    45.                     currentTime++;  
    46.                     ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());  
    47.                 }  
    48.             }  
    49.         }  
    50.     }  
    51.   
    52.     @Override  
    53.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    54.         String message = (String) msg;  
    55.         System.out.println(message);  
    56.         if (message.equals("Heartbeat")) {  
    57.             ctx.write("has read message from server");  
    58.             ctx.flush();  
    59.         }  
    60.         ReferenceCountUtil.release(msg);  
    61.     }  
    62. }  

    HeartBeatsClient.java

    客户端代码也要加入IdleStateHandler这个handler,注意的是,我们要注意的是写超时,所以要设置写超时的时间,因为服务器端是5秒检测读超时,所以客户端必须在5秒内发送一次心跳,告之服务端,所以我们设置4秒:

    完整代码如下:

    [java] view plain copy
     
    1. package com.lyncc.netty.heartbeats;  
    2.   
    3. import java.util.concurrent.TimeUnit;  
    4.   
    5. import io.netty.bootstrap.Bootstrap;  
    6. import io.netty.channel.ChannelFuture;  
    7. import io.netty.channel.ChannelInitializer;  
    8. import io.netty.channel.ChannelOption;  
    9. import io.netty.channel.ChannelPipeline;  
    10. import io.netty.channel.EventLoopGroup;  
    11. import io.netty.channel.nio.NioEventLoopGroup;  
    12. import io.netty.channel.socket.SocketChannel;  
    13. import io.netty.channel.socket.nio.NioSocketChannel;  
    14. import io.netty.handler.codec.string.StringDecoder;  
    15. import io.netty.handler.codec.string.StringEncoder;  
    16. import io.netty.handler.logging.LogLevel;  
    17. import io.netty.handler.logging.LoggingHandler;  
    18. import io.netty.handler.timeout.IdleStateHandler;  
    19.   
    20. public class HeartBeatsClient {  
    21.   
    22.     public void connect(int port, String host) throws Exception {  
    23.      // Configure the client.  
    24.         EventLoopGroup group = new NioEventLoopGroup();  
    25.         try {  
    26.             Bootstrap b = new Bootstrap();  
    27.             b.group(group)  
    28.              .channel(NioSocketChannel.class)  
    29.              .option(ChannelOption.TCP_NODELAY, true)  
    30.              .handler(new LoggingHandler(LogLevel.INFO))  
    31.              .handler(new ChannelInitializer<SocketChannel>() {  
    32.                  @Override  
    33.                  public void initChannel(SocketChannel ch) throws Exception {  
    34.                      ChannelPipeline p = ch.pipeline();  
    35.                      p.addLast("ping", new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));  
    36.                      p.addLast("decoder", new StringDecoder());  
    37.                      p.addLast("encoder", new StringEncoder());  
    38.                      p.addLast(new HeartBeatClientHandler());  
    39.                  }  
    40.              });  
    41.   
    42.             ChannelFuture future = b.connect(host, port).sync();  
    43.             future.channel().closeFuture().sync();  
    44.         } finally {  
    45.             group.shutdownGracefully();  
    46.         }  
    47.     }  
    48.   
    49.     /** 
    50.      * @param args 
    51.      * @throws Exception 
    52.      */  
    53.     public static void main(String[] args) throws Exception {  
    54.         int port = 8080;  
    55.         if (args != null && args.length > 0) {  
    56.             try {  
    57.                 port = Integer.valueOf(args[0]);  
    58.             } catch (NumberFormatException e) {  
    59.                 // 采用默认值  
    60.             }  
    61.         }  
    62.         new HeartBeatsClient().connect(port, "127.0.0.1");  
    63.     }  
    64.   
    65. }  

    我们先启动服务器端:

    再启动客户端:

    此时客户端还存活着,我们看看服务器端的输出:

    我们再看看客户端的输出:

    inactive的事件触发了,且客户端自动停止了~

  • 相关阅读:
    和大家分享下我的找工作历程。
    Traits 编程技法+模板偏特化+template参数推导+内嵌型别编程技巧
    SGI STL空间配置器和内存池
    调试Release版本应用程序
    HawkNL 源码剖析
    C++:float 转型到 std::string 之总结。
    将一个数上调至2^n的倍数《参考STL源码》
    JavaScript中Ajax的使用
    jQuery之noConflict() 方法
    Linq to Object 的简单使用示例
  • 原文地址:https://www.cnblogs.com/duan2/p/8919520.html
Copyright © 2011-2022 走看看