  • Heartbeat mechanisms and implementing heartbeats in Netty

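      When using Netty we often set ChannelOption.SO_KEEPALIVE to true. On Linux this turns on the kernel's TCP keepalive for the connection, whose behavior is governed by three kernel parameters: tcp_keepalive_time, tcp_keepalive_intvl, and tcp_keepalive_probes. Any parameter you do not configure keeps its default value.

      tcp_keepalive_time is how long a TCP connection may stay idle (no data exchanged) before the kernel sends the first keepalive probe. The default is 7200 seconds, so a probe goes out only after two hours of idleness.

      tcp_keepalive_intvl is the interval between successive probes when a probe gets no acknowledgement from the peer. The default is 75 seconds.

      tcp_keepalive_probes is the number of probes sent without an acknowledgement before the kernel gives up on the connection. The default is 9, so up to 9 probes are sent while the peer stays silent.

      To recap: after the connection has been idle for tcp_keepalive_time, the kernel sends a keepalive probe. If the peer does not acknowledge it, the kernel resends at intervals of tcp_keepalive_intvl, up to tcp_keepalive_probes probes in total.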
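
      As a rough worked example of what those defaults imply, the sketch below adds up the idle time and the probe intervals to estimate how long the kernel takes to give up on a peer that has silently disappeared. The class and method names are made up for illustration, and the figure assumes the peer never answers and ignores network delays.

```java
class KeepaliveTimingSketch {

    // Idle time before the first probe, plus one interval per unanswered probe.
    static long detectionSeconds(long keepaliveTime, long keepaliveIntvl, int keepaliveProbes) {
        return keepaliveTime + keepaliveIntvl * keepaliveProbes;
    }

    public static void main(String[] args) {
        // Linux defaults: tcp_keepalive_time=7200, tcp_keepalive_intvl=75, tcp_keepalive_probes=9
        long s = detectionSeconds(7200, 75, 9);
        // prints "7875 s = 2 h 11 min 15 s"
        System.out.println(s + " s = " + s / 3600 + " h " + (s % 3600) / 60 + " min " + s % 60 + " s");
    }
}
```

      With the defaults, a dead peer can go unnoticed for more than two hours, which is one reason an application-level heartbeat is attractive.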

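      These are Linux system settings. To use this kernel feature you must set SO_KEEPALIVE to true on the socket, since it is off by default, and tune the parameters above as needed. Because defaults and configuration differ from system to system, Netty provides its own mechanism for implementing heartbeats.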

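      Netty's building block for heartbeats is IdleStateHandler. It takes a reader idle time, a writer idle time, and an all-idle (neither read nor write) time. Whenever one of them elapses it fires an IdleStateEvent, which reaches the userEventTriggered method of the handlers after it in the pipeline.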

    public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds)
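
      To make the three timeouts concrete, here is a simplified model of the idle check in plain Java. It is not Netty's implementation (the real handler schedules timer tasks on the event loop); the class, enum, and method names are illustrative assumptions. As in Netty, a timeout of 0 disables that check.

```java
import java.util.EnumSet;
import java.util.Set;

class IdleCheckSketch {

    enum Idle { READER_IDLE, WRITER_IDLE, ALL_IDLE }

    // Returns which idle conditions hold at time nowSec, given the last read and write times.
    static Set<Idle> idleStates(long nowSec, long lastReadSec, long lastWriteSec,
                                int readerIdleSec, int writerIdleSec, int allIdleSec) {
        Set<Idle> states = EnumSet.noneOf(Idle.class);
        if (readerIdleSec > 0 && nowSec - lastReadSec >= readerIdleSec) {
            states.add(Idle.READER_IDLE);
        }
        if (writerIdleSec > 0 && nowSec - lastWriteSec >= writerIdleSec) {
            states.add(Idle.WRITER_IDLE);
        }
        // "All idle" means neither a read nor a write happened within the window.
        long lastIoSec = Math.max(lastReadSec, lastWriteSec);
        if (allIdleSec > 0 && nowSec - lastIoSec >= allIdleSec) {
            states.add(Idle.ALL_IDLE);
        }
        return states;
    }

    public static void main(String[] args) {
        // Reader timeout of 10 s only, last read 12 s ago.
        System.out.println(idleStates(100, 88, 99, 10, 0, 0)); // [READER_IDLE]
        // All-idle timeout of 5 s only, last write 3 s ago.
        System.out.println(idleStates(100, 93, 97, 0, 0, 5));  // []
        // All-idle timeout of 5 s only, last read 7 s ago and last write 6 s ago.
        System.out.println(idleStates(100, 93, 94, 0, 0, 5));  // [ALL_IDLE]
    }
}
```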

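      Next, define a message format. Length takes 2 bytes and Type takes 1 byte. In the code that follows, the Length value counts the whole frame, so for the content "HELLO" it is 2 (Length) + 1 (Type) + 5 (content) = 8.

      +----------+----------+----------------+
      |  Length  |   Type   | Actual Content |
      | 2 bytes  |  1 byte  |    5 bytes     |
      |  0x0008  |   0x03   |    "HELLO"     |
      +----------+----------+----------------+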
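
      The layout can be sketched without Netty using java.nio.ByteBuffer. This assumes the convention used by the handlers later in this post, which write the Length field as 3 + content length, so Length counts its own 2 bytes and the Type byte as well. The class and method names are illustrative, and UTF-8 is an assumption (the post's code uses the platform default charset).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameSketch {

    static final byte CUSTOM_MSG = 3;

    // Builds [2-byte Length][1-byte Type][content], where Length counts the whole frame.
    static byte[] encode(byte type, String content) {
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        int frameLength = 2 + 1 + payload.length;
        // ByteBuffer is big-endian by default, matching ByteBuf.writeShort.
        ByteBuffer buf = ByteBuffer.allocate(frameLength);
        buf.putShort((short) frameLength);
        buf.put(type);
        buf.put(payload);
        return buf.array();
    }

    // Skips the 3 header bytes and returns the content as text.
    static String decodeContent(byte[] frame) {
        return new String(frame, 3, frame.length - 3, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = encode(CUSTOM_MSG, "HELLO");
        System.out.println(frame.length);         // 8
        System.out.println(frame[1]);             // 8 (low byte of the Length field)
        System.out.println(decodeContent(frame)); // HELLO
    }
}
```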

      Define a shared inbound handler that provides the channelRead, sendPing, sendPong, and userEventTriggered methods.

    package com.hqs.heartbeat.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleStateEvent;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public abstract class CustomeHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        public static final byte PING = 1;
        public static final byte PONG = 2;
        public static final byte CUSTOM_MSG = 3;
    
        protected String name;
        private AtomicInteger heartbeatCount = new AtomicInteger(0);
    
        public CustomeHeartbeatHandler(String name) {
            this.name = name;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            // Bytes 0-1 hold the Length field, so the Type byte sits at index 2.
            byte type = byteBuf.getByte(2);
            if(type == PING) {
                // Answer every ping with a pong.
                sendPong(channelHandlerContext);
            } else if(type == PONG) {
                System.out.println("get pong msg from " + channelHandlerContext
                        .channel().remoteAddress());
            } else {
                // Anything else is a data message for the subclass to handle.
                handleData(channelHandlerContext, byteBuf);
            }
        }
    
        protected abstract void handleData(ChannelHandlerContext channelHandlerContext,
                                           ByteBuf byteBuf);
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channel read : " + msg);
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println(byteBuf.getByte(2));
            super.channelRead(ctx, msg);
        }
    
        protected void sendPong(ChannelHandlerContext channelHandlerContext) {
            ByteBuf buf = channelHandlerContext.alloc().buffer(3);
            buf.writeShort(3);
            buf.writeByte(PONG);
            channelHandlerContext.writeAndFlush(buf);
            heartbeatCount.incrementAndGet();
            System.out.println("send pong message to " + channelHandlerContext.channel().remoteAddress());
        }
    
        protected void sendPing(ChannelHandlerContext channelHandlerContext) {
            ByteBuf buf = channelHandlerContext.alloc().buffer(3);
            buf.writeShort(3);
            buf.writeByte(PING);
            channelHandlerContext.writeAndFlush(buf);
            heartbeatCount.incrementAndGet();
            System.out.println("send ping message to " + channelHandlerContext.channel().remoteAddress());
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof IdleStateEvent){
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case ALL_IDLE:
                        handlALLIdle(ctx);
                        break;
                    case READER_IDLE:
                        handlReadIdle(ctx);
                        break;
                    case WRITER_IDLE:
                        handlWriteIdle(ctx);
                        break;
                     default:
                         break;
                }
            }
        }
    
        protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
            System.out.println("READ_IDLE---");
        }
    
        protected void handlWriteIdle(ChannelHandlerContext channelHandlerContext) {
            System.out.println("WRITE_IDLE---");
        }
    
        protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
            System.out.println("ALL_IDLE---");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel:" + ctx.channel().remoteAddress() + " is active");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel:" + ctx.channel().remoteAddress() + " is inactive");
        }
    }

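      Define the Server. It uses a 10-second read-idle timeout and splits the byte stream into frames with a length-field-based decoder, LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0), which caps a frame at 1024 bytes (1 KB). One boss thread accepts connections and four worker threads handle I/O. The server listens on port 9999.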

    package com.hqs.heartbeat.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class Server {
    
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup(4);
    
            try {
                ServerBootstrap bootstrapServer = new ServerBootstrap();
                bootstrapServer.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline channelPipeline = ch.pipeline();
                        channelPipeline.addLast(new IdleStateHandler(10, 0, 0));
                        channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0,2, -2, 0));
                        channelPipeline.addLast(new ServerHandler());
                    }
                });
                Channel channel = bootstrapServer.bind(9999).sync().channel();
                channel.closeFuture().sync();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
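
      The arguments of LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0) used above are maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, and initialBytesToStrip. The decoder derives the frame size as the length field's value plus lengthAdjustment plus the offset at which the length field ends. The sketch below reproduces only that arithmetic in plain Java (the class and method names are illustrative) to show why an adjustment of -2 fits a Length value that already counts its own 2 bytes.

```java
class FrameLengthSketch {

    // Number of bytes the decoder treats as one frame, before any stripping.
    static long frameLength(int lengthFieldOffset, int lengthFieldLength,
                            int lengthAdjustment, long lengthFieldValue) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        return lengthFieldValue + lengthAdjustment + lengthFieldEndOffset;
    }

    public static void main(String[] args) {
        // A data frame whose Length field says 8 (2 + 1 + 5 bytes of content).
        System.out.println(frameLength(0, 2, -2, 8)); // 8
        // A ping or pong frame whose Length field says 3 (header only).
        System.out.println(frameLength(0, 2, -2, 3)); // 3
        // With an adjustment of 0 the same data frame would be read as 10 bytes.
        System.out.println(frameLength(0, 2, 0, 8));  // 10
    }
}
```

      Because initialBytesToStrip is 0, the 3-byte header stays in the frame, which is why the handlers read the type with getByte(2) and skip 3 bytes before reading the content.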

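      The server's handler echoes data messages back to the client and closes the connection when the read-idle timeout fires: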

    package com.hqs.heartbeat.server;
    
    import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class ServerHandler extends CustomeHeartbeatHandler {
    
        public ServerHandler() {
            super("server");
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext,
                                  ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - 3];
            ByteBuf responseBuf = Unpooled.copiedBuffer(byteBuf);
            byteBuf.skipBytes(3);
            byteBuf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content : " + content);
            channelHandlerContext.writeAndFlush(responseBuf);
        }
    
        @Override
        protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
            super.handlReadIdle(channelHandlerContext);
            System.out.println(" client " + channelHandlerContext.channel().remoteAddress() + " reader timeout close it --");
            channelHandlerContext.close();
        }
    }

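      Define the Client class. Its all-idle timeout is 5 seconds: if nothing is read or written for 5 seconds, it sends a ping. If the connection is lost and the channel becomes inactive, it reconnects, and a failed connection attempt is retried after 10 seconds.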

    package com.hqs.heartbeat.client;
    
    import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class Client {
    
    
        private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
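        // Assigned on the event loop thread and read by the main thread in sendData(), hence volatile.
        private volatile Channel channel;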
        private Bootstrap bootstrap;
    
        public static void main(String[] args) throws InterruptedException {
            Client client = new Client();
            client.start();
            client.sendData();
        }
    
        public void start() {
    
            try {
                bootstrap = new Bootstrap();
    
                bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline channelPipeline = ch.pipeline()
                                        .addLast(new IdleStateHandler(0,0,5))
                                        .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0))
                                        .addLast(new ClientHandler(Client.this));
                            }
                        });
                doConnect();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        public void sendData() throws InterruptedException {
            Random random = new Random(System.currentTimeMillis());
            for(int i = 0; i < 10000; i++) {
                if(channel != null && channel.isActive()) {
                    String content = "client msg " + i;
                    ByteBuf byteBuf = channel.alloc().buffer(3 + content.getBytes().length);
                    byteBuf.writeShort(3 + content.getBytes().length);
                    byteBuf.writeByte(CustomeHeartbeatHandler.CUSTOM_MSG);
                    byteBuf.writeBytes(content.getBytes());
                    channel.writeAndFlush(byteBuf);
                }
    
                Thread.sleep(random.nextInt(20000));
    
            }
    
        }
    
        public void doConnect() {
            if(channel != null && channel.isActive()) {
                return;
            }
    
            ChannelFuture future = bootstrap
                    .connect("127.0.0.1", 9999);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if(future.isSuccess()) {
                        channel = future.channel();
                        System.out.println("connect to server successfully");
                    } else {
                        System.out.println("Failed to connect to server, try after 10s");
    
                        future.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doConnect();
                            }
                        }, 10, TimeUnit.SECONDS);
                    }
                }
            });
        }
    
    }
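
      The retry pattern in doConnect (try, and on failure schedule another try after a fixed delay) does not depend on Netty. Below is a minimal sketch of the same idea with a ScheduledExecutorService standing in for the channel's event loop. The class name, the connectWithRetry helper, and the fake connection attempt are all hypothetical, and the delay is shortened to milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class RetryConnectSketch {

    // Calls connectAttempt until it returns true, waiting delayMillis after each failure.
    // The returned future completes with the number of attempts made.
    static CompletableFuture<Integer> connectWithRetry(ScheduledExecutorService scheduler,
                                                       BooleanSupplier connectAttempt,
                                                       long delayMillis) {
        CompletableFuture<Integer> done = new CompletableFuture<>();
        attempt(scheduler, connectAttempt, delayMillis, new AtomicInteger(), done);
        return done;
    }

    private static void attempt(ScheduledExecutorService scheduler, BooleanSupplier connectAttempt,
                                long delayMillis, AtomicInteger attempts,
                                CompletableFuture<Integer> done) {
        int n = attempts.incrementAndGet();
        if (connectAttempt.getAsBoolean()) {
            done.complete(n);
        } else {
            // Same shape as eventLoop().schedule(..., 10, TimeUnit.SECONDS) in doConnect.
            scheduler.schedule(
                    () -> attempt(scheduler, connectAttempt, delayMillis, attempts, done),
                    delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical connection attempt that fails twice and then succeeds.
        int attempts = connectWithRetry(scheduler, () -> calls.incrementAndGet() >= 3, 10).join();
        System.out.println("connected after " + attempts + " attempts"); // connected after 3 attempts
        scheduler.shutdown();
    }
}
```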

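      Define the ClientHandler. When reading a data message it skips the first three bytes (2 for Length plus 1 for Type) and then reads the content. When the connection drops, it reconnects.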

    package com.hqs.heartbeat.client;
    
    import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class ClientHandler extends CustomeHeartbeatHandler {
    
        private Client client;
    
        public ClientHandler(Client client) {
            super("client");
            this.client = client;
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - 3];
            byteBuf.skipBytes(3);
            byteBuf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content:" + content);
        }
    
        @Override
        protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
            super.handlALLIdle(channelHandlerContext);
            sendPing(channelHandlerContext);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            client.doConnect();
        }
    }

      That covers the overall approach to implementing heartbeats with Netty. I hope it helps.

      GitHub repository: https://github.com/stonehqs/heartbeat

      

  • Original article: https://www.cnblogs.com/huangqingshi/p/10888663.html