zoukankan      html  css  js  c++  java
  • Netty — 心跳检测和断线重连

    一.前言

    由于在通信层的网络连接的不可靠性,比如:网络闪断,网络抖动等,经常会出现连接断开。这样对于使用长连接的应用而言,当突然高流量冲击势必会造成进行网络连接,从而产生网络堵塞,应用响应速度下降,延迟上升,用户体验较差。

    在通信层的高可用设计中,需要保活长连接的网络,保证通信能够正常。一般有两种设计方式:

    1. 利用TCP提供的连接保活特性
    2. 应用层做连接保活

    本文主要介绍使用netty时应用层如何做连接保活,提高应用的可用性。


    ### 二.TCP连接保活性的局限

    TCP协议层面提供了KeepAlive的机制保证连接的活跃,但是其有很多劣势:

    • 该保活机制非TCP协议的标准,默认是关闭
    • 该机制依赖操作系统,需要进行系统级配置,不够灵活方便
    • 当应用底层传输协议变更时,将无法适用

    由于以上的原因,绝大多数的框架、应用处理连接的保活性都是在应用层处理。目前的主流方案是心跳检测,断线重连


    ### 三.应用层保证连接的活跃性

    1.心跳检测

    心跳检测机制:客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。

    netty本身也提供了IdleStateHandler用于检测连接闲置,该Handler可以检测连接未发生读写事件而触发相应事件。

    首先编写客户端心跳检测的Handler:

    /**
     * 心跳检测:
     * 1. client发送"PING"消息
     *
     * @author huaijin
     */
    public class ClientHeartBeatHandler extends ChannelHandlerAdapter {
    
        /**
         * PING消息
         */
        private static final String PING = "0";
    
        /**
         * PONG消息
         */
        private static final String PONG = "1";
    
        /**
         * 分隔符
         */
        private static final String SPLIT = "$_";
    
        /**
         * 读取到服务端响应,如果是PONG响应,则打印。如果是非PONG响应,则传递至下一个Handler
         *
         * @param ctx 处理上下文
         * @param msg 消息
         * @throws Exception
         * @author huaijin
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (PONG.equals(msg)) {
                System.out.println("from heart bean: " + msg);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        /**
         * 处理触发的事件,如果是{@link IdleStateEvent},则判断是读或者是写。如果是du,则断开连接;
         * 如果是写,则发送PING消息
         *
         * @param ctx 处理上下文
         * @param evt 事件
         * @throws Exception
         * @author huaijin
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                switch (idleStateEvent.state()) {
                    case WRITER_IDLE:
                        sendPing(ctx);
                        break;
                    case READER_IDLE:
                        System.out.println("client close connection.");
                        closeConnection(ctx);
                        break;
                    case ALL_IDLE:
                        closeConnection(ctx);
                        break;
                    default:
                        break;
                }
            }
        }
    
        /**
         * 发送PING消息
         *
         * @param ctx 上下文
         * @author huaijin
         */
        private void sendPing(ChannelHandlerContext ctx) {
            System.out.println("send heart beat: " + PING);
            ctx.writeAndFlush(Unpooled.copiedBuffer((PING + SPLIT).getBytes()));
        }
    
        /**
         * 关闭连接
         *
         * @param ctx
         * @author huaijin
         */
        private void closeConnection(ChannelHandlerContext ctx) {
            ctx.disconnect();
            ctx.close();
        }
    }
    

    然后再编写服务单心跳检测Handler:

    /**
     * 心跳检测:
     * 1. server端接受到"PING",返回"PONG"消息
     *
     * @author huaijin
     */
    public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
    
        /**
         * PONG消息
         */
        private static final String PONG = "1";
    
        /**
         * PING消息
         */
        private static final String PING = "0";
    
        /**
         * 消息分隔符
         */
        private static final String SPLIT = "$_";
    
        /**
         * 如果是PING消息,则相应PONG。如果非,则传递至下个Handler
         *
         * @param ctx 上下文
         * @param msg 消息
         * @throws Exception
         * @author huaijin
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (PING.equals(msg)) {
                System.out.println("from heart beat: " + msg);
                sendPong(ctx);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        /**
         * 处理触发事件,如果是读事件,则关闭连接
         *
         * @param ctx 上下文
         * @param evt 事件
         * @throws Exception
         * @author huaijin
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                if (idleStateEvent.state() == READER_IDLE) {
                    System.out.println("server close connection.");
                    closeConnection(ctx);
                }
            }
        }
    
        /**
         * 发送PONG消息
         *
         * @param ctx 上下文
         * @author huaijin
         */
        private void sendPong(ChannelHandlerContext ctx) {
            System.out.println("send heart bean: " + PONG);
            ctx.writeAndFlush(Unpooled.copiedBuffer((PONG + SPLIT).getBytes()));
        }
    
        /**
         * 关闭连接
         *
         * @param ctx 上下文
         * @author huaijin
         */
        private void closeConnection(ChannelHandlerContext ctx) {
            ctx.disconnect();
            ctx.close();
        }
    
    }
    

    通过以上的ClientHeartbeatHandler和ServerHeartBeatHandler和netty本身提供的IdleStateHandler能够完成心跳检测。

    Note:
    但是IdleStateHandler中有未读和未写的事件设置,这里需要非常着重注意。客户端的为读时间最好设置为服务端的未写时间的两倍,服务端的未读时间最好设置为客户端的未写时间的两倍。

    2.断线重连

    当心跳检测发现连接断开后,为了保证通信层的可用性,仍然需要重新连接,保证通信的可靠。对于短线重连一般有两种设计方式比较常见:

    1. 通过额外的线程定时轮循所有的连接的活跃性,如果发现其中有死连接,则执行重连
    2. 监听连接上发送的断开事件,如果发送则执行重连操作

    这里我们首先看下第一种实现方式,netty中当Bootstrap执行connect操作后,会获得ChannelFuture对象,在该对象上执行close事件的监听,如果发生了close则提交重连操作。

    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new IdleStateHandler(
                                    10, 5, 10));
                            ch.pipeline().addLast(new ClientHeartBeatHandler());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect(host, port).sync();
            // 监听channel上的close事件
            f.channel().closeFuture().sync();
        } finally {
            // 提交重连操作
            executor.execute(() -> {
                try {
                    System.out.println("reconnection to: " + "127.0.0.1:8080");
                    connect(8080, "127.0.0.1");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        new EchoClient().connect(8080, "127.0.0.1");
        Thread.currentThread().join();
    }
    

    但是该种方式对于应用而言,需要每个连接都有重连的线程,这样对于资源消耗比较大。建议采用第二种情况,使用额外的单线程轮循所有的连接,检测其是否活跃。该种方式在开源框架中有应用。

     timerExecutor.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        reconnect();
                    }
                }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
    

    使用Java的定时线程池,定时执行重连操作。在重连操作中将检测连接的活跃性,如果非活跃,则执行重连。

    参考

    fescar中心跳和重连的源码
    浅析 Netty 实现心跳机制与断线重连
    Netty权威指南-心跳检测机制和断连重连

  • 相关阅读:
    修改NavigationBarItem的字体大小和颜色的使用方法
    iOS 大文件断点下载
    iOS 文件下载
    UITableView优化
    iOS 应用的生命周期
    iOS RunLoop简介
    iOS 线程间的通信 (GCD)
    iOS 多线程GCD的基本使用
    iOS 多线程GCD简介
    CSS--复习之旅(一)
  • 原文地址:https://www.cnblogs.com/lxyit/p/10401400.html
Copyright © 2011-2022 走看看