zoukankan      html  css  js  c++  java
  • netty中的channelPipeline在编程中的作用

    在netty编程中我们绝大多数是要是用nio的,nio相比传统的io更加高效,而nio中核心概念离不开channel,buffer,selector三个重要的对象。

    那么在netty中有一个channelPipeline的概念,表面理解起来是通道的意思,实际它是在数据传输过程中的通道容器

    之所以定义为通道容器就是因为在这里去注册InboundChannelHandleOnboundChannelHandle。而且netty中设计更为巧妙的是,数据流入和流出的处理器可以是不对称的,当然也有数据出入时同时经过相同的channelHandle处理器。

    一张图描绘出netty的数据入栈出栈是可以完全独立的过滤器:

     继续跟踪代码并我们会发现,我们经常编写的代码中总是会出现这样的情况:

       .childHandler(new ChannelInitializer<SocketChannel>() {
                            //设置channel中的多个handler
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline channelPipeline = socketChannel.pipeline();
                                ByteBuf delimiter = Unpooled.copiedBuffer("$_$".getBytes());
                                channelPipeline.addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, delimiter));
                                channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
                                channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
    
                                channelPipeline.addLast("testServerChannelHandler", new TestServerChannelHandler());
                            }
                        });
    public class TestServerChannelHandler extends SimpleChannelInboundHandler<String> {
        public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            Channel channel = channelHandlerContext.channel();
            channelGroup.forEach(currChannel -> {
                if (channel == currChannel) {
                    currChannel.writeAndFlush("[自己]:" + s + TestServer.delimiterStr);
                } else {
                    currChannel.writeAndFlush("[" + currChannel.remoteAddress().toString() + "]:" + s + TestServer.delimiterStr);
                }
            });
        }

     我们会发现在channelRead0中有个ChannelHandlerContext参数,那我们就说下这个Context到底是个什么东西。

    我们在addLast方法添加ChannelHandle的时候会发现,有些是继承了ChannelInboundHandle,而有些是继承了ChannelOutboundHandle,另一些是二者都继承了。那么在addlast方法实际调用内部是这样的:

     @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);//这里我们实质是创建了一个ChannelHandleContext对象后在调用addLast0方法将上下文对象放入了“事件处理链表”。
    
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }

     这里揭示了一个问题,我们要清清楚,并且可以准确的说:channelPipeline是一个容纳channelHandleContext的容器,而容器的内部又持有当前的channel和channelPipeline对象,这些channelHandleContext实际上是组成了一个链表,在数据进行InBound和OutBound

    时候进行相应的处理。我们从addLast0方法可以看出是channelHandleContext的链表。如下

     private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }

          这里还有个重点问题,我们在使用netty进行实际开发的时候,通过channel对象能获取到writeAndFlush方法将数据写入并发回给客户端。我们知道netty有一系列的

    channelHandle控制数据流入和流出,那么在经过OutBoundHandle数据的时候,如果我们调用channel本身的riteAndFlush方法,数据流出会经过所有的ChannelOutBoundHandle

    处理器。而如果调用的writeAndFlush方法来自于ChannelHandleContext则只会调用当前handle接下来的ChannelOutBoundHandle处理器,之前的handle处理则不会经过,那么在

    这里我们可以根据当前情景适当的选择调用对象,来提升性能。过程如下图:

     性能优化点:

    (0)对于关注事件发生的EventLoop,尽量使用一个线程来处理,将更多线程分给实际的业务处理。

    (1)尽量减小channelpiplen的长度,可减少EventLoop在处理单个事件时的阻塞时间。

    (2)减少ChannelHandler的重复创建,可大大减小内存的开销,但是要注意chennelRead的线程安全

    (3)对使用完毕的ByteBuf进行引用计数的清除,可对空间进行回收,而且ByteBuf默认是使用堆外内存创建的,堆外内存创建的数据不会用户态和内核态的数据copy大大减少了时间复杂度。

    (4).childOption(ChannelOption.TCP_NODELAY, true)和.option(ChannelOption.TCP_NODELAY, true)禁用nagle算法,不等待,马上发送数据,这样数据体积比较小。

  • 相关阅读:
    nat
    ICE协议下NAT穿越的实现(STUN&TURN)
    比特币源码分析--端口映射
    IO复用,AIO,BIO,NIO,同步,异步,阻塞和非阻塞 区别(百度)
    从数据的角度带你深入了解IPFS
    IPFS 到底是怎么工作的?
    从数据的角度带你深入了解IPFS
    IPFS
    IPFS中文简介
    bootstrap 表单验证 dem
  • 原文地址:https://www.cnblogs.com/zzq-include/p/12075798.html
Copyright © 2011-2022 走看看