zoukankan      html  css  js  c++  java
  • 04Channel 与 ChannelPipeline

    Channel 与 ChannelPipeline
    相信大家都知道了, 在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系如下:
    head---->handler----->tail
    通过上图我们可以看到, 一个 Channel 包含了一个 ChannelPipeline,
    而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表.
    这个链表的头是 HeadContext, 链表的尾是 TailContext, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler.
    上面的图示给了我们一个对 ChannelPipeline 的直观认识, 但是实际上 Netty 实现的 Channel 是否真的是这样的呢? 我们继续用源码说话.


    在第一章 Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 中, 我们已经知道了一个 Channel 的初始化的基本过程,
    下面我们再回顾一下.
    下面的代码是 AbstractChannel 构造器:
    protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
    }

    在newChannelPipeline函数中把this也就是NioSocketChannel传过去
    protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
    }

    在channelPipeline里面持有一个channel对象
    bstractChannel 有一个 pipeline 字段, 在构造器中会初始化它为 DefaultChannelPipeline的实例. 这里的代码就印证了一点: 每个 Channel 都有一个 ChannelPipeline.
    接着我们跟踪一下 DefaultChannelPipeline 的初始化过程.
    首先进入到 DefaultChannelPipeline 构造器中:

    protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
    }

    在 DefaultChannelPipeline 构造器中, 首先将与之关联的 Channel 保存到字段 channel 中, 然后实例化两个 ChannelHandlerContext,
    一个是 HeadContext 实例 head, 另一个是 TailContext 实例 tail. 接着将 head 和 tail 互相指向, 构成一个双向链表.
    特别注意到, 我们在开始的示意图中, head 和 tail 并没有包含 ChannelHandler,
    这是因为 HeadContext 和 TailContext 继承于 AbstractChannelHandlerContext 的同时也实现了 ChannelHandler 接口了,
    因此它们有 Context 和 Handler 的双重属性.

    headContext和tailContext是ChannelPipeline的内部类
    headContext和tailContext不仅都继承了AbstractChannelHandlerContext还都实现了ChannelHandler接口
    final class HeadContext extends AbstractChannelHandlerContext
    implements ChannelOutboundHandler, ChannelInboundHandler

    ChannelInBoundHandler或者ChannelOutboundHandler就是继承了ChannelHandler接口
    public interface ChannelInboundHandler extends ChannelHandler
    public interface ChannelOutboundHandler extends ChannelHandler

    接着看一下 HeadContext 的构造器:
    HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
    }
    接着调用父类AbstractChannelHandlerContext的构造函数
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
    boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    ChannelInitializer 的添加
    上面一小节中, 我们已经分析了 Channel 的组成, 其中我们了解到,
    最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler),
    但是这个 Pipeline并不能实现什么特殊的功能, 因为我们还没有给它添加自定义的 ChannelHandler.

    通常来说, 我们在初始化 Bootstrap, 会添加我们自定义的 ChannelHandler, 就以我们熟悉的 EchoClient 来举例吧:
    Bootstrap b = new Bootstrap();
    b.group(group)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.TCP_NODELAY, true)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
    p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
    }
    //p.addLast(new LoggingHandler(LogLevel.INFO));
    p.addLast(new EchoClientHandler());
    }
    });

    上面代码的初始化过程, 相信大家都不陌生. 在调用 handler 时, 传入了 ChannelInitializer 对象,
    它提供了一个 initChannel 方法供我们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢? 下面我们就来揭开它的神秘面纱.

    ChannelInitializer 实现了 ChannelHandler,
    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

    public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

    那么它是在什么时候添加到 ChannelPipeline 中的呢?
    进行了一番搜索后, 我们发现它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.
    final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
    channel = channelFactory.newChannel();
    init(channel);
    } catch (Throwable t) {
    if (channel != null) {
    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
    channel.unsafe().closeForcibly();
    }
    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
    if (channel.isRegistered()) {
    channel.close();
    } else {
    channel.unsafe().closeForcibly();
    }
    }

    return regFuture;
    }

    init方法里面传入的是NioSocketChannel
    void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
    setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
    for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    }
    }

    从NioSocketChannel里面拿到pipeline
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    config.handler()返回的就是bootstrap.handler()其实就是 ChannelInitializer

    config是bootstrap的成员变量,而且吧bootstrap传给他做参数
    private final BootstrapConfig config = new BootstrapConfig(this);

    addLast方法要一步一步的跟踪,进入DefaultChannelPipeline
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
    }

    在进入重载函数
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
    throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
    if (h == null) {
    break;
    }
    addLast(executor, null, h);
    }

    return this;
    }

    在进入重载函数

    newContext是AbstractChannelHandlerContext对象,把handler也就是ChannelInitializer作为参数传入进去
    在newCtx的时候把handler也就是ChannelInitializer封装在AbstractChannelHandlerContext里面

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
    checkMultiplicity(handler);

    newCtx = newContext(group, filterName(name, handler), handler);

    addLast0(newCtx);

    // If the registered is false it means that the channel was not registered on an eventloop yet.
    // In this case we add the context to the pipeline and add a task that will call
    // ChannelHandler.handlerAdded(...) once the channel is registered.
    if (!registered) {
    newCtx.setAddPending();
    callHandlerCallbackLater(newCtx, true);
    return this;
    }

    EventExecutor executor = newCtx.executor();
    if (!executor.inEventLoop()) {
    newCtx.setAddPending();
    executor.execute(new Runnable() {
    @Override
    public void run() {
    callHandlerAdded0(newCtx);
    }
    });
    return this;
    }
    }
    callHandlerAdded0(newCtx);
    return this;
    }

    在 addLast0函数中进行添加
    private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
    }

    有朋友可能就有疑惑了, 我明明插入的是一个 ChannelInitializer 实例,
    为什么在 ChannelPipeline 中的双向链表中的元素却是一个 ChannelHandlerContext? 为了解答这个问题, 我们继续在代码中寻找答案吧.
    我们刚才提到, 在 Bootstrap.init 中会调用 p.addLast() 方法, 将 ChannelInitializer 插入到链表末端,
    而 ChannelInitializer是封装在AbstractChannelHandlerContext里面的

    可以清楚地看到, ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口,
    因此这里实例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.
    不就是 inbound 和 outbound 两个字段嘛, 为什么需要这么大费周章地分析一番? 其实这两个字段关系到 pipeline 的事件的流向与分类,
    因此是十分关键的, 不过我在这里先卖个关子, 后面我们再来详细分析这两个字段所起的作用. 在这里, 读者只需要记住,
    ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 即可.

    当创建好 Context 后, 就将这个 Context 插入到 Pipeline 的双向链表中

    ------------------------------------------------------------------

    自定义 ChannelHandler 的添加过程

    我们已经分析了一个 ChannelInitializer 如何插入到 Pipeline 中的, 接下来就来探讨一下 ChannelInitializer 在哪里被调用,
    ChannelInitializer 的作用, 以及我们自定义的 ChannelHandler 是如何插入到 Pipeline 中的.

    首先在 AbstractBootstrap.initAndRegister中, 通过 group().register(channel), 调用 MultithreadEventLoopGroup.register 方法
    MultithreadEventLoopGroup是NioEventloopGroup的父类

    public ChannelFuture register(Channel channel) {
    return next().register(channel);
    }

    next返回的是NioeventLoop,NioEventLoop是SingleThreadEventLoop, regiser方法在SingleThreadEventLoop里面


    在MultithreadEventLoopGroup.register 中, 通过 next() 获取一个可用的 SingleThreadEventLoop, 然后调用它的 register

    在 SingleThreadEventLoop.register 中, 通过 channel.unsafe().register(this, promise) 来获取 channel 的 unsafe() 底层操作对象,
    然后调用它的 register.
    public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
    }

    public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
    }
    promise.channel().unsafe().register(this, promise);
    unsafe()是在NioSocketChannel里面创建出来的NioSocketChannelUnsafe


    register方法是AbstractUnsafe, AbstractUnsafe是NioSockketChannelUnsafe的曾祖父
    参数eventLoop是nioEventLoop,
    promise是上面new DefaultChannelPromise 里面持有channel

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
    throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
    promise.setFailure(new IllegalStateException("registered to an event loop already"));
    return;
    }
    if (!isCompatible(eventLoop)) {
    promise.setFailure(
    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
    register0(promise);
    } else {
    try {
    eventLoop.execute(new Runnable() {
    @Override
    public void run() {
    register0(promise);
    }
    });
    } catch (Throwable t) {
    logger.warn(
    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    AbstractChannel.this, t);
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }
    }

    在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel

    在 AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法

    AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.

    ---------------------------------------------------------------------------------------------------------

    而我们自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中,
    在这个方法中调用了 pipeline.fireChannelRegistered() 方法, 其实现如下:

    private void register0(ChannelPromise promise) {
    try {
    // check if the channel is still open as it could be closed in the mean time when the register
    // call was outside of the eventLoop
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    // user may already fire events through the pipeline in the ChannelFutureListener.
    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (isActive()) {
    if (firstRegistration) {
    pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
    // This channel was registered before and autoRead() is set. This means we need to begin read
    // again so that we process inbound data.
    //
    // See https://github.com/netty/netty/issues/4805
    beginRead();
    }
    }
    } catch (Throwable t) {
    // Close the channel directly to avoid FD leak.
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }


    调用的是DefaultChannelPipeline
    @Override
    public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    next.invokeChannelRegistered();
    } else {
    executor.execute(new Runnable() {
    @Override
    public void run() {
    next.invokeChannelRegistered();
    }
    });
    }
    }

    next.executor()返回的是channel里面的eventLoop,

    @Override
    public EventExecutor executor() {
    if (executor == null) {
    return channel().eventLoop();
    } else {
    return executor;
    }
    }

    public Channel channel() {
    return pipeline.channel();
    }

    eventLoop是在AbstractUnsafe.register函数里面赋值给AbstractChannel。this.eventLoop对象的
    AbstractUnsafe是AbstractChannel的内部类可以调用外部类

    AbstractUnsafe.register(EventLoop eventLoop, ChannelPromise promise)

    还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例,
    并且它没有重写 fireChannelRegistered 方法,
    因此 head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered:

    上面的代码很简单, 就是调用了 head.fireChannelRegistered() 方法而已.

    关于上面代码的 head.fireXXX 的调用形式, 是 Netty 中 Pipeline 传递事件的常用方式, 我们以后会经常看到.

    还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例,
    并且它没有重写 fireChannelRegistered 方法,
    因此 head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered:

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    next.invokeChannelRegistered();
    } else {
    executor.execute(new Runnable() {
    @Override
    public void run() {
    next.invokeChannelRegistered();
    }
    });
    }
    }

    executor返回的就是channel里面的NioEventLoop
    public EventExecutor executor() {
    if (executor == null) {
    return channel().eventLoop();
    } else {
    return executor;
    }
    }

    我们已经强调过了, 每个 ChannelHandler 都与一个 ChannelHandlerContext 关联,
    我们可以通过 ChannelHandlerContext 获取到对应的 ChannelHandler.
    因此很显然了, 这里 handler() 返回的, 其实就是 head 对象,
    并接着调用了 head.channelRegistered 方法.

    private void invokeChannelRegistered() {
    if (invokeHandler()) {
    try {
    ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
    notifyHandlerException(t);
    }
    } else {
    fireChannelRegistered();
    }
    }


    ctx是head对象
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    invokeHandlerAddedIfNeeded();
    ctx.fireChannelRegistered();
    }
    调用head的 fireChannelRegistered 函数
    public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound());
    return this;
    }

    很显然, 这个代码会从 head 开始遍历 Pipeline 的双向链表,
    然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例.
    想起来了没? 我们在前面分析 ChannelInitializer 时, 花了大量的笔墨来分析了 inbound 和 outbound 属性,
    你看现在这里就用上了. 回想一下, ChannelInitializer 实现了 ChannelInboudHandler,
    因此它所对应的 ChannelHandlerContext 的 inbound 属性就是 true,
    因此这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext. 即:

    返回的就是DefaultChannelHandlerContext
    private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
    ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
    }

    接着调用DefaultChannelHandlerContext的 invokeChannelRegistered, handler 返回的就是ChannelInitializer
    private void invokeChannelRegistered() {
    if (invokeHandler()) {
    try {
    ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
    notifyHandlerException(t);
    }
    } else {
    fireChannelRegistered();
    }
    }

    调用 ChannelInitializer 的channelRegistered 函数
    参数 ctx 是 this也就是 DefaultChannelHandlerContext
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    if (initChannel(ctx)) {
    // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
    // miss an event.
    ctx.pipeline().fireChannelRegistered();
    } else {
    // Called initChannel(...) before which is the expected behavior, so just forward the event.
    ctx.fireChannelRegistered();
    }
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
    try {
    initChannel((C) ctx.channel());
    } catch (Throwable cause) {
    // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
    // We do so to prevent multiple calls to initChannel(...).
    exceptionCaught(ctx, cause);
    } finally {
    remove(ctx);
    }
    return true;
    }
    return false;
    }

    initChannel 这个方法我们很熟悉了吧, 它就是我们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:
    Bootstrap b = new Bootstrap();
    b.group(group)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.TCP_NODELAY, true)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
    p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
    }
    //p.addLast(new LoggingHandler(LogLevel.INFO));
    p.addLast(new EchoClientHandler());
    }
    });

    因此当调用了这个方法后, 我们自定义的 ChannelHandler 就插入到 Pipeline 了,

  • 相关阅读:
    Ansi,UTF8,Unicode,ASCII编码的区别
    Delphi 快捷键
    Sql Server2008恢复备份数据库问题
    js图片无缝滚动代码
    SQL Server 2008 清空删除日志文件 130G日志 10秒内变10M .
    JavaScript_循环26个英文字母的方法 .
    iframe跨子域
    sql数据库该名字
    大型网站架构的优化与架构演变(整理) .转自网络
    删除SQL日志语句,经测试8G日志文件都可以删除
  • 原文地址:https://www.cnblogs.com/handsome1013/p/10038337.html
Copyright © 2011-2022 走看看