zoukankan      html  css  js  c++  java
  • Netty 源码解读(二)-ChannelPipeline、ChannelHandler、ChannelHandlerContext

    1. ChannelPipeline、ChannelHandler、ChannelHandlerContext 的关系

    1. 每创建一个Socket 就会分配一个全新的ChannelPipeline (简称pipeline)

    2. 每一个 ChannelPipeline 内部包含多个 ChannelHandlerContext (简称Context)

    3. Context一起组成了一个双向链表,这些Context 用于封装我们调用addLast 时添加的Channelhandler(以下简称Handler)

    也就是说ChannelSocket 和 ChannelPipeline 是一对一的关系,而pipeline内部的多个Context 行成了链表,Context 只是对Handler 的封装。

    当一个请求进来时会进入socket 对应的pipeline,并经过pipeline 所有的handler。 可以理解为过滤器模式。

    2. 设计

    1. ChannelPipeline 设计

    该接口继承了ChannelInboundInvoker、 ChannelOutboundInvoker、 Iterable 接口。 标识可以调用数据出站的方法和入站的方法, 同时也能遍历内部的链表。

    public interface ChannelPipeline
            extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    处理过程如下:

    (1)入站事件由入站处理程序自下而上的方向处理。入站处理程序通常处理由地步的IO线程生成入站数据,入站数据通常从SocketChannel#read(ByteBuffer) 获取

    (2) 通常一个pipeline 有多个handler。例如一个典型的服务器在每个通常的管道中都会有一下处理程序:

    协议解码器-将二进制数据转换为Java 对象;

    协议编码器-将java 对象转换为二进制数据

    业务逻辑处理程序-执行实际业务逻辑

    (3) 业务程序不能将线程阻塞,会影响IO的速度,进而影响整个Netty 程序的性能。如果业务程序很快,可以放在IO线程中,反之就需要异步执行。 或者在添加handler的时候添加一个线程池。

    2. ChannelHandler 作用以及设计

    (1) 源码

    public interface ChannelHandler {
    
        // 当把channelHandler 添加到pipeline 时被调用
        void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    
        // 当从pipeline 移除时调用
        void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    
        // 处理发生异常时调用
        @Deprecated
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    
        @Inherited
        @Documented
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
        @interface Sharable {
            // no value
        }
    }

    (2) 作用: 作用是处理IO事件或拦截IO事件,并将其转发给下一个handler。 handler 处理事件是分入站和出站的(入站是说读取数据到程序处理的过程,出站是说写出数据到调用内核write方法写出去数据的过程)。两个方向的操作都是不同的,因此,netty 定义了两个子接口继承ChannelHandler。

    入站:ChannelInboundHandler

    出站: ChannelOutboundHandler

    入站出站都可以处理的handler:ChannelDuplexHandler

    public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {

    3. ChannelHandlerContext 作用

    ChannelHandlerContext 同时继承了 ChannelInboundInvoker, ChannelOutboundInvoker。ChannelHandlerContext 也 定义了一些自己的方法。这些方法能够获取Context 上下文环境的对象,比如channel、executor、handler、pipeline, 内存分配器,关联的handler 是否被删除等信息。Context 就是包装了handler相关的一切,以方便Contex 可以在pipeline 方便的操作handler。

    public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    如下自身的方法:

    3. 创建过程

    1. 任何一个SocketChannel 创建的同时都会创建一个pipeline

    io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) 源码如下:

        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }

    io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline:

        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }

    可以看到链表有两个伪节点(头和尾)。

    1》头节点:

    io.netty.channel.DefaultChannelPipeline.TailContext (入站处理handler)

        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
            TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, TAIL_NAME, true, false);
                setAddComplete();
            }
    。。。
    }

    2》伪节点:

    HeadContext 是一个入站和出站都兼顾的handler

        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, false, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
    。。。
    }

      创建是在客户端建立连接时:

     2. 当用户或系统内部调用pipeline的addXX 方法添加handler 时,都会创建一个包装这handler 的Context

     io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)

        @Override
        public final ChannelPipeline addLast(ChannelHandler... handlers) {
            return addLast(null, handlers);
        }
    
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
            if (handlers == null) {
                throw new NullPointerException("handlers");
            }
    
            for (ChannelHandler h: handlers) {
                if (h == null) {
                    break;
                }
                addLast(executor, null, h);
            }
    
            return this;
        }
    
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }
    
        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }

    解释:

    1. io.netty.channel.DefaultChannelPipeline#checkMultiplicity 检查该实例是否是共享的,如果不是并且已经被别的pipeline 使用了,则抛出异常

    2. 调用io.netty.channel.DefaultChannelPipeline#newContext 创建一个DefaultChannelHandlerContext

        private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
        }

    3. 调用io.netty.channel.DefaultChannelPipeline#addLast0 添加到链条尾部

    4. 做一些其他处理

    4. 调用过程

    读的时候从head开始, 写的时候从tail 开始。当一个请求进来时,会先调用pipeline 的相关方法,如果是入站事件由fireXXX开始,表示开始管道的流动,让后面的handler 继续处理。

    调用过程可以用下图标识:

    1. 入站读取数据追踪

    io.netty.channel.DefaultChannelPipeline#fireChannelRead代码如下:

        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }

    读取数据时调用过程如下:

     1》 继续调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }

    2》 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

        private void invokeChannelRead(Object msg) {
            if (this.invokeHandler()) {
                try {
                    ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
                } catch (Throwable var3) {
                    this.invokeExceptionCaught(var3);
                }
            } else {
                this.fireChannelRead(msg);
            }
    
        }

      这时候会调用handler的channelRead 方法。也就是具体的handler 的方法。

    3》io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead 方法如下: 相当于没做任何逻辑处理,直接调用下一个处理器处理

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg);
            }

    4》io.netty.channel.AbstractChannelHandlerContext#fireChannelRead

        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }

    io.netty.channel.AbstractChannelHandlerContext#findContextInbound 如下:(可以看到入站是找inbound属性为true的context,然后继续进行调用)

        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }

    5》继续调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)方法(同1》 一样)

      也就是如果希望pipeline 中的context 继续处理,需要在handler中继续调用 ctx.fireXXX 方法,比如io.netty.handler.logging.LoggingHandler#channelRead

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (logger.isEnabled(internalLevel)) {
                logger.log(internalLevel, format(ctx, "READ", msg));
            }
            ctx.fireChannelRead(msg);
        }

    2. 出站数据跟踪

    1》io.netty.channel.DefaultChannelPipeline#write(java.lang.Object, io.netty.channel.ChannelPromise)

        public final ChannelFuture write(Object msg, ChannelPromise promise) {
            return tail.write(msg, promise);
        }

    2》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

        public ChannelFuture write(final Object msg, final ChannelPromise promise) {
            write(msg, false, promise);
    
            return promise;
        }

    3》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)

        private void write(Object msg, boolean flush, ChannelPromise promise) {
            ObjectUtil.checkNotNull(msg, "msg");
            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
    
            final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                    (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                final AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                if (!safeExecute(executor, task, promise, m)) {
                    // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                    // and put it back in the Recycler for re-use later.
                    //
                    // See https://github.com/netty/netty/issues/8343.
                    task.cancel();
                }
            }
        }

    4》io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush

        private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
            if (invokeHandler()) {
                invokeWrite0(msg, promise);
                invokeFlush0();
            } else {
                writeAndFlush(msg, promise);
            }
        }

    5》io.netty.channel.AbstractChannelHandlerContext#invokeWrite0  这里调用handler的write 方法

        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }

    6》handler 处理完成如果需要继续处理调用ctx.write(msg, promise);    会重新调用到 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    【vijos】【优先队列】合并果子
    【vijos】【二叉树】FBI树
    【NOIp复习】数据结构之栈、队列和二叉树
    【Leetcode】53. Maximum Subarray
    PHP json_encode转换空数组为对象
    206. Reverse Linked List
    151. Reverse Words in a String
    74. Search a 2D Matrix
    557. Reverse Words in a String III
    【Leetcode】79. Word Search
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15091141.html
Copyright © 2011-2022 走看看