zoukankan      html  css  js  c++  java
  • 008-核心技术-netty-服务端心跳机制以及客户端心跳重连方案

    一、概述

    使用与客户端与服务端

    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) 

    // long readerIdleTime 表示多长时间没有读,就会发送一个心跳检测包检测是否连接
    // long writerIdleTim 表示多长时间没有写,就会发送一个心跳检测包检测是否连接
    // long allIdleTime 表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
    // 触发一个 IdleStateEvent事件
    // 当IdleStateEvent事件触发后,就会传递给管道下一个handler去处理。
    // 通过调用(触发)下一handler的userEventTiggered。在该方法中去处理(读、写等)

    客户端主要是发送偏多,注重的是写,以及心跳配置是写,客户端需要自启动,以及断线重连

    服务端主要是接收偏多,注重的是读,以及心跳配置是读,定期收到不,要剔除客户端

     1.1、服务端

    主方法

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class ServerHeartBeat {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))//在bossGroup增加一个日志处理器
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //加入一个netty提供的IdleStateHandler,空闲状态处理器
                                // long readerIdleTime 表示多长时间没有读,就会发送一个心跳检测包检测是否连接
                                // long writerIdleTim 表示多长时间没有写,就会发送一个心跳检测包检测是否连接
                                // long allIdleTime 表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
                                // 触发一个 IdleStateEvent事件
    //                            当IdleStateEvent事件触发后,就会传递给管道下一个handler去处理。
    //                            通过调用(触发)下一handler的userEventTiggered。在该方法中去处理(读、写等)
                                pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
    //                            加入对IdleStateEvent检测进一步处理的handler
                                pipeline.addLast(new ServerHeartBeatIdleStateTriggerChannelHandler());
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new ServerMsgChannelHandler());//业务处理的Handler
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
    
                System.out.println("Server start listen at " + 7000);
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    服务端心跳检测ChannelHandler

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class ServerHeartBeatIdleStateTriggerChannelHandler extends ChannelInboundHandlerAdapter {
        /**
         * @param ctx 上下文
         * @param evt 事件
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if(event.state()== IdleState.READER_IDLE){
                    System.out.println(ctx.channel().remoteAddress() + "--读超时时间--" + event.state());
                    System.out.println("服务器做对应处理即可");
                }
            }
        }
    }

    业务逻辑处理类

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class ServerMsgChannelHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead……");
            System.out.println(ctx.channel().remoteAddress() + "-msg=" + msg.toString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    1.2、客户端

    1、构建一个ChannelHandler集合

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.channel.ChannelHandler;
    
    /**
     *
     * 客户端的ChannelHandler集合,由子类实现,这样做的好处:
     * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
     * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
     * 地获取所有的handlers
     */
    public interface ChannelHandlerHolder {
        ChannelHandler[] handlers();
    } 

    2、客户端心跳检测Handler

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.CharsetUtil;
    
    @ChannelHandler.Sharable
    public class ClientConnectorIdleStateTriggerChannelHandler extends ChannelInboundHandlerAdapter {
    
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
                CharsetUtil.UTF_8));
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleState state = ((IdleStateEvent) evt).state();
                if (state == IdleState.WRITER_IDLE) {
                    // write heartbeat to server
                    ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    3、客户端增加服务检测ChannelHandler类

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.util.Timeout;
    import io.netty.util.Timer;
    import io.netty.util.TimerTask;
    
    import java.util.concurrent.TimeUnit;
    
    @ChannelHandler.Sharable
    public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder {
        private final Bootstrap bootstrap;
        private final Timer timer;
        private final int port;
        private final String host;
        private volatile boolean reconnect = true;
        private int attempts;
    
        public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
            this.bootstrap = bootstrap;
            this.timer = timer;
            this.port = port;
            this.host = host;
            this.reconnect = reconnect;
        }
    
        /**
         * channel链路每次active的时候,将其连接的次数重新☞ 0
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("当前链路已经激活了,重连尝试次数重新置为0");
            attempts = 0;
            ctx.fireChannelActive();
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("链接关闭");
            if (reconnect) {
                System.out.println("链接关闭,将进行重连");
                if (attempts < 12) {
                    attempts++;
                }           //重连的间隔时间会越来越长
                int timeout = 2 << attempts;
                timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
            }
            ctx.fireChannelInactive();
        }
    
        @Override
        public void run(Timeout timeout) throws Exception {
            ChannelFuture future;
            //bootstrap已经初始化好了,只需要将handler填入就可以了
            synchronized (bootstrap) {
                bootstrap.handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(handlers());
                    }
                });
                future = bootstrap.connect(host, port);
            }
            //future对象
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    boolean succeed = f.isSuccess();
                    //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
                    if (!succeed) {
                        System.out.println("重连失败");
                        f.channel().pipeline().fireChannelInactive();
                    } else {
                        System.out.println("重连成功");
                    }
                }
            });
        }
    }

    4、客户端主类

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.local.LocalChannel;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.HashedWheelTimer;
    
    import java.util.concurrent.TimeUnit;
    
    public class ClientHeartBeat {
        protected final HashedWheelTimer timer = new HashedWheelTimer();
        private Bootstrap boot;
        private final ClientConnectorIdleStateTriggerChannelHandler idleStateTrigger = new ClientConnectorIdleStateTriggerChannelHandler();
    
        public void connect(int port, String host) {
            EventLoopGroup group = new NioEventLoopGroup();
            boot = new Bootstrap();
            boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
    
            final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port, host, true) {
                @Override
                public ChannelHandler[] handlers() {
                    return new ChannelHandler[]{
                            this,
                            new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                            idleStateTrigger,
                            new StringDecoder(),
                            new StringEncoder(),
                            new ClientMsgChannelHandler()
                    };
                }
            };
    
            ChannelFuture future = null;
            //进行连接
            try {
                synchronized (boot) {
                    boot.handler(new ChannelInitializer<Channel>() {
                        //初始化channel
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(watchdog.handlers());
                        }
                    });
    
                    future = boot.connect(host, port);
    
                }// 以下代码在synchronized同步块外面是安全的
                future.sync();
                if (!future.isSuccess()) {
                    System.out.println("---- 连接服务器失败,2秒后重试 ---------port=" + port);
                    this.scheduleStart(future.channel(), port, host);
                }
    
            } catch (Throwable t) {
                System.out.println("connects to  fails." + t.getMessage());
                System.out.println("---- 连接服务器失败,2秒后重试 ---------port=" + port);
                Channel channel = future != null && future.channel() != null ? future.channel() : new LocalChannel();
                this.scheduleStart(channel, port, host);
            }
        }
    
        public void scheduleStart(Channel channel, int port, String host) {
            channel.eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    connect(port, host);
                }
            }, 2L, TimeUnit.SECONDS);
        }
    
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            new ClientHeartBeat().connect(7000, "127.0.0.1");
        }
    }

    5、客户端消息处理类

    package com.github.bjlhx15.netty.demo.netty.heartbeat;
    
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    import java.util.Date;
    
    @ChannelHandler.Sharable
    public class ClientMsgChannelHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("激活时间是:" + new Date());
            System.out.println("HeartBeatClientHandler channelActive");
            ctx.fireChannelActive();
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("停止时间是:" + new Date());
            System.out.println("HeartBeatClientHandler channelInactive");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String message = (String) msg;
            System.out.println(message);
            if (message.equals("Heartbeat")) {
                ctx.write("has read message from server");
                ctx.flush();
            }
            ReferenceCountUtil.release(msg);
        }
    }

    1.3、测试

    1、启动服务端,在启动客户端,正常

    服务端会定时收到心跳

    server channelRead……
    /127.0.0.1:52276-msg=Heartbeat

    2、停止服务端

    客户端如下

    重连失败
    链接关闭
    链接关闭,将进行重连
    停止时间是:Sat Aug 07 22:53:20 CST 2021
    HeartBeatClientHandler channelInactive

    启动后恢复正常

    重连成功
    当前链路已经激活了,重连尝试次数重新置为0
    激活时间是:Sat Aug 07 22:53:57 CST 2021
    HeartBeatClientHandler channelActive

    3、如果先启动客户端

    客户端如下

    connects to  fails.Connection refused: /127.0.0.1:7000
    ---- 连接服务器失败,2秒后重试 ---------port=7000

    然后启动服务端

    客户端会

    connects to  fails.Connection refused: /127.0.0.1:7000
    ---- 连接服务器失败,2秒后重试 ---------port=7000
    当前链路已经激活了,重连尝试次数重新置为0
    激活时间是:Sat Aug 07 22:55:25 CST 2021
    HeartBeatClientHandler channelActive

      

    电风扇

    
    
    转载请注明出处,感谢。
    作者:李宏旭
    阅罢此文,如果您觉得本文不错并有所收获,请【打赏】或【推荐】,也可【评论】留下您的问题或建议与我交流。
    你的支持是我不断创作和分享的不竭动力!
  • 相关阅读:
    【iOS CocoaPods篇】iOS CocoaPods一些特别的用法 指定版本、版本介绍、忽略警告
    【iOS CocoaPods篇】iOS 10.10 10.11 10.12 安装升级CocoPods
    iOS程序中的内存分配 栈区堆区全局区(转)
    retain和strong、assign和weak的区别(转)
    (ios实战):retain,copy,assign及autorelease ,strong,weak(转)
    malloc()与 alloc()区别 (转)
    iOS开发--KVC&KVO
    iOS开发之支付功能概述(转)
    disptch_after 自递归
    makeObjectsPerformSelector 方法的用法
  • 原文地址:https://www.cnblogs.com/bjlhx/p/15113533.html
Copyright © 2011-2022 走看看