zoukankan      html  css  js  c++  java
  • Netty中的ChannelPipeline源码分析

    ChannelPipeline在Netty中是用来处理请求的责任链,默认实现是DefaultChannelPipeline,其构造方法如下:

     1 private final Channel channel;
     2 private final ChannelFuture succeededFuture;
     3 private final VoidChannelPromise voidPromise;
     4 final AbstractChannelHandlerContext head;
     5 final AbstractChannelHandlerContext tail;
     6 
     7 protected DefaultChannelPipeline(Channel channel) {
     8     this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
     9     this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
    10     this.voidPromise = new VoidChannelPromise(channel, true);
    11     this.tail = new DefaultChannelPipeline.TailContext(this);
    12     this.head = new DefaultChannelPipeline.HeadContext(this);
    13     this.head.next = this.tail;
    14     this.tail.prev = this.head;
    15 }

    ChannelPipeline和Channel是一一对应关系,一个Channel绑定一条ChannelPipeline责任链
    succeededFuture 和voidPromise用来处理异步操作
    AbstractChannelHandlerContext 是持有请求的上下文对象,其和ChannelHandler是对应关系(在使用Sharable注解的情况下,不同的AbstractChannelHandlerContext 还可以对应同一个ChannelHandler),ChannelPipeline责任链
    处理的就AbstractChannelHandlerContext ,再将最后的AbstractChannelHandlerContext 交给ChannelHandler去做正真的逻辑处理

    AbstractChannelHandlerContext构造方法如下:

     1 private final String name;
     2 private final DefaultChannelPipeline pipeline;
     3 final EventExecutor executor;
     4 private final boolean inbound;
     5 private final boolean outbound;
     6 private final boolean ordered;
     7 volatile AbstractChannelHandlerContext next;
     8 volatile AbstractChannelHandlerContext prev;
     9 
    10 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
    11     this.name = (String)ObjectUtil.checkNotNull(name, "name");
    12     this.pipeline = pipeline;
    13     this.executor = executor;
    14     this.inbound = inbound;
    15     this.outbound = outbound;
    16     this.ordered = executor == null || executor instanceof OrderedEventExecutor;
    17 }

    name是AbstractChannelHandlerContext的名称,pipeline就是上面说的ChannelPipeline;executor是用来进行异步操作的,默认使用的是在前面博客中说过的NioEventLoop  (Netty中NioEventLoopGroup的创建源码分析

    inbound 和outbound 代表两种请求处理方式,对应Netty中的I/O操作,若是inbound则处理Input操作,由ChannelPipeline从head 开始向后遍历链表,并且只处理ChannelInboundHandler类型的AbstractChannelHandlerContext;若是outbound 则处理Output操作,由ChannelPipeline从tail开始向前遍历链表,并且只处理ChannelOutboundHandler类型的AbstractChannelHandlerContext;
    ordered 是判断是否需要提供executor。

    由next和prev成员可以知道,ChannelPipeline维护的是一条AbstractChannelHandlerContext的双向链表
    其头节点head和尾节点tail分别默认初始化了HeadContext和TailContext

    HeadContext的构造:

    1 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
    2     private final Unsafe unsafe;
    3     
    4     HeadContext(DefaultChannelPipeline pipeline) {
    5     super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
    6     this.unsafe = pipeline.channel().unsafe();
    7     this.setAddComplete();
    8     }
    9 }

    其中setAddComplete是由AbstractChannelHandlerContext实现的:

    1 final void setAddComplete() {
    2     int oldState;
    3     do {
    4         oldState = this.handlerState;
    5     } while(oldState != 3 && !HANDLER_STATE_UPDATER.compareAndSet(this, oldState, 2));
    6 
    7 }

    handlerState表示AbstractChannelHandlerContext对应的ChannelHandler的状态,有一下几种:

    1 private static final int ADD_PENDING = 1;
    2 private static final int ADD_COMPLETE = 2;
    3 private static final int REMOVE_COMPLETE = 3;
    4 private static final int INIT = 0;
    5 private volatile int handlerState = 0;    

    handlerState初始化默认是INIT状态。

    HANDLER_STATE_UPDATER是一个原子更新器:

    1 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    所以setAddComplete方法,就是通过CAS操作,将handlerState状态更新为ADD_COMPLETE

    TailContext的构造:

    1 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    2     TailContext(DefaultChannelPipeline pipeline) {
    3         super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
    4         this.setAddComplete();
    5     }
    6 }

    和HeadContext一样,将handlerState状态更新为ADD_COMPLETE


    结合官方给出的ChannelPipeline的图示更容易理解:

     1                                              I/O Request
     2                                         via Channel or
     3                                     ChannelHandlerContext
     4                                                   |
     5 +---------------------------------------------------+---------------+
     6 |                           ChannelPipeline         |               |
     7 |                                                  |/              |
     8 |    +---------------------+            +-----------+----------+    |
     9 |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
    10 |    +----------+----------+            +-----------+----------+    |
    11 |              /|                                  |               |
    12 |               |                                  |/              |
    13 |    +----------+----------+            +-----------+----------+    |
    14 |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
    15 |    +----------+----------+            +-----------+----------+    |
    16 |              /|                                  .               |
    17 |               .                                   .               |
    18 | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
    19 |        [ method call]                       [method call]         |
    20 |               .                                   .               |
    21 |               .                                  |/              |
    22 |    +----------+----------+            +-----------+----------+    |
    23 |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
    24 |    +----------+----------+            +-----------+----------+    |
    25 |              /|                                  |               |
    26 |               |                                  |/              |
    27 |    +----------+----------+            +-----------+----------+    |
    28 |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
    29 |    +----------+----------+            +-----------+----------+    |
    30 |              /|                                  |               |
    31 +---------------+-----------------------------------+---------------+
    32               |                                  |/
    33 +---------------+-----------------------------------+---------------+
    34 |               |                                   |               |
    35 |       [ Socket.read() ]                    [ Socket.write() ]     |
    36 |                                                                   |
    37 |  Netty Internal I/O Threads (Transport Implementation)            |
    38 +-------------------------------------------------------------------+

    下面对一些主要方法分析:
    addFirst方法,有如下几种重载:

     1 public final ChannelPipeline addFirst(ChannelHandler handler) {
     2     return this.addFirst((String)null, (ChannelHandler)handler);
     3 }
     4 
     5 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
     6     return this.addFirst((EventExecutorGroup)null, name, handler);
     7 }
     8 
     9 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
    10     return this.addFirst((EventExecutorGroup)null, (ChannelHandler[])handlers);
    11 }
    12 
    13 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
    14     if (handlers == null) {
    15         throw new NullPointerException("handlers");
    16     } else if (handlers.length != 0 && handlers[0] != null) {
    17         int size;
    18         for(size = 1; size < handlers.length && handlers[size] != null; ++size) {
    19             ;
    20         }
    21 
    22         for(int i = size - 1; i >= 0; --i) {
    23             ChannelHandler h = handlers[i];
    24             this.addFirst(executor, (String)null, h);
    25         }
    26 
    27         return this;
    28     } else {
    29         return this;
    30     }
    31 }
    32 
    33 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    34     final AbstractChannelHandlerContext newCtx;
    35     synchronized(this) {
    36         checkMultiplicity(handler);
    37         name = this.filterName(name, handler);
    38         newCtx = this.newContext(group, name, handler);
    39         this.addFirst0(newCtx);
    40         if (!this.registered) {
    41             newCtx.setAddPending();
    42             this.callHandlerCallbackLater(newCtx, true);
    43             return this;
    44         }
    45 
    46         EventExecutor executor = newCtx.executor();
    47         if (!executor.inEventLoop()) {
    48             newCtx.setAddPending();
    49             executor.execute(new Runnable() {
    50                 public void run() {
    51                     DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
    52                 }
    53             });
    54             return this;
    55         }
    56     }
    57 
    58     this.callHandlerAdded0(newCtx);
    59     return this;
    60 }

    前面几种都是间接调用的第四种没什么好说的,直接看第四种addFirst
    首先调用checkMultiplicity,检查ChannelHandlerAdapter在不共享的情况下是否重复:

     1 private static void checkMultiplicity(ChannelHandler handler) {
     2     if (handler instanceof ChannelHandlerAdapter) {
     3         ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;
     4         if (!h.isSharable() && h.added) {
     5             throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
     6         }
     7 
     8         h.added = true;
     9     }
    10 
    11 }

    isSharable方法:

     1 public boolean isSharable() {
     2     Class<?> clazz = this.getClass();
     3     Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
     4     Boolean sharable = (Boolean)cache.get(clazz);
     5     if (sharable == null) {
     6         sharable = clazz.isAnnotationPresent(Sharable.class);
     7         cache.put(clazz, sharable);
     8     }
     9 
    10     return sharable;
    11 }

    首先尝试从当前线程的InternalThreadLocalMap中获取handlerSharableCache,(InternalThreadLocalMap是在Netty中使用高效的FastThreadLocal替代JDK的ThreadLocal使用的 Netty中FastThreadLocal源码分析
    InternalThreadLocalMap的handlerSharableCache方法:

    1 public Map<Class<?>, Boolean> handlerSharableCache() {
    2     Map<Class<?>, Boolean> cache = this.handlerSharableCache;
    3     if (cache == null) {
    4         this.handlerSharableCache = (Map)(cache = new WeakHashMap(4));
    5     }
    6 
    7     return (Map)cache;
    8 }

    当当前线程的InternalThreadLocalMap中没有handlerSharableCache时,直接创建一个大小为4的WeakHashMap弱引用Map;

    根据clazz从map中get,若是没有,需要检测当前clazz是否有Sharable注解,添加了Sharable注解的ChannelHandlerAdapter可以在不同Channel中共享使用一个单例,前提是确保线程安全;
    之后会将该clazz以及是否实现Sharable注解的情况添加在cache缓存中;
    其中ChannelHandler的added是用来标识是否添加过;

    回到addFirst方法:
    checkMultiplicity成功结束后,调用filterName方法,给当前要产生的AbstractChannelHandlerContext对象产生一个名称,
    然后调用newContext方法,产生AbstractChannelHandlerContext对象:

    1 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    2     return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
    3 }

    这里实际上产生了一个DefaultChannelHandlerContext对象:

     1 final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
     2     private final ChannelHandler handler;
     3 
     4     DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
     5         super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
     6         if (handler == null) {
     7             throw new NullPointerException("handler");
     8         } else {
     9             this.handler = handler;
    10         }
    11     }
    12 
    13     public ChannelHandler handler() {
    14         return this.handler;
    15     }
    16 
    17     private static boolean isInbound(ChannelHandler handler) {
    18         return handler instanceof ChannelInboundHandler;
    19     }
    20 
    21     private static boolean isOutbound(ChannelHandler handler) {
    22         return handler instanceof ChannelOutboundHandler;
    23     }
    24 }

    可以看到DefaultChannelHandlerContext 仅仅是将AbstractChannelHandlerContext和ChannelHandler封装了

    在产生了DefaultChannelHandlerContext 对象后,调用addFirst0方法:

    1 private void addFirst0(AbstractChannelHandlerContext newCtx) {
    2     AbstractChannelHandlerContext nextCtx = this.head.next;
    3     newCtx.prev = this.head;
    4     newCtx.next = nextCtx;
    5     this.head.next = newCtx;
    6     nextCtx.prev = newCtx;
    7 }

    这里就是一个简单的双向链表的操作,将newCtx节点插入到了head后面

    然后判断registered成员的状态:

    1 private boolean registered;

    在初始化时是false

    registered若是false,首先调用AbstractChannelHandlerContext的setAddPending方法:

    1 final void setAddPending() {
    2    boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, 0, 1);
    3 
    4     assert updated;
    5 
    6 }

    和前面说过的setAddComplete方法同理,通过CAS操作,将handlerState状态设置为ADD_PENDING
    接着调用callHandlerCallbackLater方法:

     1 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
     2     assert !this.registered;
     3 
     4     DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx);
     5     DefaultChannelPipeline.PendingHandlerCallback pending = this.pendingHandlerCallbackHead;
     6     if (pending == null) {
     7         this.pendingHandlerCallbackHead = (DefaultChannelPipeline.PendingHandlerCallback)task;
     8     } else {
     9         while(pending.next != null) {
    10             pending = pending.next;
    11         }
    12 
    13         pending.next = (DefaultChannelPipeline.PendingHandlerCallback)task;
    14     }
    15 
    16 }

    首先断言判断registered可能存在的多线程改变,然后根据added判断产生何种类型的PendingHandlerCallback
    PendingHandlerCallback是用来处理ChannelHandler的两种回调,定义如下:

     1 private abstract static class PendingHandlerCallback implements Runnable {
     2     final AbstractChannelHandlerContext ctx;
     3     DefaultChannelPipeline.PendingHandlerCallback next;
     4 
     5     PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
     6         this.ctx = ctx;
     7     }
     8 
     9     abstract void execute();
    10 }


    PendingHandlerAddedTask定义如下:

     1 private final class PendingHandlerAddedTask extends DefaultChannelPipeline.PendingHandlerCallback {
     2     PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
     3         super(ctx);
     4     }
     5 
     6     public void run() {
     7         DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
     8     }
     9 
    10     void execute() {
    11         EventExecutor executor = this.ctx.executor();
    12         if (executor.inEventLoop()) {
    13             DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
    14         } else {
    15             try {
    16                 executor.execute(this);
    17             } catch (RejectedExecutionException var3) {
    18                 if (DefaultChannelPipeline.logger.isWarnEnabled()) {
    19                     DefaultChannelPipeline.logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
    20                 }
    21 
    22                 DefaultChannelPipeline.remove0(this.ctx);
    23                 this.ctx.setRemoved();
    24             }
    25         }
    26 
    27     }
    28 }

    除去异常处理,无论是在execute方法还是在run方法中,主要核心是异步执行callHandlerAdded0方法:

     1 private void callHandlerAdded0(AbstractChannelHandlerContext ctx) {
     2     try {
     3         ctx.setAddComplete();
     4         ctx.handler().handlerAdded(ctx);
     5     } catch (Throwable var10) {
     6         boolean removed = false;
     7 
     8         try {
     9             remove0(ctx);
    10 
    11             try {
    12                 ctx.handler().handlerRemoved(ctx);
    13             } finally {
    14                 ctx.setRemoved();
    15             }
    16 
    17             removed = true;
    18         } catch (Throwable var9) {
    19             if (logger.isWarnEnabled()) {
    20                 logger.warn("Failed to remove a handler: " + ctx.name(), var9);
    21             }
    22         }
    23 
    24         if (removed) {
    25             this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", var10));
    26         } else {
    27             this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", var10));
    28         }
    29     }
    30 
    31 }

    除去异常处理,主要核心就两行代码,首先通过setAddComplete方法,设置handlerState状态为ADD_COMPLETE,然后回调ChannelHandler的handlerAdded方法,这个handlerAdded方法就很熟悉了,在使用Netty处理业务逻辑时,会覆盖这个方法。

    PendingHandlerRemovedTask定义如下:

     1 private final class PendingHandlerRemovedTask extends DefaultChannelPipeline.PendingHandlerCallback {
     2     PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
     3         super(ctx);
     4     }
     5 
     6     public void run() {
     7         DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
     8     }
     9 
    10     void execute() {
    11         EventExecutor executor = this.ctx.executor();
    12         if (executor.inEventLoop()) {
    13             DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
    14         } else {
    15             try {
    16                 executor.execute(this);
    17             } catch (RejectedExecutionException var3) {
    18                 if (DefaultChannelPipeline.logger.isWarnEnabled()) {
    19                     DefaultChannelPipeline.logger.warn("Can't invoke handlerRemoved() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
    20                 }
    21 
    22                 this.ctx.setRemoved();
    23             }
    24         }
    25 
    26     }
    27 }

    和PendingHandlerAddedTask一样,主要还是异步调用callHandlerRemoved0方法:

     1 private void callHandlerRemoved0(AbstractChannelHandlerContext ctx) {
     2     try {
     3         try {
     4             ctx.handler().handlerRemoved(ctx);
     5         } finally {
     6             ctx.setRemoved();
     7         }
     8     } catch (Throwable var6) {
     9         this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", var6));
    10     }
    11 
    12 }

    首先直接回调ChannelHandler的handlerRemoved方法,然后通过setRemoved方法将handlerState状态设置为REMOVE_COMPLETE

    回到callHandlerCallbackLater,其中成员pendingHandlerCallbackHead定义:

    1 private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

    结合PendingHandlerCallback 可知,这个pendingHandlerCallbackHead是 DefaultChannelPipeline存储的一条PendingHandlerCallback单链表,用来处理ChannelHandler的handlerAdded和handlerRemoved的回调,在add的这些方法里调用callHandlerCallbackLater时,added参数都为true,所以add的ChannelHandler只向pendingHandlerCallbackHead添加了handlerAdded的回调。

    回到addFirst方法,若是registered为true,先获取EventExecutor,判断是否处于轮询中,若不是,则需要开启轮询线程直接异步执行callHandlerAdded0方法,若处于轮询,由于ChannelPipeline的调用是发生在轮询时的,所以还是直接异步执行callHandlerAdded0方法。

    addFirst方法到此结束,再来看addLast方法,同样有好几种重载:

     1 public final ChannelPipeline addLast(ChannelHandler handler) {
     2     return this.addLast((String)null, (ChannelHandler)handler);
     3 }
     4 
     5 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
     6     return this.addLast((EventExecutorGroup)null, name, handler);
     7 }
     8 
     9 public final ChannelPipeline addLast(ChannelHandler... handlers) {
    10     return this.addLast((EventExecutorGroup)null, (ChannelHandler[])handlers);
    11 }
    12 
    13 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    14     if (handlers == null) {
    15         throw new NullPointerException("handlers");
    16     } else {
    17         ChannelHandler[] var3 = handlers;
    18         int var4 = handlers.length;
    19 
    20         for(int var5 = 0; var5 < var4; ++var5) {
    21             ChannelHandler h = var3[var5];
    22             if (h == null) {
    23                 break;
    24             }
    25 
    26             this.addLast(executor, (String)null, h);
    27         }
    28 
    29         return this;
    30     }
    31 }
    32 
    33 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    34     final AbstractChannelHandlerContext newCtx;
    35     synchronized(this) {
    36         checkMultiplicity(handler);
    37         newCtx = this.newContext(group, this.filterName(name, handler), handler);
    38         this.addLast0(newCtx);
    39         if (!this.registered) {
    40             newCtx.setAddPending();
    41             this.callHandlerCallbackLater(newCtx, true);
    42             return this;
    43         }
    44 
    45         EventExecutor executor = newCtx.executor();
    46         if (!executor.inEventLoop()) {
    47             newCtx.setAddPending();
    48             executor.execute(new Runnable() {
    49                 public void run() {
    50                     DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
    51                 }
    52             });
    53             return this;
    54         }
    55     }
    56 
    57     this.callHandlerAdded0(newCtx);
    58     return this;
    59 }

    还是间接调用最后一种:
    对比addFirst来看,只有addLast0不一样:

    1 private void addLast0(AbstractChannelHandlerContext newCtx) {
    2     AbstractChannelHandlerContext prev = this.tail.prev;
    3     newCtx.prev = prev;
    4     newCtx.next = this.tail;
    5     prev.next = newCtx;
    6     this.tail.prev = newCtx;
    7 }

    还是非常简单的双向链表基本操作,只不过这次,是将AbstractChannelHandlerContext插入到了tail之前
    还有两个,addBefore和addAfter方法,和上述方法类似,就不再累赘


    接下来看看ChannelPipeline是如何完成请求的传递的:
    invokeHandlerAddedIfNeeded方法:

    1 final void invokeHandlerAddedIfNeeded() {
    2     assert this.channel.eventLoop().inEventLoop();
    3 
    4     if (this.firstRegistration) {
    5         this.firstRegistration = false;
    6         this.callHandlerAddedForAllHandlers();
    7     }
    8 
    9 }

    断言判断是否处于轮询线程(ChannelPipeline处理请求都是在轮询线程中,都需要异步处理)
    其中firstRegistration成员在DefaultChannelPipeline初始化时为true:

    1 private boolean firstRegistration = true;

    此时设置为false,表示第一次调用,以后都不再调用后面的callHandlerAddedForAllHandlers:

     1 private void callHandlerAddedForAllHandlers() {
     2     DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
     3     synchronized(this) {
     4         assert !this.registered;
     5 
     6         this.registered = true;
     7         pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
     8         this.pendingHandlerCallbackHead = null;
     9     }
    10 
    11     for(DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead; task != null; task = task.next) {
    12         task.execute();
    13     }
    14 
    15 }

    刚才说过registered初始是false,在这里判断符合,之后就令其为true,然后获取处理ChannelHandler的回调链表pendingHandlerCallbackHead,并且将pendingHandlerCallbackHead置为null
    然后遍历这个单链表,处理ChannelHandler的handlerAdded和handlerRemoved的回调

    fireChannelRegistered方法,当Channel完成了向Selector的注册后,会由channel的Unsafe进行回调,异步处理:

    1 public final ChannelPipeline fireChannelRegistered() {
    2     AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
    3     return this;
    4 }

    实际上的处理由AbstractChannelHandlerContext的静态方法invokeChannelRegistered完成,这里传递的参数head就是DefaultChannelPipeline初始化时创建的HeadContext:

     1 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
     2     EventExecutor executor = next.executor();
     3     if (executor.inEventLoop()) {
     4         next.invokeChannelRegistered();
     5     } else {
     6         executor.execute(new Runnable() {
     7             public void run() {
     8                 next.invokeChannelRegistered();
     9             }
    10         });
    11     }
    12 
    13 }

    可以看到实际上是异步执行head对象的invokeChannelRegistered方法:

     1 private void invokeChannelRegistered() {
     2     if (this.invokeHandler()) {
     3         try {
     4             ((ChannelInboundHandler)this.handler()).channelRegistered(this);
     5         } catch (Throwable var2) {
     6             this.notifyHandlerException(var2);
     7         }
     8     } else {
     9         this.fireChannelRegistered();
    10     }
    11 
    12 }


    其中invokeHandler是用来判断当前的handlerState状态:

    1 private boolean invokeHandler() {
    2     int handlerState = this.handlerState;
    3     return handlerState == 2 || !this.ordered && handlerState == 1;
    4 }

    若是当前handlerState状态为ADD_COMPLETE,或者不需要提供EventExecutor并且状态为ADD_PENDING时返回true,否则返回false
    在成立的情况下,调用ChannelInboundHandler的channelRegistered方法,由于当前是head,所以由HeadContext实现了:

    1 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    2     DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
    3     ctx.fireChannelRegistered();
    4 }

    首先调用invokeHandlerAddedIfNeeded,处理ChannelHandler的handlerAdded和handlerRemoved的回调
    然后调用ctx的fireChannelRegistered方法:

    1 public ChannelHandlerContext fireChannelRegistered() {
    2     invokeChannelRegistered(this.findContextInbound());
    3     return this;
    4 }

    findContextInbound方法,用来找出下一个ChannelInboundInvoker:

    1 private AbstractChannelHandlerContext findContextInbound() {
    2     AbstractChannelHandlerContext ctx = this;
    3 
    4     do {
    5         ctx = ctx.next;
    6     } while(!ctx.inbound);
    7 
    8     return ctx;
    9 }

    从当前节点向后遍历,inbound之前说过,该方法就是找到下一个ChannelInboundInvoker的类型的AbstractChannelHandlerContext,然后调用静态方法invokeChannelRegistered,重复上述操作,若是在ChannelInboundHandler中没有重写channelRegistered方法,会一直执直到完所有ChannelHandler的channelRegistered方法。
    ChannelInboundHandlerAdapter中的默认channelRegistered方法:

    1 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    2     ctx.fireChannelRegistered();
    3 }

    比HeadContext中的实现还简单,直接调用fireChannelRegistered向后传递


    fireChannelRead方法,是在Selector轮循到读事件就绪,会由channel的Unsafe进行回调,异步处理:

    1 public final ChannelPipeline fireChannelRead(Object msg) {
    2     AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
    3     return this;
    4 }

    还是从head开始调用AbstractChannelHandlerContext的静态方法invokeChannelRead:

     1 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
     2     final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
     3     EventExecutor executor = next.executor();
     4     if (executor.inEventLoop()) {
     5         next.invokeChannelRead(m);
     6     } else {
     7         executor.execute(new Runnable() {
     8             public void run() {
     9                 next.invokeChannelRead(m);
    10             }
    11         });
    12     }
    13 
    14 }

    和上面一个逻辑异步调用AbstractChannelHandlerContext对象的invokeChannelRead方法:

     1 private void invokeChannelRead(Object msg) {
     2     if (this.invokeHandler()) {
     3         try {
     4             ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
     5         } catch (Throwable var3) {
     6             this.notifyHandlerException(var3);
     7         }
     8     } else {
     9         this.fireChannelRead(msg);
    10     }
    11 
    12 }

    这里也和上面一样,调用了HeadContext的channelRead方法:

    1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    2     ctx.fireChannelRead(msg);
    3 }

    这里直接不处理,调用ChannelHandlerContext 的fireChannelRead方法:

    1 public ChannelHandlerContext fireChannelRead(Object msg) {
    2     invokeChannelRead(this.findContextInbound(), msg);
    3     return this;
    4 }

    和之前注册一样,选择下一个ChannelInboundHandler,重复执行上述操作。


    再来看到writeAndFlush方法,和上面的就不太一样,这个发生在轮询前,用户通过channel来间接调用,在AbstractChannel中实现:

    1 public ChannelFuture writeAndFlush(Object msg) {
    2     return this.pipeline.writeAndFlush(msg);
    3 }

    实际上直接调用了DefaultChannelPipeline的writeAndFlush方法:

    1 public final ChannelFuture writeAndFlush(Object msg) {
    2     return this.tail.writeAndFlush(msg);
    3 }

    这里又有些不一样了,调用了tail的writeAndFlush方法,即TailContext的writeAndFlush,在AbstractChannelHandlerContext中实现:

    1 public ChannelFuture writeAndFlush(Object msg) {
    2     return this.writeAndFlush(msg, this.newPromise());
    3 }

    newPromise产生了一个ChannelPromise,用来处理异步事件的;实际上调用了writeAndFlush的重载:

     1 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
     2     if (msg == null) {
     3         throw new NullPointerException("msg");
     4     } else if (this.isNotValidPromise(promise, true)) {
     5         ReferenceCountUtil.release(msg);
     6         return promise;
     7     } else {
     8         this.write(msg, true, promise);
     9         return promise;
    10     }
    11 }

    继续调用write方法:

     1 private void write(Object msg, boolean flush, ChannelPromise promise) {
     2     AbstractChannelHandlerContext next = this.findContextOutbound();
     3     Object m = this.pipeline.touch(msg, next);
     4     EventExecutor executor = next.executor();
     5     if (executor.inEventLoop()) {
     6         if (flush) {
     7             next.invokeWriteAndFlush(m, promise);
     8         } else {
     9             next.invokeWrite(m, promise);
    10         }
    11     } else {
    12         Object task;
    13         if (flush) {
    14             task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
    15         } else {
    16             task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
    17         }
    18 
    19         safeExecute(executor, (Runnable)task, promise, m);
    20     }
    21 
    22 }

    还是很相似,只不过先调用findContextOutbound找到下一个ChannelOutboundInvoker类型的ChannelHandlerContext,而且这里是从尾部往前遍历的,这样来看前面所给的图是没有任何问题的
    在找到ChannelOutboundInvoker后,调用invokeWriteAndFlush或者invokeWrite方法:
    invokeWriteAndFlush方法:

     1 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
     2     if (this.invokeHandler()) {
     3         this.invokeWrite0(msg, promise);
     4         this.invokeFlush0();
     5     } else {
     6         this.writeAndFlush(msg, promise);
     7     }
     8 
     9 }
    10 
    11 private void invokeWrite0(Object msg, ChannelPromise promise) {
    12     try {
    13         ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
    14     } catch (Throwable var4) {
    15         notifyOutboundHandlerException(var4, promise);
    16     }
    17 
    18 }
    19 
    20 private void invokeFlush0() {
    21     try {
    22         ((ChannelOutboundHandler)this.handler()).flush(this);
    23     } catch (Throwable var2) {
    24         this.notifyHandlerException(var2);
    25     }
    26 
    27 }

    可以看到invokeWriteAndFlush回调了ChannelOutboundHandler的write和flush方法

    最终会调用HeadContext的write和flush方法:

    1 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    2     this.unsafe.write(msg, promise);
    3 }
    4 
    5 public void flush(ChannelHandlerContext ctx) throws Exception {
    6     this.unsafe.flush();
    7 }

    可以看到调用了unsafe的write和flush方法,向unsafe缓冲区写入了消息,当Selector轮询到写事件就绪时,就会通过unsafe将刚才写入的内容交由JDK的SocketChannel完成最终的write操作。


    ChannelPipeline的分析到此全部结束。

  • 相关阅读:
    jquery获取html元素的绝对位置和相对位置的方法
    Python基础之集合
    Python基础之集合
    PAT 甲级 1009 Product of Polynomials (25)(25 分)(坑比较多,a可能很大,a也有可能是负数,回头再看看)...
    PAT 甲级 1009 Product of Polynomials (25)(25 分)(坑比较多,a可能很大,a也有可能是负数,回头再看看)
    Python将数据写入excel或者txt,读入csv格式或xls文件,写入csv(写一行空一行解决办法)...
    Python将数据写入excel或者txt,读入csv格式或xls文件,写入csv(写一行空一行解决办法)
    Django将.csv文件(excel文件)显示到网页上
    cuDNN下载地址和指南
    cuDNN下载地址和指南
  • 原文地址:https://www.cnblogs.com/a526583280/p/10961855.html
Copyright © 2011-2022 走看看