zoukankan      html  css  js  c++  java
  • 聊聊心跳机制及netty心跳实现

      我们在使用netty的时候会使用一个参数,ChannelOption.SO_KEEPALIVE为true, 设置好了之后再Linux系统才会对keepalive生效,但是linux里边需要配置几个参数,tcp_keepalive_time, tcp_keepalive_invl, tcp_keepalive_probes,如果不配置的时候都会是默认值。

      tcp_keepalive_time 即给一个TCP连接发送心跳包最后的时间间隔某一段时间后继续发送心跳包,允许空闲的时间,然后再次发送心跳包,默认时间为7200秒,即2个小时发一次心跳包。

          tcp_keepalive_invl,发送存活探测时候未收到对方回执的时候,需要间隔一段时间继续发送。默认为75秒。

      tcp_keepalive_probes,如果发了存活探测的时候没有收到对方的回执,那么需要继续发送探测的次数,此时默认值为9次,也就是未收到回执的时候需要发送9次。

      再理一次,间隔tcp_keepalive_time之后发送心跳探测,如果未收到对方回执的时候,需要间隔tcp_keepalive_invl设置的时间继续发送,一共需要发送tcp_keepalive_probes的次数。  

          这个是Linux系统的配置,如果要使用Linux的此功能需要设置SO_KEEPALIVE为true,同时设置其他几个参数。系统默认的SO_KEEPALIVE为false。因为这些情况的差异,所以netty提供了自己实现心跳的机制。

      netty有心跳的实现方法 IdleStateHandler,其中有读空闲时间,写空闲时间,读写空闲时间,只要有一个满足条件会触发userEventTriggered方法。

    public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds)

      定义个消息内容吧,长度为Type的长度1 + 实际内容的长度5 = 6。Length为2个字节,Type为1个类型。

      +----------+----------+----------------+ 
      |  Length  |Type(byte)| Actual Content |
      |   0x06   |    1     |    "HELLO"     |    
      +----------+----------+----------------+  

      定义公共的inbound方法,用于进行channelRead, sendPing, sendPong, userEventTriggered 方法。

    package com.hqs.heartbeat.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleStateEvent;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public abstract class CustomeHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        public static final byte PING = 1;
        public static final byte PONG = 2;
        public static final byte CUSTOM_MSG = 3;
    
        protected String name;
        private AtomicInteger heartbeatCount = new AtomicInteger(0);
    
        public CustomeHeartbeatHandler(String name) {
            this.name = name;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            if(byteBuf.getByte(2) == PING) {
                sendPong(channelHandlerContext);
            } else if(byteBuf.getByte(2) == PONG) {
                System.out.println("get pong msg from " + channelHandlerContext
                        .channel().remoteAddress());
            } else {
                handleData(channelHandlerContext, byteBuf);
            }
        }
    
        protected abstract void handleData(ChannelHandlerContext channelHandlerContext,
                                           ByteBuf byteBuf);
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channel read : " + msg);
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println(byteBuf.getByte(2));
            super.channelRead(ctx, msg);
        }
    
        protected void sendPong(ChannelHandlerContext channelHandlerContext) {
            ByteBuf buf = channelHandlerContext.alloc().buffer(3);
            buf.writeShort(3);
            buf.writeByte(PONG);
            channelHandlerContext.writeAndFlush(buf);
            heartbeatCount.incrementAndGet();
            System.out.println("send pong message to " + channelHandlerContext.channel().remoteAddress());
        }
    
        protected void sendPing(ChannelHandlerContext channelHandlerContext) {
            ByteBuf buf = channelHandlerContext.alloc().buffer(3);
            buf.writeShort(3);
            buf.writeByte(PING);
            channelHandlerContext.writeAndFlush(buf);
            heartbeatCount.incrementAndGet();
            System.out.println("send ping message to " + channelHandlerContext.channel().remoteAddress());
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof IdleStateEvent){
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case ALL_IDLE:
                        handlALLIdle(ctx);
                        break;
                    case READER_IDLE:
                        handlReadIdle(ctx);
                        break;
                    case WRITER_IDLE:
                        handlWriteIdle(ctx);
                        break;
                     default:
                         break;
                }
            }
        }
    
        protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
            System.out.println("READ_IDLE---");
        }
    
        protected void handlWriteIdle(ChannelHandlerContext channelHandlerContext) {
            System.out.println("WRITE_IDLE---");
        }
    
        protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
            System.out.println("ALL_IDLE---");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel:" + ctx.channel().remoteAddress() + " is active");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel:" + ctx.channel().remoteAddress() + " is inactive");
        }
    }

      定义Server的方法,设置读超时为10秒,采用固定长度方法进行内容分割:LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0),长度为1K 。一个主线程接收请求,四个线程处理请求。端口号设置为9999。

    package com.hqs.heartbeat.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class Server {
    
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup(4);
    
            try {
                ServerBootstrap bootstrapServer = new ServerBootstrap();
                bootstrapServer.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline channelPipeline = ch.pipeline();
                        channelPipeline.addLast(new IdleStateHandler(10, 0, 0));
                        channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0,2, -2, 0));
                        channelPipeline.addLast(new ServerHandler());
                    }
                });
                Channel channel = bootstrapServer.bind(9999).sync().channel();
                channel.closeFuture().sync();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

      Server的handler的处理方法:

    package com.hqs.heartbeat.server;
    
    import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class ServerHandler extends CustomeHeartbeatHandler {
    
        public ServerHandler() {
            super("server");
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext,
                                  ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - 3];
            ByteBuf responseBuf = Unpooled.copiedBuffer(byteBuf);
            byteBuf.skipBytes(3);
            byteBuf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content : " + content);
            channelHandlerContext.writeAndFlush(responseBuf);
        }
    
        @Override
        protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
            super.handlReadIdle(channelHandlerContext);
            System.out.println(" client " + channelHandlerContext.channel().remoteAddress() + " reader timeout close it --");
            channelHandlerContext.close();
        }
    }

      定义Client类,所有超时时间为5秒,如果5秒没有读写的话则发送ping,如果失去连接之后inactive了就会重新连接,采用10秒出发一次。

    package com.hqs.heartbeat.client;
    
    import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class Client {
    
    
        private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        private Channel channel;
        private Bootstrap bootstrap;
    
        public static void main(String[] args) throws InterruptedException {
            Client client = new Client();
            client.start();
            client.sendData();
        }
    
        public void start() {
    
            try {
                bootstrap = new Bootstrap();
    
                bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline channelPipeline = ch.pipeline()
                                        .addLast(new IdleStateHandler(0,0,5))
                                        .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0))
                                        .addLast(new ClientHandler(Client.this));
                            }
                        });
                doConnect();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        public void sendData() throws InterruptedException {
            Random random = new Random(System.currentTimeMillis());
            for(int i = 0; i < 10000; i++) {
                if(channel != null && channel.isActive()) {
                    String content = "client msg " + i;
                    ByteBuf byteBuf = channel.alloc().buffer(3 + content.getBytes().length);
                    byteBuf.writeShort(3 + content.getBytes().length);
                    byteBuf.writeByte(CustomeHeartbeatHandler.CUSTOM_MSG);
                    byteBuf.writeBytes(content.getBytes());
                    channel.writeAndFlush(byteBuf);
                }
    
                Thread.sleep(random.nextInt(20000));
    
            }
    
        }
    
        public void doConnect() {
            if(channel != null && channel.isActive()) {
                return;
            }
    
            ChannelFuture future = bootstrap
                    .connect("127.0.0.1", 9999);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if(future.isSuccess()) {
                        channel = future.channel();
                        System.out.println("connect to server successfully");
                    } else {
                        System.out.println("Failed to connect to server, try after 10s");
    
                        future.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doConnect();
                            }
                        }, 10, TimeUnit.SECONDS);
                    }
                }
            });
        }
    
    }

      定义clientHandler方法,读取时跳过长度+类型 2+1 三个字节,然后获取消息。连接断开之后则进行重连。

    package com.hqs.heartbeat.client;
    
    import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author huangqingshi
     * @Date 2019-05-11
     */
    public class ClientHandler extends CustomeHeartbeatHandler {
    
        private Client client;
    
        public ClientHandler(Client client) {
            super("client");
            this.client = client;
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - 3];
            byteBuf.skipBytes(3);
            byteBuf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content:" + content);
        }
    
        @Override
        protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
            super.handlALLIdle(channelHandlerContext);
            sendPing(channelHandlerContext);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            client.doConnect();
        }
    }

      好了,总体的netty心跳实现机制就这么多,希望能帮助到大家。

      github地址:https://github.com/stonehqs/heartbeat

      

  • 相关阅读:
    真正的e时代
    在线手册
    UVA 10616 Divisible Group Sums
    UVA 10721 Bar Codes
    UVA 10205 Stack 'em Up
    UVA 10247 Complete Tree Labeling
    UVA 10081 Tight Words
    UVA 11125 Arrange Some Marbles
    UVA 10128 Queue
    UVA 10912 Simple Minded Hashing
  • 原文地址:https://www.cnblogs.com/huangqingshi/p/10888663.html
Copyright © 2011-2022 走看看