zoukankan      html  css  js  c++  java
  • Mina 系列(四)之KeepAliveFilter -- 心跳检测

    Mina 系列(四)之KeepAliveFilter -- 心跳检测

    摘要: 心跳协议,对基于CS模式的系统开发来说是一种比较常见与有效的连接检测方式,最近在用MINA框架,原本自己写了一个心跳协议实现,后来突然发现MINA本身带有这样一个心跳实现,感于对框架的小小崇拜,在实践的同时研究了一下!

    MINA 本身提供了一个过滤器类: org.apache.mina.filter.keepalive.KeepAliveFilter,该过滤器用于在 IO 空闲的时候发送并且反馈心跳包(keep-alive request/response)。

    KeepAliveFilter

    KeepAliveFilter 构造器

    public KeepAliveFilter(KeepAliveMessageFactory messageFactory, IdleStatus interestedIdleStatus,
            KeepAliveRequestTimeoutHandler policy) {
        this(messageFactory, interestedIdleStatus, policy, 60, 30);
    }
    
    • KeepAvlieMessageFactory:该实例引用用于判断接受与发送的包是否是心跳包,以及心跳请求包的实现

    • IdleStatus:该过滤器所关注的空闲状态,默认认为读取空闲。即当读取通道空闲的时候发送心跳包

    • KeepAliveRequestTimeoutHandler:心跳包请求后超时无反馈情况下的处理机制,默认为 CLOSE,即关闭连接

    首先需要实现接口 KeepAliveMessageFactory。该接口中的抽象方法有:

    Modifier and Type Method and Description
    Object getRequest(IoSession session)
    Returns a (new) keep-alive request message.
    Object getResponse(IoSession session, Object request)
    Returns a (new) response message for the specified keep-alive request.
    boolean isRequest(IoSession session, Object message) Returns true if and only if the specified message is a keep-alive request message.
    boolean isResponse(IoSession session, Object message)Returns true if and only if the specified message is a keep-alive response message;
    一般来说心跳机制主要分为以下四类:
    1. active 活跃型(活跃型心跳机制):当读取通道空闲的时候发送心跳请求,一旦该心跳请求被发送,那么需要在 keepAliveRequestTimeout 时间内接收到心跳反馈,否则 KeepAliveRequestTimeoutHandler 将会被调用,当一个心跳请求包被接受到后,那么心跳反馈也会立即发出。

      KeepAliveMessageFactory 类的实现方法:

      • getRequest(IoSession session) 必须反馈 non-null
      • getResponse( IoSession session, Object request) 必须反馈 non-null
    2. semi-active 半活跃型(半活跃型心跳机制):当读取通道空闲的时候发送心跳请求,然而并不在乎心跳反馈有没有,当一个心跳请求包被接收到后,那么心跳反馈也会立即发出。

      KeepAliveMessageFactory 类的实现方法:

      • getRequest(IoSession session) 必须反馈 non-null
      • getResponse( IoSession session, Object request) 必须反馈 non-null

      心跳包请求超时后的处理机制设置为 KeepAliveRequestTimeoutHandler.NOOP(不做任何处理),KeepAliveRequestTimeoutHandler.LOG(只输出警告信息不做其他处理)

    3. passive 被动型(半活跃型心跳机制):当前 IO 不希望主动发送心跳请求,但是当接受到一个心跳请求后,那么该心跳反馈也会立即发出。

      KeepAliveMessageFactory 类的实现方法:

      • getRequest(IoSession session) 必须反馈 null
      • getResponse( IoSession session, Object request) 必须反馈 non-null
    4. deaf speaker 聋子型(聋子型心跳机制):当前IO会主动发送心跳请求,但是不想发送任何心跳反馈。

      KeepAliveMessageFactory 类的实现方法:

      • getRequest(IoSession session) 必须反馈 non-null
      • getResponse( IoSession session, Object request) 必须反馈 null

      心跳包请求超时后的处理机制设置为 KeepAliveRequestTimeoutHandler.DEAF_SPEAKER

    5. sient-listener 持续监听型(持续监听型心跳机制):既不想发送心跳请求也不想发送心跳反馈。

      KeepAliveMessageFactory 类的实现方法:

      • getRequest(IoSession session) 必须反馈 null
      • getResponse( IoSession session, Object request) 必须反馈 null
    心跳包请求超时后的处理机制

    接口 KeepAliveRequestTimeoutHandler,一般该处理主要是针对能够发送心跳请求的心跳机制。

    1. CLOSE: 关闭连接
    2. LOG:输出 警告信息
    3. NOOP:不做任何处理
    4. EXCEPTION:抛出异常
    5. DEAF_SPEAKER: 一个特殊的处理,停止当前过滤器对对心跳反馈监听,因此让过滤器丢失请求超时的侦测功能。(让其变成聋子)
    6. keepAliveRequestTimeout(KeepAliveFilter filter, IoSession session):自定义处理

    KeepAliveFilter 配制

    KeepAliveMessageFactoryImpl kamfi = new KeepAliveMessageFactoryImpl();
    KeepAliveFilter kaf = new KeepAliveFilter(kamfi, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.CLOSE);
    // idle 事件回调
    kaf.setForwardEvent(true);
    // 心跳检测间隔时间
    kaf.setRequestInterval(60);
    // 心跳检测超时时间
    kaf.setRequestTimeout(30);
    
    • setForwardEvent 使用了 KeepAliveFilter 之后,IoHandlerAdapter 中的 sessionIdle 方法默认是不会再被调用的! 所以必须加入这句话 sessionIdle 才会被调用

    • setRequestInterval 设置心跳包请求时间间隔,其实对于被动型的心跳机制来说,设置心跳包请求间隔貌似是没有用的,因为它是不会发送心跳包的,但是它会触发 sessionIdle 事件, 我们利用该方法,可以来判断客户端是否在该时间间隔内没有发心跳包,一旦 sessionIdle 方法被调用,则认为 客户端丢失连接并将其踢出。因此其中参数 heartPeriod 其实就是服务器对于客户端的 IDLE 监控时间。默认 60 s。

    • setRequestTimeout 超时时间,如果当前发出一个心跳请求后需要反馈。默认 30 s

    下面对客户端与服务端和分别举个例子

    服务器

    以被动型心跳机制为例,服务器在接受到客户端连接以后被动接受心跳请求,当在规定时间内没有收到客户端心跳请求时 将客户端连接关闭。

    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    public class Server {
    
        private static final int PORT = 9123;
        /** 30秒后超时 */
        private static final int IDEL_TIMEOUT = 30;
        /** 15秒发送一次心跳包 */
        private static final int HEART_BEAT_RATE = 15;
        /** 心跳包内容 */
        private static final String HEART_BEAT_REQUEST = "0x11";
        private static final String HEART_BEAT_RESPONSE = "0x12";
        private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    
        public static void main(String[] args) throws IOException {
            IoAcceptor acceptor = new NioSocketAcceptor();
            acceptor.getSessionConfig().setReadBufferSize(1024);
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDEL_TIMEOUT);
    
            acceptor.getFilterChain().addLast("logger", new LoggingFilter());
            acceptor.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory()));
    
            KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
            KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE);
            // 设置是否forward到下一个filter
            heartBeat.setForwardEvent(true);
            // 设置心跳频率
            heartBeat.setRequestInterval(HEART_BEAT_RATE);
    
            acceptor.getFilterChain().addLast("heartbeat", heartBeat);
    
            acceptor.setHandler(new IoHandlerAdapter());
            acceptor.bind(new InetSocketAddress(PORT));
            System.out.println("Server started on port: " + PORT);
        }
    
        /**
         * 被动型心跳机制,服务器在接受到客户端连接以后被动接受心跳请求,当在规定时间内没有收到客户端心跳请求时将客户端连接关闭
         * @ClassName KeepAliveMessageFactoryImpl
         * @Description 内部类,实现 KeepAliveMessageFactory(心跳工厂)
         */
        private static class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {
    
            /* 判断是否心跳请求包,是的话返回true */
            @Override
            public boolean isRequest(IoSession session, Object message) {
                LOG.info("请求心跳包信息: " + message);
                return message.equals(HEART_BEAT_REQUEST);
            }
    
            /* 由于被动型心跳机制,没有请求当然也就不关注反馈,因此直接返回 false */
            @Override
            public boolean isResponse(IoSession session, Object message) {
                return false;
            }
    
            /* 被动型心跳机制无请求,因此直接返回 null */
            @Override
            public Object getRequest(IoSession session) {
                return null;
            }
    
            /* 根据心跳请求 request,反回一个心跳反馈消息 non-null  */
            @Override
            public Object getResponse(IoSession session, Object request) {
                LOG.info("响应预设信息: " + HEART_BEAT_RESPONSE);
                return HEART_BEAT_RESPONSE;
            }
        }
    }
    

    客户端

    客户端会定时发送心跳请求(注意定时时间必须小于,服务器端的IDLE监控时间),同时需要监听心跳反馈,以此来判断是否与服务器丢失连接。对于服务器的心跳请求不给与反馈。

    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.service.IoConnector;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    public class Client {
    
        private static final int PORT = 9123;
        /** 30秒后超时 */
        private static final int IDEL_TIMEOUT = 30;
        /** 15秒发送一次心跳包 */
        private static final int HEART_BEAT_RATE = 15;
        /** 心跳包内容 */
        private static final String HEART_BEAT_REQUEST = "0x11";
        private static final String HEART_BEAT_RESPONSE = "0x12";
        private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    
        public static void main(String[] args) throws IOException {
            IoConnector connector = new NioSocketConnector();
            connector.getSessionConfig().setReadBufferSize(1024);
            connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDEL_TIMEOUT);
    
            connector.getFilterChain().addLast("logger", new LoggingFilter());
            connector.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory()));
    
            KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
            KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE);
            // 设置是否forward到下一个filter
            heartBeat.setForwardEvent(true);
            // 设置心跳频率
            heartBeat.setRequestInterval(HEART_BEAT_RATE);
    
            connector.getFilterChain().addLast("heartbeat", heartBeat);
    
            connector.setHandler(new IoHandlerAdapter());
            connector.connect(new InetSocketAddress("127.0.0.1", PORT));
            System.out.println("Server started on port: " + PORT);
        }
    
        /**
         * 被动型心跳机制,服务器在接受到客户端连接以后被动接受心跳请求,当在规定时间内没有收到客户端心跳请求时将客户端连接关闭
         * @ClassName KeepAliveMessageFactoryImpl
         * @Description 内部类,实现KeepAliveMessageFactory(心跳工厂)
         * @author cruise
         *
         */
        private static class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {
    
            /* 服务器不会给客户端发送请求包,因此不关注请求包,直接返回 false   */
            @Override
            public boolean isRequest(IoSession session, Object message) {
                return false;
            }
    
            /* 客户端关注请求反馈,因此判断 mesaage 是否是反馈包 */
            @Override
            public boolean isResponse(IoSession session, Object message) {
                LOG.info("响应预设信息: " + message);
                return message.equals(HEART_BEAT_RESPONSE);
            }
    
            /* 获取心跳请求包 non-null */
            @Override
            public Object getRequest(IoSession session) {
                LOG.info("请求预设信息: " + HEART_BEAT_REQUEST);
                return HEART_BEAT_REQUEST;
            }
    
            /* 服务器不会给客户端发送心跳请求,客户端当然也不用反馈,该方法返回 null  */
            @Override
            public Object getResponse(IoSession session, Object request) {
                return null;
            }
        }
    }
    
  • 相关阅读:
    讲透学烂二叉树(五):分支平衡—AVL树与红黑树伸展树自平衡
    讲透学烂二叉树(四):二叉树的存储结构—建堆-搜索-排序
    讲透学烂二叉树(三):二叉树的遍历图解算法步骤及JS代码
    instanceof运算符的实质:Java继承链与JavaScript原型链
    JavaScript new 关键词解析及原生实现 new
    JavaScript继承的实现方式:原型语言对象继承对象原理剖析
    GitHub不再支持密码验证解决方案:SSH免密与Token登录配置
    PNG文件解读(2):PNG格式文件结构与数据结构解读—解码PNG数据
    PNG文件解读(1):PNG/APNG格式的前世今生
    JIT-动态编译与AOT-静态编译:java/ java/ JavaScript/Dart乱谈
  • 原文地址:https://www.cnblogs.com/binarylei/p/8496466.html
Copyright © 2011-2022 走看看