zoukankan      html  css  js  c++  java
  • 基于netty的心跳机制实现

    前言:在实现过程查找过许多资料,各种波折,最后综合多篇文章最终实现并上线使用。为了减少大家踩坑的时间,所以写了本文,希望有用。对于实现过程中有用的参考资料直接放上链接,可能有些内容相对冗余,不过时间允许多看看也并不无益。

    入门文章:

    http://www.tuicool.com/articles/mEJvYb

    netty官网:
    (官网的user guide相对一般,javadoc倒是要看的)
     
    需求场景:
    实现用户的在线离线状态实时展现(我们的客户端是android)。
     
    技术选型:
    在线好办,关键是要监测到什么时候离线,于是我们选择了心跳模型,当心跳失效时即为离线。如果用http发送心跳包虽然简单但是极度不科学,耗电量太大,所以直接否决。我们选择基于TCP实现长连接,而借助一些第三方插件可以更好更快地实现长连接,于是在mina和netty之间我们选择了netty。(理由仅仅是在百度知道里边看到别人说netty使用的更广泛,没有深入对比过)
     
    相关版本:
    netty5.0
    jdk1.7
    tomcat6.0
     
    基础流程图如下:
    服务端代码:
    HeartBeatServer.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    public class HeartBeatServer {
     
        // 端口
        private int port ;
        public HeartBeatServer(int port) {
            this.port = port;
        }
         
        ChannelFuture f ;
         
        ServerBootstrap b ;
         
        // 检测chanel是否接受过心跳数据时间间隔(单位秒)
        private static final int READ_WAIT_SECONDS = 10;
     
         
        public void startServer() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                b = new ServerBootstrap();
                b.group(bossGroup, workerGroup);
                b.channel(NioServerSocketChannel.class);
                b.childHandler(new HeartBeatServerInitializer());
                // 服务器绑定端口监听
                f = b.bind(port).sync();
                // 监听服务器关闭监听,此方法会阻塞
                f.channel().closeFuture().sync();
                // 可以简写为
                /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
            catch (InterruptedException e) {
                e.printStackTrace();
            finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
        /**
         * 消息处理器
         * @author cullen edward
         */
        private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                 
                /*
                 * 使用ObjectDecoder和ObjectEncoder
                 * 因为双向都有写数据和读数据,所以这里需要两个都设置
                 * 如果只读,那么只需要ObjectDecoder即可
                 */
                pipeline.addLast("decoder"new StringDecoder());
                pipeline.addLast("encoder"new StringEncoder());
                 
                /*
                 * 这里只监听读操作
                 * 可以根据需求,监听写操作和总得操作
                 */
                pipeline.addLast("pong"new IdleStateHandler(READ_WAIT_SECONDS, 00,TimeUnit.SECONDS));
                 
                //pipeline.addLast("handler", new Heartbeat());
                pipeline.addLast("handler"new HeartbeatHandler());
            }
        }
         
        public void stopServer(){
            if(f!=null){
                f.channel().close();
            }
        }
        /**
         * @param args
         */
        public static void main(String[] args) {
            HeartbeatServer heartbeatServer = new HeartbeatServer(9597);
            heartbeatServer.startServer();
        }
    }

      

    HeartbeatHandler.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    public class HeartbeatHandler extends SimpleChannelInboundHandler<String> {
        // 失败计数器:未收到client端发送的ping请求
        private int unRecPingTimes = 0 ;
        private String userid;
         
        // 定义服务端没有收到心跳消息的最大次数
        private static final int MAX_UN_REC_PING_TIMES = 3;
         
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("----->msg=" + msg);    //msg格式约定为"operation,userid"
            String[] args = msg.split(",");
            String msg_operation = args[0];
            String msg_userid = args[1];
            if("LOGIN".equals(msg_operation)){
                if(!Utils.isBlank(msg_userid)){
                    userid = msg_userid;
                }
                setUserOnlineStatus(userid, true);
            }else if("HEARTBEAT".equals(msg_operation)){
                ctx.channel().writeAndFlush("success");
                // 失败计数器清零
                unRecPingTimes = 0;
            }
        }
         
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    /*读超时*/
                    System.out.println("===服务端===(READER_IDLE 读超时)");
                    // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                    if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
                        System.out.println("===服务端===(读超时,关闭chanel)");
                        // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
                        ctx.channel().close();
                    }else{
                        // 失败计数器加1
                        unRecPingTimes++;
                    }
                else if (event.state() == IdleState.WRITER_IDLE) {
                    /*写超时*/  
                    System.out.println("===服务端===(WRITER_IDLE 写超时)");
                else if (event.state() == IdleState.ALL_IDLE) {
                    /*总超时*/
                    System.out.println("===服务端===(ALL_IDLE 总超时)");
                }
            }
        }
         
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("错误原因:"+cause.getMessage());
            ctx.channel().close();
            setUserOnlineStatus(userid, false);
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("Client active ");
            super.channelActive(ctx);
        }
         
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // 关闭,等待重连
            ctx.close();
            System.out.println("===服务端===(客户端失效)");
            setUserOnlineStatus(userid, false);
        }
         
        //设置用户在线离线状态
        private void setUserOnlineStatus(String userid, boolean isOnline){
            if(!Utils.isBlank(userid)){
                //更新用户信息为在线状态(此处代码省略)
            }
        }
    }

      

    简易的测试客户端代码:

    SimpleClient.java
     
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public class SimpleClient {
        public static void main(String[] args) throws Exception {
            new SimpleClient("127.0.0.1"9597).run();
        }
        private final String host;
        private final int port;
        public SimpleClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
        public void run() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap().group(group).channel(
                        NioSocketChannel.class).handler(
                        new SimpleClientInitializer());
                Channel channel = bootstrap.connect(host, port).sync().channel();
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        System.in));
                while (true) {
                    channel.writeAndFlush(in.readLine());
                }
            catch (Exception e) {
                e.printStackTrace();
            finally {
                group.shutdownGracefully();
            }
        }
    }

      

    SimpleClientInitializer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("framer"new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder"new StringDecoder());
            pipeline.addLast("encoder"new StringEncoder());
        }
    }

    备注:代码大部分是从其他网站复制修改调整的,写得相对简易一点,其中还有很多安全性、合理性有待优化。

    代码参考文章:

     
    更多相关文章:
  • 相关阅读:
    linux软硬链接
    yum配置文件位置
    What is Docker?
    easy_install下载地址及安装
    python setuptools安装
    django--模板
    django基础PROJECT APP View template
    flask+uswgi+nginx+python3.6的venv发布网站ubuntu14.04
    Mixnode 让操作网络资源和数据库一样简单,不一样的爬虫!
    React Native vs. Cordova.
  • 原文地址:https://www.cnblogs.com/DaTouDaddy/p/7526324.html
Copyright © 2011-2022 走看看