zoukankan      html  css  js  c++  java
  • 基于Netty的IdleStateHandler实现Mqtt心跳

    基于Netty的IdleStateHandler实现Mqtt心跳

    IdleStateHandler解析

    最近研究jetlinks编写的基于Nettymqtt-client(https://github.com/jetlinks/netty-mqtt-client),总结若干知识点.
    Netty中,实现心跳机制较为简单,主要依赖于IdleStateHandler判断channel的读写超时.

        /**
         * Creates a new instance firing {@link IdleStateEvent}s.
         *
         * @param readerIdleTimeSeconds
         *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
         *        will be triggered when no read was performed for the specified
         *        period of time.  Specify {@code 0} to disable.
         * @param writerIdleTimeSeconds
         *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
         *        will be triggered when no write was performed for the specified
         *        period of time.  Specify {@code 0} to disable.
         * @param allIdleTimeSeconds
         *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
         *        will be triggered when neither read nor write was performed for
         *        the specified period of time.  Specify {@code 0} to disable.
         */
        public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds) {
    
            this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
                 TimeUnit.SECONDS);
        }
    

    以上是IdleStateHandler的构造函数,主要依赖于三个参数readerIdleTimeSeconds,writerIdleTimeSeconds以及allIdleTimeSeconds.

    如果难于理解英文注释,可参考<<浅析 Netty 实现心跳机制与断线重连>>https://segmentfault.com/a/1190000006931568一文中的解释:

    • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
    • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
    • allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

    IdleStateHandler中,分别通过如下函数实现对channel读写操作事件的跟踪:

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                reading = true;
                firstReaderIdleEvent = firstAllIdleEvent = true;
            }
            ctx.fireChannelRead(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
                lastReadTime = ticksInNanos();
                reading = false;
            }
            ctx.fireChannelReadComplete();
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Allow writing with void promise if handler is only configured for read timeout events.
            if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                ctx.write(msg, promise.unvoid()).addListener(writeListener);
            } else {
                ctx.write(msg, promise);
            }
        }
    
        // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
        private final ChannelFutureListener writeListener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                lastWriteTime = ticksInNanos();
                firstWriterIdleEvent = firstAllIdleEvent = true;
            }
        };
    

    其中:

    • channelRead: 判断channel是否有数据可读取;
    • channelReadComplete: 判断channel是否有数据可读取;
    • write: 判断channel是否有数据写(通过writeListener判断当前写操作是否执行成功).

    IdleStateHandlerchannel激活或注册时,会执行initialize函数,根据读写超时时间创建对应的定时任务.

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            // Initialize early if channel is active already.
            if (ctx.channel().isActive()) {
                initialize(ctx);
            }
            super.channelRegistered(ctx);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // This method will be invoked only if this handler was added
            // before channelActive() event is fired.  If a user adds this handler
            // after the channelActive() event, initialize() will be called by beforeAdd().
            initialize(ctx);
            super.channelActive(ctx);
        }
    
            private void initialize(ChannelHandlerContext ctx) {
            // Avoid the case where destroy() is called before scheduling timeouts.
            // See: https://github.com/netty/netty/issues/143
            switch (state) {
            case 1:
            case 2:
                return;
            }
    
            state = 1;
            initOutputChanged(ctx);
    
            lastReadTime = lastWriteTime = ticksInNanos();
            if (readerIdleTimeNanos > 0) {
                // 创建读超时判断定时任务
                readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (writerIdleTimeNanos > 0) {
                // 创建写超时判断定时任务
                writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (allIdleTimeNanos > 0) {
                // 创建读写超时判断定时任务
                allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                        allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        }
    

    此处,我们将剖析AllIdleTimeoutTask任务.
    此任务,会判断在超时时间段内,是否有读写操作:

    • 有读或者写操作,则重新创建定时任务,等待下次执行;
    • 没有读或者写操作,则创建IdleStateEvent对象,通过ChannelHandlerContext通知注册了用户事件触发器的handler(即handler重载了userEventTriggered函数).
      private final class AllIdleTimeoutTask extends AbstractIdleTask {
    
            AllIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
    
                long nextDelay = allIdleTimeNanos;
                if (!reading) {
                    nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                }
                if (nextDelay <= 0) {
                    // Both reader and writer are idle - set a new timeout and
                    // notify the callback.
                    allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstAllIdleEvent;
                    firstAllIdleEvent = false;
    
                    try {
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
    
                        IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Either read or write occurred before the timeout - set a new
                    // timeout with shorter delay.
                    allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    

    了解了IdleStateHandler,我们接下来学习如何编写Mqtt的心跳handler.

    Mqtt心跳handler

    以下是jetlinks编写的Mqtt心跳handler代码,我们截取部分代码学习.

    final class MqttPingHandler extends ChannelInboundHandlerAdapter {
    
        private final int keepaliveSeconds;
    
        private ScheduledFuture<?> pingRespTimeout;
    
        MqttPingHandler(int keepaliveSeconds) {
            this.keepaliveSeconds = keepaliveSeconds;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof MqttMessage)) {
                ctx.fireChannelRead(msg);
                return;
            }
            MqttMessage message = (MqttMessage) msg;
            if (message.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
                this.handlePingReq(ctx.channel());
            } else if (message.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
                this.handlePingResp();
            } else {
                ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
            }
        }
    
        /**
         * IdleStateHandler,在连接处于idle状态超过设定时间后,会发送IdleStateEvent
         * 接收到IdleStateEvent,当前类会发送心跳包至server,保持连接
         *
         * @param ctx 上下文
         * @param evt 事件
         * @throws Exception 异常
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);
    
            // 确认监听事件为IdleStateEvent,即发送心跳包至server
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.WRITER_IDLE) {
                    this.sendPingReq(ctx.channel());
                }
            }
        }
    
        /**
         * 发送心跳包至server端,并建立心跳超时断开连接任务
         * 此处,先行创建心跳超时任务,后续再发送心跳包(避免收到心跳响应时,心跳超时任务未建立完成)
         *
         * @param channel 连接
         */
        private void sendPingReq(Channel channel) {
    
            // 创建心跳超时,断开连接任务
            if (this.pingRespTimeout == null) {
                this.pingRespTimeout = channel.eventLoop().schedule(() -> {
                    MqttFixedHeader disconnectHeader =
                            new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
                    channel.writeAndFlush(new MqttMessage(disconnectHeader)).addListener(ChannelFutureListener.CLOSE);
                    //TODO: what do when the connection is closed ?
                }, this.keepaliveSeconds, TimeUnit.SECONDS);
            }
    
            // 创建心跳包,并发送至Mqtts Server
            MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
            channel.writeAndFlush(new MqttMessage(pingHeader));
        }
    
        /**
         * 处理ping resp,取消ping超时任务(断开连接)
         */
        private void handlePingResp() {
            if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
                this.pingRespTimeout.cancel(true);
                this.pingRespTimeout = null;
            }
        }
    }
    

    函数解析:

    (1) 接收超时事件,发送心跳请求

    MqttPingHandler中重载了userEventTriggered函数,用以接收ChannelHandlerContext传递的事件,代码中会判断事件是否为IdleStateEvent.
    如果当前接收事件为IdleStateEvent,则说明当前channel在超时时间内未发生读写事件,则客户端发送Mqtt心跳请求.

    (2) 发送心跳请求,建立请求响应超时关闭连接任务

    sendPingReq函数中(以下两步操作,顺序可任意安排):

    • 建立心跳请求响应超时判断任务,如果在一定时长内未接收到心跳响应,则会关闭连接;
    • 构建Mqtt心跳包,发送至远端服务器.

    (3) 取消心跳响应超时关闭连接任务

    channelRead读取数据,判断是否是Mqtt的心跳响应包.
    如果是,则执行handlePingResp函数,取消心跳响应超时关闭连接任务.

    handler添加

        ch.pipeline().addLast("idleStateHandler",
            new IdleStateHandler(keepAliveTimeSeconds, keepAliveTimeSeconds, 0));
        ch.pipeline().addLast("mqttPingHandler",
            new MqttPingHandler(MqttClientImpl.this.clientConfig.getKeepAliveTimeSeconds()));
    

    只需要以上两句代码,就可以完成Mqtt心跳维持功能.

    PS:
    如果您觉得我的文章对您有帮助,请关注我的微信公众号,谢谢!
    程序员打怪之路

  • 相关阅读:
    PowerDesigner_15连接Oracle11g,反向工程导出模型图
    angular学习
    GoEasy消息推送
    Spring 工作原理
    JAVA解析HTML,获取待定元素属性
    设计模式之工厂方法模式
    设计模式之单例模式
    通过Java代码获取系统信息
    centos7下NAT模式下设置静态ip
    关于在Spring项目中使用thymeleaf报Exception parsing document错误
  • 原文地址:https://www.cnblogs.com/jason1990/p/11600070.html
Copyright © 2011-2022 走看看