zoukankan      html  css  js  c++  java
  • 【Netty】利用Netty实现心跳检测和重连机制

    一、前言

      心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
      我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维护一个心跳连接,告诉 Eureka Server 自己还活着。本文就是利用 Netty 来实现心跳检测,以及客户端重连。

    二、设计思路

    1. 分为客户端和服务端
    2. 建立连接后,客户端先发送一个消息询问服务端是否可以进行通信了。
    3. 客户端收到服务端 Yes 的应答后,主动发送心跳消息,服务端接收到心跳消息后,返回心跳应答,周而复始。
    4. 心跳超时利用 Netty 的 ReadTimeOutHandler 机制,当一定周期内(默认值50s)没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接。
    5. 为了避免出现粘/拆包问题,使用 DelimiterBasedFrameDecoderStringDecoder 来处理消息。

    三、编码

    1. 先编写客户端 NettyClient
    1. public class NettyClient
    2.  
    3. private static final String HOST = "127.0.0.1"
    4.  
    5. private static final int PORT = 9911
    6.  
    7. private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); 
    8.  
    9. EventLoopGroup group = new NioEventLoopGroup(); 
    10.  
    11.  
    12. private void connect(String host,int port)
    13. try
    14. Bootstrap b = new Bootstrap(); 
    15. b.group(group) 
    16. .channel(NioSocketChannel.class) 
    17. .option(ChannelOption.TCP_NODELAY,true
    18. .remoteAddress(new InetSocketAddress(host,port)) 
    19. .handler(new ChannelInitializer<SocketChannel>() { 
    20. @Override 
    21. protected void initChannel(SocketChannel ch) throws Exception
    22. ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8); 
    23. ch.pipeline() 
    24. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
    25. .addLast(new StringDecoder()) 
    26. // 当一定周期内(默认50s)没有收到对方任何消息时,需要主动关闭链接 
    27. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
    28. .addLast("heartBeatHandler",new HeartBeatReqHandler()); 
    29. }); 
    30. // 发起异步连接操作 
    31. ChannelFuture future = b.connect().sync(); 
    32. future.channel().closeFuture().sync(); 
    33. }catch (Exception e){ 
    34. e.printStackTrace(); 
    35. }finally
    36. // 所有资源释放完之后,清空资源,再次发起重连操作 
    37. executor.execute(()->{ 
    38. try
    39. TimeUnit.SECONDS.sleep(5); 
    40. //发起重连操作 
    41. connect(NettyClient.HOST,NettyClient.PORT); 
    42. } catch (InterruptedException e) { 
    43. e.printStackTrace(); 
    44. }); 
    45.  
    46. public static void main(String[] args)
    47. new NettyClient().connect(NettyClient.HOST,NettyClient.PORT); 
    48.  

    这里稍微复杂点的就是38行开始的重连部分。
    2. 心跳消息发送类 HeartBeatReqHandler

    1. package cn.sp.heartbeat; 
    2.  
    3. import io.netty.buffer.Unpooled; 
    4. import io.netty.channel.ChannelHandler; 
    5. import io.netty.channel.ChannelHandlerContext; 
    6. import io.netty.channel.SimpleChannelInboundHandler; 
    7.  
    8. import java.util.concurrent.ScheduledFuture; 
    9. import java.util.concurrent.TimeUnit; 
    10.  
    11. /** 
    12. * Created by 2YSP on 2019/5/23. 
    13. */ 
    14. @ChannelHandler.Sharable 
    15. public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String>
    16.  
    17. private volatile ScheduledFuture<?> heartBeat; 
    18.  
    19. private static final String hello = "start notify with server$_"
    20.  
    21. @Override 
    22. public void channelActive(ChannelHandlerContext ctx) throws Exception
    23. ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())); 
    24. System.out.println("================"); 
    25.  
    26. @Override 
    27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    28. if (heartBeat != null){ 
    29. heartBeat.cancel(true); 
    30. heartBeat = null
    31. ctx.fireExceptionCaught(cause); 
    32.  
    33. @Override 
    34. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
    35. if ("ok".equalsIgnoreCase(msg)){ 
    36. //服务端返回ok开始心跳 
    37. heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS); 
    38. }else
    39. System.out.println("Client receive server heart beat message : --->"+msg); 
    40.  
    41. private class HeartBeatTask implements Runnable
    42.  
    43. private final ChannelHandlerContext ctx; 
    44.  
    45. public HeartBeatTask(ChannelHandlerContext ctx)
    46. this.ctx = ctx; 
    47.  
    48.  
    49. @Override 
    50. public void run()
    51. String heartBeat = "I am ok"
    52. System.out.println("Client send heart beat message to server: ----->"+heartBeat); 
    53. ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes())); 
    54.  

    channelActive()方法在首次建立连接后向服务端问好,如果服务端返回了 "ok" 就创建一个线程每隔5秒发送一次心跳消息。如果发生了异常,就取消定时任务并将其设置为 null,等待 GC 回收。
    3. 服务端 NettyServer

    1. public class NettyServer
    2.  
    3. public static void main(String[] args)
    4. new NettyServer().bind(9911); 
    5.  
    6. private void bind(int port)
    7. EventLoopGroup group = new NioEventLoopGroup(); 
    8. try
    9. ServerBootstrap b = new ServerBootstrap(); 
    10. b.group(group) 
    11. .channel(NioServerSocketChannel.class) 
    12. .childHandler(new ChannelInitializer<SocketChannel>() { 
    13. @Override 
    14. protected void initChannel(SocketChannel ch) throws Exception
    15. ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 
    16.  
    17. ch.pipeline() 
    18. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
    19. .addLast(new StringDecoder()) 
    20. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
    21. .addLast("HeartBeatHandler",new HeartBeatRespHandler()); 
    22. }); 
    23. // 绑定端口,同步等待成功 
    24. b.bind(port).sync(); 
    25. System.out.println("Netty Server start ok ...."); 
    26. }catch (Exception e){ 
    27. e.printStackTrace(); 
    1. 心跳响应类 HeartBeatRespHandler
    1. package cn.sp.heartbeat; 
    2.  
    3. import io.netty.buffer.Unpooled; 
    4. import io.netty.channel.ChannelHandler; 
    5. import io.netty.channel.ChannelHandlerContext; 
    6. import io.netty.channel.SimpleChannelInboundHandler; 
    7.  
    8. /** 
    9. * Created by 2YSP on 2019/5/23. 
    10. */ 
    11. @ChannelHandler.Sharable 
    12. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String>
    13.  
    14. private static final String resp = "I have received successfully$_"
    15.  
    16. @Override 
    17. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
    18. if (msg.equals("start notify with server")){ 
    19. ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes())); 
    20. }else
    21. //返回心跳应答信息 
    22. System.out.println("Receive client heart beat message: ---->"+ msg); 
    23. ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); 
    24.  
    25.  

    第一次告诉客户端我已经准备好了,后面打印客户端发过来的信息并告诉客户端我已经收到你的消息了。

    四、测试

    启动服务端再启动客户端,可以看到心跳检测正常,如下图。

     

    服务端控制台
    服务端控制台

     

     

    客户端控制台
    客户端控制台

    现在让服务端宕机一段时间,看客户端能否重连并开始正常工作。

     

    关闭服务端后,客户端周期性的连接失败,控制台输出如图:

     

    连接失败
    连接失败

    重新启动服务端,过一会儿发现重连成功了。

     

     

    成功重连
    成功重连

     

    五、总结

    总得来说,使用 Netty 实现心跳检测还是比较简单的,这里比较懒没有使用其他序列化协议(如 ProtoBuf 等),如果感兴趣的话大家可以自己试试。
    代码地址,点击这里
    有篇SpringBoot 整合长连接心跳机制的文章写的也很不错,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/

  • 相关阅读:
    更改THttpClientSocket连接超时时间
    咏南跨平台中间件REST API
    INDY10 BASE64编码
    HTTP协议之multipart/form-data
    WWF3动态修改工作流<第九篇>
    WWF3自定义活动<第八篇>
    WWF3追踪功能<WWF第六篇>
    WWF3状态机工作流<WWF第七篇>
    WWF3的持续化<第五篇>
    WWF3事务和异常处理类型活动<第四篇>
  • 原文地址:https://www.cnblogs.com/2YSP/p/10917664.html
Copyright © 2011-2022 走看看