zoukankan      html  css  js  c++  java
  • netty实现长连接心跳检

    主要逻辑

    使用netty实现长连接,主要靠心跳来维持服务器端及客户端连接。

    实现的逻辑主要是:

    服务器端方面

    1, 服务器在网络空闲操作一定时间后,服务端失败心跳计数器加1。

    2, 如果收到客户端的ping心跳包,则清零失败心跳计数器,如果连续n次未收到客户端的ping心跳包,则关闭链路,释放资源,等待客户端重连。


    客户端方面

    1, 客户端网络空闲在一定时间内没有进行写操作时,则发送一个ping心跳包。

    2, 如果服务器端未在发送下一个心跳包之前回复pong心跳应答包,则失败心跳计数器加1。

    3, 如果客户端连续发送n(此处根据具体业务进行定义)次ping心跳包,服务器端均未回复pong心跳应答包,则客户端断开连接,间隔一定时间进行重连操作,直至连接服务器成功。

    环境:netty5,tomcat7,jdk7,myeclipse

    服务器端心跳处理类:

    [java] view plain copy
     
    1. public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {   
    2.     private  final Logger log=Logger.getLogger(HeartBeatRespHandler.class);  
    3.        //线程安全心跳失败计数器  
    4.        private AtomicInteger unRecPingTimes = new AtomicInteger(1);  
    5.        @Override  
    6.        public void channelRead(ChannelHandlerContext ctx, Object msg)    
    7.                 throws Exception {    
    8.            NettyMessageProto message = (NettyMessageProto)msg;  
    9.            unRecPingTimes = new AtomicInteger(1);  
    10.            //接收客户端心跳信息  
    11.            if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_REQUEST){  
    12.                 //清零心跳失败计数器  
    13.                 log.info("server receive client"+ctx.channel().attr(SysConst.SERIALNO_KEY)+" ping msg :---->"+message);  
    14.                 //接收客户端心跳后,进行心跳响应  
    15.                 NettyMessageProto replyMsg = buildHeartBeat();  
    16.                 ctx.writeAndFlush(replyMsg);  
    17.             }else{  
    18.                 ctx.fireChannelRead(msg);  
    19.             }  
    20.         }  
    21.          
    22.          
    23.         /** 
    24.          * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
    25.          */  
    26.         @Override  
    27.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
    28.             if (evt instanceof IdleStateEvent) {    
    29.                 IdleStateEvent event = (IdleStateEvent) evt;    
    30.                 if (event.state() == IdleState.READER_IDLE) {    
    31.                     /*读超时*/    
    32.                     log.info("===服务器端===(READER_IDLE 读超时)");  
    33.                     unRecPingTimes.getAndIncrement();   
    34.                   //客户端未进行ping心跳发送的次数等于3,断开此连接  
    35.                     if(unRecPingTimes.intValue() == 3){    
    36.                           
    37.                           ctx.disconnect();  
    38.                           System.out.println("此客户端连接超时,服务器主动关闭此连接....");  
    39.                           log.info("此客户端连接超时,服务器主动关闭此连接....");  
    40.                     }   
    41.                 } else if (event.state() == IdleState.WRITER_IDLE) {    
    42.                     /*服务端写超时*/       
    43.                     log.info("===服务器端===(WRITER_IDLE 写超时)");  
    44.                       
    45.                 } else if (event.state() == IdleState.ALL_IDLE) {    
    46.                     /*总超时*/    
    47.                     log.info("===服务器端===(ALL_IDLE 总超时)");    
    48.                 }    
    49.             }    
    50.         }  
    51.           
    52.          
    53.        /** 
    54.         * 创建心跳响应消息 
    55.         * @return 
    56.         */  
    57.        private NettyMessageProto buildHeartBeat(){  
    58.            HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_RESPONSE).build();  
    59.            NettyMessageProto message =NettyMessageProto.newBuilder().setHeader(header).build();  
    60.            return message;  
    61.        }  

    客户端心跳处理类:
    [java] view plain copy
     
    1. public class HeartBeatReqHandler extends ChannelHandlerAdapter {  
    2.     private  final Logger log=Logger.getLogger(HeartBeatReqHandler.class);  
    3.       
    4.     //线程安全心跳失败计数器  
    5.     private AtomicInteger unRecPongTimes = new AtomicInteger(1);  
    6.       
    7.     public void channelRead(ChannelHandlerContext ctx, Object msg)    
    8.             throws Exception {    
    9.         NettyMessageProto message = (NettyMessageProto)msg;    
    10.         //服务器端心跳回复  
    11.         if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_RESPONSE){  
    12.             //如果服务器进行pong心跳回复,则清零失败心跳计数器  
    13.             unRecPongTimes = new AtomicInteger(1);  
    14.             log.debug("client receive server pong msg :---->"+message);  
    15.         }else{  
    16.             ctx.fireChannelRead(msg);  
    17.         }  
    18.     }    
    19.       
    20.     /** 
    21.      * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
    22.      */  
    23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
    24.         if (evt instanceof IdleStateEvent) {    
    25.             IdleStateEvent event = (IdleStateEvent) evt;    
    26.             if (event.state() == IdleState.READER_IDLE) {    
    27.                 /*读超时*/    
    28.                 log.info("===客户端===(READER_IDLE 读超时)");  
    29.             } else if (event.state() == IdleState.WRITER_IDLE) {    
    30.                 /*客户端写超时*/       
    31.                 log.info("===客户端===(WRITER_IDLE 写超时)");  
    32.                 unRecPongTimes.getAndIncrement();    
    33.                 //服务端未进行pong心跳响应的次数小于3,则进行发送心跳,否则则断开连接。  
    34.                 if(unRecPongTimes.intValue() < 3){    
    35.                     //发送心跳,维持连接  
    36.                     ctx.channel().writeAndFlush(buildHeartBeat()) ;   
    37.                     log.info("客户端:发送心跳");  
    38.                 }else{    
    39.                     ctx.channel().close();    
    40.                 }    
    41.             } else if (event.state() == IdleState.ALL_IDLE) {    
    42.                 /*总超时*/    
    43.                 log.info("===客户端===(ALL_IDLE 总超时)");    
    44.             }    
    45.         }    
    46.     }  
    47.           
    48.     private NettyMessageProto buildHeartBeat(){  
    49.         HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_REQUEST).build();  
    50.         NettyMessageProto  message = NettyMessageProto.newBuilder().setHeader(header).build();  
    51.         return message;  
    52.     }  
    53.       
    54.     /** 
    55.      * 异常处理 
    56.      */  
    57.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception{  
    58.         ctx.fireExceptionCaught(cause);  
    59.     }  
    60.   
    61. }  


    [java] view plain copy
     
    在CODE上查看代码片派生到我的代码片
    1. <pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366" name="code" class="java"><pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366"></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre></pre>  
    2. <pre></pre>  
    3. <pre></pre>  
    4. <pre></pre>  
    5. <pre></pre>  
    6. <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=1.0">  
    7.                         
     
     
  • 相关阅读:
    使用RecyclerView打造Gallery
    Retrofit简介与使用方法(翻译)
    迷宫实现递归版本C++
    牛客笔试题
    牛客笔试题---求最长重复词长度之和
    C++句柄解析
    C++双向循环链表实现
    String C++完整实现。
    String写时拷贝实现
    顺序表操作补充(查找方法增加)
  • 原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7324982.html
Copyright © 2011-2022 走看看