zoukankan      html  css  js  c++  java
  • Netty学习笔记

    一些类与方法说明

    1)ByteBuf
    ByteBuf的API说明:

    • Creation of a buffer
      It is recommended to create a new buffer using the helper methods in Unpooled rather than calling an individual implementation's constructor.
      建议用Unpooled类的帮助方法来创建一个ByteBuf,而不是用new ByteBuf()创建。具体如下:
     import static io.netty.buffer.Unpooled.*;
     ByteBuf heapBuffer    = buffer(128);
     ByteBuf directBuffer  = directBuffer(256);
     ByteBuf wrappedBuffer = wrappedBuffer(new byte[128], new byte[256]);
     ByteBuf copiedBuffe r = copiedBuffer(ByteBuffer.allocate(128));
    
    • Random Access Indexing
      除了顺序读写之外,ByteBuf还支持随机读写,这些方法如下:


      必须注意的是,set操作与write操作不支持动态扩展缓冲区,并且不会修改读写索引。
    • Sequential Access Indexing
    • Readable bytes (the actual content)
    • Writable bytes
    • Discardable bytes
      缓冲区的分配和释放是个耗时的操作,我们需要尽量重用它们;但同时频繁的调用将会导致性能下降,我们在调用前要确认是否确实需要这样做。
    • Clearing the buffer indexes
    • Search operations
    • Mark and reset
      mark操作会将当前的位置指针备份到mark变量中,当调用reset操作之后,重新将指针的当前位置恢复为备份在mark中的值。ByteBuffer只有2个相关方法,而ByteBuf有4个相关方法,因为ByteBuf有两个位置指针。
    • Derived buffers
      duplicate()操作,两个对象的内容是共享的,读写索引是独立的;copy()操作,两个对象的内容和读写索引都是独立的;slice()操作,两个对象的内容是共享的,读写索引是独立的。
    • Conversion to existing JDK types
      将ByteBuf转换为ByteBuffer操作的方法有两个,nioBuffer()和nioBuffer(int index, int length)。

    ByteBuf的典型用法示例:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println(body);
    }
    

    ByteBuffer完全可以满足NIO编程的需要,但是由于NIO的复杂性,ByteBuffer也有其局限性,它的主要缺点如下:

    • ByteBuffer长度固定,一旦分配完成,它的容量不能动态扩展和收缩;
    • ByteBuffer只有一个标识位置的指针,读写的时候需要手工调用clear()、flip()和rewind()等;
    • ByteBuffer的API功能有限,一些高级和实用的特性它不支持,需要使用者自己实现。

    ByteBuffer的典型用法示例:

    ByteBuffer buffer = ByteBuffer.allocate(100);
    String value = "0123456789";
    buffer.put(value.getBytes());
    buffer.flip();
    byte[] valueArray = new byte[buffer.remaining()];
    buffer.get(valueArray);
    String decodeValue = new String(valueArray);
    

    2)connect()
    当服务端要进行重复登录检查时,需要绑定客户端端口。并且,从产品管理角度看,一般情况下不允许客户端系统随便使用随机端口。

    ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(MyConstant.LOCAL_IP, MyConstant.LOCAL_PORT)).sync();
    future.channel().closeFuture().sync();
    

    解决粘包/拆包问题

    这个问题其实是流式协议的特点。常用的解决方案有:
    1)消息定长; 2)在包尾增加回车换行符进行分割; 3)将特殊的分隔符作为消息的结束符; 4)将消息分为消息头和消息体; 5)更复杂的应用层协议。

    Netty提供了多种解码器用于处理该问题,消息接收端只需要将相应的handler添加到ChannelPipeline中即可。如FixedLengthFrameDecoder用于方案1,LineBasedFrameDecoder用于方案2,DelimiterBasedFrameDecoder用于方案3,LengthFieldBasedFrameDecoder用于方案4。

    方案1: FixedLengthFrameDecoder + StringDecoder组合。部分源码如下:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch) throws Exception {
    		ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
    		ch.pipeline().addLast(new StringDecoder());
    		ch.pipeline().addLast(new MyHandler());
    	}
    }
    

    FixedLengthFrameDecoder的API说明如下:
    A decoder that splits the received ByteBufs by the fixed number of bytes.

    方案2: LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器。不过需要注意的是,发送数据是要加上separator,而解码之后是没有separator的。部分源码如下:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch)
    		throws Exception {
    		ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    		ch.pipeline().addLast(new StringDecoder());
    		ch.pipeline().addLast(new MyHandler());
    	}
    }
    

    LineBasedFrameDecoder的API说明如下:
    A decoder that splits the received ByteBufs on line endings. Both " " and " " are handled. For a more general delimiter-based decoder, see DelimiterBasedFrameDecoder.
    StringDecoder的API说明如下:
    Decodes a received ByteBuf into a String. Please note that this decoder must be used with a proper ByteToMessageDecoder such as DelimiterBasedFrameDecoder or LineBasedFrameDecoder if you are using a stream-based transport such as TCP/IP.

    方案3: DelimiterBasedFrameDecoder + StringDecoder组合。部分源码如下:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch) throws Exception {
    		ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
    		ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    		ch.pipeline().addLast(new StringDecoder());
    		ch.pipeline().addLast(new MyHandler());
    	}
    }
    

    DelimiterBasedFrameDecoder的API说明如下:
    A decoder that splits the received ByteBufs by one or more delimiters. It is particularly useful for decoding the frames which ends with a delimiter such as NUL or newline characters.
    DelimiterBasedFrameDecoder allows you to specify more than one delimiter. If more than one delimiter is found in the buffer, it chooses the delimiter which produces the shortest frame.

    方案4:LengthFieldBasedFrameDecoder。部分源码如下:

    //消息接收端
    public class MyMessageDecoder extends LengthFieldBasedFrameDecoder {
    
    	public MyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
    		super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    		//...
    	}
    	
    	@Override
    	protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    		ByteBuf frame = (ByteBuf) super.decode(ctx, in); //解码
    		if (frame == null) {
    			return null;
    		}
    		//...
    	}
    }
    
    //消息发送端
    public final class MyMessageEncoder extends MessageToByteEncoder<MyMessage> {
    
        public MyMessageEncoder() {
            //...
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf sendBuf) throws Exception {
            if (msg == null || msg.getHeader() == null)
                //...
    
            sendBuf.writeInt((msg.getHeader().getLength()));
            sendBuf.writeByte((msg.getHeader().getType()));
            sendBuf.writeByte((msg.getHeader().getPriority()));
            //...
    
            if (msg.getBody() != null) {
                //...
            } else {
                //...        
            }
            //...
        }
    }
    

    LengthFieldBasedFrameDecoder解码器支持自动的TCP粘包和半包处理,只需要给出标识消息长度的字段偏移量和消息长度自身所占的字节数。
    LengthFieldBasedFrameDecoder的API说明如下:
    A decoder that splits the received ByteBufs dynamically by the value of the length field in the message. It is particularly useful when you decode a binary message which has an integer header field that represents the length of the message body or the whole message.
    LengthFieldBasedFrameDecoder has many configuration parameters so that it can decode any message with a length field, which is often seen in proprietary client-server protocols.

    并且API说明给出了使用该解码器的多种情况:

    • 2 bytes length field at offset 0, do not strip header
    • 2 bytes length field at offset 0, strip header
    • 2 bytes length field at offset 0, do not strip header, the length field represents the length of the whole message
    • 3 bytes length field at the end of 5 bytes header, do not strip header
    • 3 bytes length field at the beginning of 5 bytes header, do not strip header
    • 2 bytes length field at offset 1 in the middle of 4 bytes header, strip the first header field and the length field
    • 2 bytes length field at offset 1 in the middle of 4 bytes header, strip the first header field and the length field, the length field represents the length of the whole message

    编解码

    编解码框架优劣标准: 是否支持跨语言; 编码后的码流大小; 编解码的性能; API使用是否方便。
    无论是序列化后的码流大小,还是序列化性能,JDK默认的序列化机制表现得都很差。适当的时候用Netty的ObjectEncoder + ObjectDecoder、Google Protobuf编解码、JBoss Marshalling代替。

    方案1: ObjectEncoder + ObjectDecoder
    接收时,encoder后可以直接将msg强制转换为相应类的对象; 发送时,可以重写相应类的toString()方法,并用writeAndFlush()发送相应对象。特别地,这时不用考虑粘包和拆包问题。
    server部分源码:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch) {
    		ch.pipeline().addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
    		ch.pipeline().addLast(new ObjectEncoder());
    		ch.pipeline().addLast(new MyServerHandler());
    	}
    }
    

    client部分源码:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch) throws Exception {
    		ch.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
    		ch.pipeline().addLast(new ObjectEncoder());
    		ch.pipeline().addLast(new MyClientHandler());
    	}
    }
    

    ObjectDecoder的API说明:
    A decoder which deserializes the received ByteBufs into Java objects.
    Please note that the serialized form this decoder expects is not compatible with the standard ObjectOutputStream. Please use ObjectEncoder or ObjectEncoderOutputStream to ensure the interoperability with this decoder.

    ObjectEncoder的API说明:
    An encoder which serializes a Java object into a ByteBuf.
    Please note that the serialized form this encoder produces is not compatible with the standard ObjectInputStream. Please use ObjectDecoder or ObjectDecoderInputStream to ensure the interoperability with this encoder.

    方案2: Google Protobuf编解码
    摘自overview
    Protocol buffers are a flexible, efficient, automated mechanism for serializing structured data – think XML, but smaller, faster, and simpler.

    摘自tutorials
    With protocol buffers, you write a .proto description of the data structure you wish to store. From that, the protocol buffer compiler creates a class that implements automatic encoding and parsing of the protocol buffer data with an efficient binary format. The generated class provides getters and setters for the fields that make up a protocol buffer and takes care of the details of reading and writing the protocol buffer as a unit. Importantly, the protocol buffer format supports the idea of extending the format over time in such a way that the code can still read data encoded with the old format.

    server部分源码:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch) {
    		ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
    		ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
    		ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
    		ch.pipeline().addLast(new ProtobufEncoder());
    		ch.pipeline().addLast(new MyServerHandler());
    	}
    }
    

    client部分源码:

    new ChannelInitializer<SocketChannel>() {
    	@Override
    	public void initChannel(SocketChannel ch) {
    		ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
    		ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
    		ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
    		ch.pipeline().addLast(new ProtobufEncoder());
    		ch.pipeline().addLast(new MyClientHandler());
    	}
    }
    

    源码中使用了ProtobufVarint32LengthFieldPrepender和ProtobufVarint32FrameDecoder编解码器,实际上是上文提到的解决粘包/拆包问题方案4的运用。关于Protobuf部分,请详细查看tutorials,业务逻辑中不需要再关注编解码了。

    Netty from the ground up

    1)两种发送数据方式的不同
    Once a ChannelHandler is added to a ChannelPipeline it also gets what's called a ChannelHandlerContext. Typically it is safe to get a reference to this object and keep it around. This is not true when a datagram protocol is used such as UDP. This object can later be used to obtain the underlying channel but is typically kept because you use it to write/send messages. This means there are two ways of sending messages in Netty. You can write directly to the channel or write to the ChannelHandlerContext object. The main difference between the two is that writing to the channel directly causes the message to start from the tail of the ChannelPipeline where as writing to the context object causes the message to start from the next handler in the ChannelPipeline.

    2)简单的handler只需要继承SimpleChannelInboundHandler
    To create a handler like this, your application only needs to extend the base class called SimpleChannelInboundHandler, where T is the type of message your handler can process. It is in this handler where your application obtains a reference to the ChannelHandlerContext by overriding one of the methods from the base class, all of them accept the ChannelHandlerContext as a parameter which you can then store as a field in the class.

    特别注意SimpleChannelInboundHandler的API说明:
    Be aware that depending of the constructor parameters it will release all handled messages by pass them to ReferenceCountUtil.release(Object). In this case you may need to use ReferenceCountUtil.retain(Object) if you pass the object to the next handler in the ChannelPipeline.
    这和ChannelHandlerAdapter是不同的。

    3)当业务逻辑耗时的时候可以使用EventExecutorGroup
    As said before you MUST NOT block the IO Thread at all. This means doing blocking operations within your ChannelHandler is problematic. Lucky enough there is a solution for this. Netty allows to specify an EventExecutorGroup when adding ChannelHandlers to the ChannelPipeline. This EventExecutorGroup will then be used to obtain an EventExecutor and this EventExecutor will execute all the methods of the ChannelHandler. The EventExecutor here will use a different Thread then the one that is used for the IO and thus free up the
    EventLoop.

    4)关于ChannelPipeline与ChannelInitializer
    Both ChannelInboundHandler and ChannelOutboundHandler can be mixed into the same ChannelPipeline.

    The role of the ChannelInitializer is to add ChannelHandler implementations to what's called the ChannelPipeline.
    An ChannelInitializer is also itself a ChannelHandler which automatically removes itself from the ChannelPipeline after it has added the other handlers.

    5)不需要关注多线程同步,同步已经被Netty的机制保证了
    When a channel is registered, Netty binds that channel to a single EventLoop (and so to a single thread) for the lifetime of that Channel. This is why your application doesn't need to synchronize on Netty IO operations because all IO for a given Channel will always be performed by the same thread.

    6)Netty中所有的IO操作默认都是异步的,但是这些提交的异步操作的执行是有先后关系的
    So basically a ChannelFuture is a placeholder for a result of an operation that is executed in the future. When exactly it is executed depends on many facts and is not easy to say. The only thing you can be sure of is that it will be executed and all operations that return a ChannelFuture and belong to the same Channel will be executed in the correct order, which is the same order as you executed the methods.

    关于UDP协议

    UDP使用的是面向无连接的、不可靠的数据报投递服务。当使用UDP协议传输信息时,用户应用程序必须负责解决数据报的丢失、重复、排序、差错确认等问题。
    由于UDP具有资源消耗小、处理速度快的优点,所以通常视频、音频等可靠性要求不高的数据传输一般会使用UDP,即使有一定的丢包率,也不会对功能造成严重的影响。

    文件传输

    Netty的文件传输无论在功能还是在可靠性方面,相比于传统的I/O类库或者其他一些第三方文件传输类库,都有独特的优势。
    server部分源码:

    public class MyHandler extends SimpleChannelInboundHandler<String> {
    
        private static final String CR = System.getProperty("line.separator");
    
        public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            File file = new File(msg);
            if (file.exists()) {
                if (!file.isFile()) {
                    ctx.writeAndFlush("Not a file : " + file + CR);
                    return;
                }
                ctx.write(file + " " + file.length() + CR);
                RandomAccessFile randomAccessFile = new RandomAccessFile(msg, "r");
                FileRegion region = new DefaultFileRegion(randomAccessFile.getChannel(), 0,randomAccessFile.length());
                ctx.write(region);
                ctx.writeAndFlush(CR);
                randomAccessFile.close();
            } else {
                ctx.writeAndFlush("File not found: " + file + CR);
            }
        }
    
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    DefaultFileRegion的API说明:
    Default FileRegion implementation which transfer data from a FileChannel or File. Be aware that the FileChannel will be automatically closed once AbstractReferenceCounted.refCnt() returns 0.
    DefaultFileRegion类实现了FileRegion接口,继承了AbstractReferenceCounted类。创建DefaultFileRegion实例时可以传入一个FileChannel实现或一个File实例,而且在这里使用FileChannel不需要手动close。

    FileRegion的API说明:
    If your operating system (or JDK / JRE) does not support zero-copy file transfer, sending a file with FileRegion might fail or yield worse performance. For example, sending a large file doesn't work well in Windows.
    FileRegion的使用需要操作系统(或JDK)支持zero-copy,这是因为Netty中是通过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现的零拷贝。操作系统,设备驱动,文件系统,网络协议栈都会影响zero-copy。
    wikipedia有这样两段话:
    1、Java input streams can support zero-copy through the java.nio.channels.FileChannel's transferTo() method if the underlying operating system also supports zero copy.
    2、Zero-copy versions of operating system elements, such as device drivers, file systems, and network protocol stacks, greatly increase the performance of certain application programs and more efficiently utilize system resources.

    关于zero-copy,请参考wikipediadeveloperWorks 中国,在这儿就不再更多地描述了。

    需要注意的是,在进行大文件传输的时候,一次将文件的全部内容映射到内存中,很有可能导致内存溢出。为了解决大文件传输过程中的内存溢出,Netty提供了ChunkedWriteHandler来解决大文件或者码流传输过程中可能发生的内存溢出问题。
    ChunkedWriteHandler的API说明:
    A ChannelHandler that adds support for writing a large data stream asynchronously neither spending a lot of memory nor getting OutOfMemoryError. Large data streaming such as file transfer requires complicated state management in a ChannelHandler implementation. ChunkedWriteHandler manages such complicated states so that you can send a large data stream without difficulties.

    心跳机制

    参考Netty系列之Netty可靠性分析关于“链路的有效性检测”的说明

    要解决链路的可靠性问题,必须周期性的对链路进行有效性检测。目前最流行和通用的做法就是心跳检测。
    不同的协议,心跳检测机制也存在差异,归纳起来主要分为两类:

    1. Ping-Pong型心跳:由通信一方定时发送Ping消息,对方接收到Ping消息之后,立即返回Pong应答消息给对方,属于请求-响应型心跳;
    2. Ping-Ping型心跳:不区分心跳请求和应答,由通信双方按照约定定时向对方发送心跳Ping消息,它属于双向心跳。
      Netty提供的空闲检测机制分为三种:
    3. 读空闲,链路持续时间t没有读取到任何消息;
    4. 写空闲,链路持续时间t没有发送任何消息;
    5. 读写空闲,链路持续时间t没有接收或者发送任何消息。

    需要补充的是,尽管有SO_KEEPALIVE(周期性测试连接是否仍存活)选项,但是应用层的心跳还是必不可少的。《Linux多线程服务器端编程》解释

    心跳除了说明应用程序还活着(进程还在,网络通畅),更重要的是表明应用程序还能正常工作。而 TCP keepalive 有操作系统负责探查,即便进程死锁,或阻塞,操作系统也会如常收发 TCP keepalive 消息。对方无法得知这一异常。

    这儿提供两种实现心跳检测的方案:
    1)使用io.netty.handler.timeout包下的相关类与接口
    ReadTimeoutHandler实现了readTimedOut()方法

    /**
     * Is called when a read timeout was detected.
     */
    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }
    

    使用方法

    WriteTimeoutHandler实现了writeTimedOut()方法

    /**
     * Is called when a write timeout was detected
     */
    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }
    

    使用方法

    IdleStateHandler实现了channelIdle()方法

    /**
     * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
     * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
     */
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    

    使用方法

    2)利用Netty提供的自定义EventExecutor接口实现
    这是《Netty权威指南》中给出的一种实现方式,client实现Ping,server实现Pong。
    client部分源码:

    public class HeartBeatReqHandler extends ChannelHandlerAdapter {
    
        private volatile ScheduledFuture<?> heartBeat;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
            
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) { // 握手成功, 主动发送心跳消息
                heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
            } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) { // 心跳信号
                System.out.println("Client receive server heart beat message : ---> " + message);
            } else //一般消息, 透传给下一个handler
                ctx.fireChannelRead(msg);
        }
    
        private class HeartBeatTask implements Runnable {
            private final ChannelHandlerContext ctx;
    
            public HeartBeatTask(final ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
    
            @Override
            public void run() {
                NettyMessage heatBeat = buildHeatBeat();
                System.out.println("Client send heart beat messsage to server : ---> " + heatBeat);
                ctx.writeAndFlush(heatBeat); //Ping
            }
    
            private NettyMessage buildHeatBeat() {
                NettyMessage message = new NettyMessage();
                Header header = new Header();
                header.setType(MessageType.HEARTBEAT_REQ.value());
                message.setHeader(header);
                return message;
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    }
    

    ChannelHandlerContext.executor()返回EventExecutor接口,该接口继承了EventExecutorGroup接口,EventExecutor的API说明:
    The EventExecutor is a special EventExecutorGroup which comes with some handy methods to see if a Thread is executed in a event loop. Besides this, it also extends the EventExecutorGroup to allow for a generic way to access methods.
    EventExecutor除了scheduleAtFixedRate()方法,还有scheduleWithFixedDelay()方法可供使用,它们均返回ScheduledFuture,ScheduledFuture的API说明:
    The result of an scheduled asynchronous operation.

    server部分源码:

    public class HeartBeatRespHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
            // 返回心跳应答消息
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
                System.out.println("Receive client heart beat message : ---> " + message);
                NettyMessage heartBeat = buildHeatBeat();
                System.out.println("Send heart beat response message to client : ---> " + heartBeat);
                ctx.writeAndFlush(heartBeat); //Pong
            } else
                ctx.fireChannelRead(msg);
        }
    
        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_RESP.value());
            message.setHeader(header);
            return message;
        }
    }
    

    重连机制

    如果链路中断,等待INTERVAL时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后再次发起重连,直到重连成功。
    为了保证服务端有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后就立即重连。
    为了保证句柄资源能够及时释放,无论在什么场景下的重连失败,客户端都必须保证自身的资源被及时释放。

    client部分源码:

    public class MyClient {
    
        private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        //...
    
        public void connect(int port, String host) throws Exception {
    
            // 配置客户端NIO线程组
    
            try {
                //...
                ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(MyConstant.LOCAL_IP, MyConstant.LOCAL_PORT)).sync();
                future.channel().closeFuture().sync();
            } finally {
                // 所有资源释放完成之后,清空资源,再次发起重连操作
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            try {
                                connect(MyConstant.REMOTE_PORT, MyConstant.REMOTE_IP); // 发起重连操作
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    
        public static void main(String[] args) throws Exception {
            new MyClient().connect(MyConstant.REMOTE_PORT, MyConstant.REMOTE_IP);
        }
    
    }
    

    参考:《Netty in Action》、《Netty权威指南》。

  • 相关阅读:
    树分治
    实现自己的shell--MIT xv6 shell
    逆元打表
    Linux fork()函数
    三分:求解凸函数极值
    anti-nim 游戏
    nginx配置文件详解
    nginx之别名、location使用
    shell脚本编程基础知识点
    linux任务计划
  • 原文地址:https://www.cnblogs.com/shuaihanhungry/p/5841899.html
Copyright © 2011-2022 走看看