zoukankan      html  css  js  c++  java
  • Netty之旅三:Netty服务端启动源码分析,一梭子带走!

    Netty服务端启动流程源码分析

    前记

    哈喽,自从上篇《Netty之旅二:口口相传的高性能Netty到底是什么?》后,迟迟两周才开启今天的Netty源码系列。源码分析的第一篇文章,下一篇我会分享客户端的启动过程源码分析。通过源码的阅读,我们将会知道,Netty 服务端启动的调用链是非常长的,同时肯定也会发现一些新的问题,随着我们源码阅读的不断深入,相信这些问题我们也会一一攻破。

    废话不多说,直接上号!

    一、从EchoServer示例入手

    netty-example:EchoServer.png

    示例从哪里来?任何开源框架都会有自己的示例代码,Netty源码也不例外,如模块netty-example中就包括了最常见的EchoServer示例,下面通过这个示例进入服务端启动流程篇章。

    public final class EchoServer {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }
    
            // 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            // 创建 EchoServerHandler 对象
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                // 2. 声明服务端启动引导器,并设置相关属性
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     }
                 });
    
                // 3. 绑定端口即启动服务端,并同步等待
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // 4. 监听服务端关闭,并阻塞等待
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // 5. 优雅地关闭两个EventLoopGroup线程池 
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    1. [代码行18、19]声明Main-Sub Reactor模式线程池:EventLoopGroup

    创建两个 EventLoopGroup 对象。其中,bossGroup用于服务端接受客户端的连接,workerGroup用于进行客户端的 SocketChannel 的数据读写。

    (关于EventLoopGroup不是本文重点所以在后续文章中进行分析)

    1. [代码行23-39]声明服务端启动引导器,并设置相关属性

    AbstractBootstrap是一个帮助类,通过方法链(method chaining)的方式,提供了一个简单易用的方式来配置启动一个Channelio.netty.bootstrap.ServerBootstrap ,实现 AbstractBootstrap 抽象类,用于 Server 的启动器实现类。io.netty.bootstrap.Bootstrap ,实现 AbstractBootstrap 抽象类,用于 Client 的启动器实现类。如下类图所示:

    ![AbstractBootstrap类继承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d877706356d4427a9afd4b13d7177142~tplv-k3u1fbpfcp-zoom-1.image)
    

    (EchoServer示例代码中,我们看到 ServerBootstrap groupchanneloptionchildHandler 等属性链式设置都放到关于AbstractBootstrap体系代码中详细介绍。)

    1. [代码行43]绑定端口即启动服务端,并同步等待

    先调用 #bind(int port) 方法,绑定端口,后调用 ChannelFuture#sync() 方法,阻塞等待成功。对于bind操作就是本文要详细介绍的"服务端启动流程"。

    1. [代码行47]监听服务端关闭,并阻塞等待

    先调用 #closeFuture() 方法,监听服务器关闭,后调用 ChannelFuture#sync() 方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel的监听关闭。

    1. [代码行51、52]优雅地关闭两个EventLoopGroup线程池

    finally代码块中执行说明服务端将最终关闭,所以调用 EventLoopGroup#shutdownGracefully() 方法,分别关闭两个EventLoopGroup对象,终止所有线程。

    二、服务启动过程

    在服务启动过程的源码分析之前,这里回顾一下我们在通过JDK NIO编程在服务端启动初始的代码:

     serverSocketChannel = ServerSocketChannel.open();
     serverSocketChannel.configureBlocking(false);
     serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
     selector = Selector.open();
     serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    

    这5行代码标示一个最为熟悉的过程:

    • 打开serverSocketChannel
    • 配置非阻塞模式
    • channelsocket绑定监听端口
    • 创建Selector
    • serverSocketChannel注册到 selector

    后面等分析完Netty的启动过程后,会对这些步骤有一个新的认识。在EchoServer示例中,进入 #bind(int port) 方法,AbstractBootstrap#bind()其实有多个方法,方便不同地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress) 方法,代码如下:

    private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
    }
    
    • [代码行2] :调用 #initAndRegister() 方法,初始化并注册一个 Channel 对象。因为注册是异步的过程,所以返回一个 ChannelFuture 对象。详细解析,见 「initAndRegister()」。
    • [代码行4-6]]:若发生异常,直接进行返回。
    • [代码行9-34]:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的情况。
      • 核心在[第 11 、29行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) 方法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey 中。
      • 如果异步注册对应的 ChanelFuture 未完成,则调用 ChannelFuture#addListener(ChannelFutureListener) 方法,添加监听器,在注册完成后,进行回调执行 #doBind0(...) 方法的逻辑。

    通过doBind方法可以知道服务端启动流程大致如下几个步骤:

    服务端启动流程.png

    1. 创建Channel

    创建服务端channel.png

    #doBind(final SocketAddress localAddress)进入到initAndRegister():

     final ChannelFuture initAndRegister() {
         Channel channel = null;
         try {
             channel = channelFactory.newChannel();
             init(channel);
         } catch (Throwable t) {
             if (channel != null) {
                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                 channel.unsafe().closeForcibly();
                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
             }
             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
         }
    
         ChannelFuture regFuture = config().group().register(channel);
         if (regFuture.cause() != null) {
             if (channel.isRegistered()) {
                 channel.close();
             } else {
                 channel.unsafe().closeForcibly();
             }
         }
    
         return regFuture;
    }
    

    [代码行4]调用 ChannelFactory#newChannel() 方法,创建Channel对象。 ChannelFactory类继承如下:

    ChannelFactroy类继承.png

    可以在ChannelFactory注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.,这里只是包名的调整,对于继承结构不变。netty默认使用ReflectiveChannelFactory,我们可以看到重载方法:

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
    

    很明显,正如其名是通过反射机制构造Channel对象实例的。constructor是在其构造方法初始化的:this.constructor = clazz.getConstructor();这个clazz按理说应该是我们要创建的Channel的Class对象。那Class对象是什么呢?我们接着看channelFactory是怎么初始化的。

    首先在AbstractBootstrap找到如下代码:

    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
    
        this.channelFactory = channelFactory;
        return self();
    }
    

    调用这个方法的递推向上看到:

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
    

    这个方法正是在EchoServerServerBootstrap链式设置时调用.channel(NioServerSocketChannel.class)的方法。我们看到,channelClass就是NioServerSocketChannel.classchannelFactory也是以ReflectiveChannelFactory作为具体实例,并且将NioServerSocketChannel.class作为构造参数传递初始化的,所以这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel对象。

    继续看NioServerSocketChannel构造方法逻辑做了什么事情,看之前先给出NioServerSocketChannel类继承关系:

    Channel类继承.jpg

    NioServerSocketChannelNioSocketChannel分别对应服务端和客户端,公共父类都是AbstractNioChannelAbstractChannel,下面介绍创建过程可以参照这个Channel类继承图。进入NioServerSocketChannel构造方法:

    /**
      * Create a new instance
      */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    

    点击newSocket进去:

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
              *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
              *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
              *
              *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
              */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                "Failed to open a server socket.", e);
        }
    }
    

    以上传进来的providerDEFAULT_SELECTOR_PROVIDER即默认的java.nio.channels.spi.SelectorProvider,[代码行9]就是熟悉的jdk nio创建ServerSocketChannel。这样newSocket(DEFAULT_SELECTOR_PROVIDER)就返回了结果ServerSocketChannel,回到NioServerSocketChannel()#this()点进去:

    /**
      * Create a new instance using the given {@link ServerSocketChannel}.
      */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    

    以上super代表父类AbstractNioMessageChannel构造方法,点进去看到:

     /**
       * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
       */
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
    

    以上super代表父类AbstractNioChannel构造方法,点进去看到:

     protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
         super(parent);
         this.ch = ch;
         this.readInterestOp = readInterestOp;
         try {
             ch.configureBlocking(false);
         } catch (IOException e) {
             try {
                 ch.close();
             } catch (IOException e2) {
                 if (logger.isWarnEnabled()) {
                     logger.warn("Failed to close a partially initialized socket.", e2);
                 }
             }
    
             throw new ChannelException("Failed to enter non-blocking mode.", e);
         }
    }
    

    以上[代码行3]将ServerSocketChannel保存到了AbstractNioChannel#ch成员变量,在上面提到的NioServerSocketChannel构造方法的[代码行6]javaChannel()拿到的就是ch保存的ServerSocketChannel变量。

    以上[代码行6]就是熟悉的jdk nio编程设置ServerSocketChannel非阻塞方式。这里还有super父类构造方法,点击进去看到:

     protected AbstractChannel(Channel parent) {
         this.parent = parent;
         id = newId();
         unsafe = newUnsafe();
         pipeline = newChannelPipeline();
    }
    

    以上构造方法中:

    • parent 属性,代表父 Channel 对象。对于NioServerSocketChannelparentnull
    • id 属性,Channel 编号对象。在构造方法中,通过调用 #newId() 方法进行创建。(这里不细展开Problem-1
    • unsafe 属性,Unsafe 对象。因为Channel 真正的具体操作,是通过调用对应的 Unsafe 对象实施。所以需要在构造方法中,通过调用 #newUnsafe() 方法进行创建。这里的 Unsafe 并不是我们常说的 jdk自带的sun.misc.Unsafe ,而是 io.netty.channel.Channel#Unsafe。(这里不细展开Problem-2
    • pipeline属性默认是DefaultChannelPipeline对象,赋值后在后面为channel绑定端口的时候会用到

    通过以上创建channel源码过程分析,总结的流程时序图如下:

    wwv6q1.jpg

    2. 初始化Channel

    初始化channel.png

    回到一开始创建ChannelinitAndRegister()入口方法,在创建Channel后紧接着init(channel)进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel),代码如下:

    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
    
        ChannelPipeline p = channel.pipeline();
    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
    
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    
    • [代码 3 - 6 行]: options0()方法返回的options保存了用户在EchoServer中设置自定义的可选项集合,这样ServerBootstrap将配置的选项集合,设置到了 Channel 的可选项集合中。

    • [代码 8 - 15 行]: attrs0()方法返回的attrs保存了用户在EchoServer中设置自定义的属性集合,这样ServerBootstrap将配置的属性集合,设置到了 Channel 的属性集合中。

    • [代码21-28行]:通过局部变量currentChildOptionscurrentChildAttrs保存了用户自定义的childOptionschildAttrs,用于[代码43行] ServerBootstrapAcceptor 构造方法。

    • [代码30-47]]:创建ChannelInitializer 对象,添加到 pipeline 中,用于后续初始化 ChannelHandler pipeline 中,包括用户在EchoServer配置的LoggingHandler和创建的创建 ServerBootstrapAcceptor 对象。

      • [代码行34-37]:添加启动器配置的 LoggingHandlerpipeline 中。

      • [代码行39-45]:创建 ServerBootstrapAcceptor 对象,添加到 pipeline 中。从名字上就可以看出来,ServerBootstrapAcceptor 也是一个 ChannelHandler 实现类,专门用于接受客户端的新连接请求,把新的请求扔给某个事件循环器,我们先不做过多分析。我们发现是使用EventLoop.execute 执行添加的过程,这是为什么呢?同样记录问题(Problem-3)

      • 需要说明的是pipeline 在之前介绍Netty核心组件的时候提到是一个包含ChannelHandlerContext的双向链表,每一个context对于唯一一个ChannelHandler,这里初始化后,ChannelPipeline里就是如下一个结构:

        ChannelPipeline内部结构.png

    3. 注册Channel

    注册channel.png

    初始化Channel一些基本配置和属性完毕后,回到一开始创建ChannelinitAndRegister()入口方法,在初始化Channel后紧接着[代码行17] ChannelFuture regFuture = config().group().register(channel);明显这里是通过EventLoopGroup进入注册流程(EventLoopGroup体系将在后续文章讲解)

    EchoServer中启动器同样通过ServerBootstrap#group()设置了NioEventLoopGroup,它继承自MultithreadEventLoopGroup,所以注册流程会进入MultithreadEventLoopGroup重载的register(Channel channel)方法,代码如下:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    

    这里会调用 next() 方法选择出来一个 EventLoop 来注册 Channel,里面实际上使用的是一个叫做 EventExecutorChooser 的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooserGenericEventExecutorChooser,本质上就是从 EventExecutor 数组中选择一个 EventExecutor,我们这里就是 NioEventLoop,那么,它们有什么区别呢?(Problem-4:在介绍EventLoopGroup体系的后续文章中将会详细讲解,这里简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。

    接着,来到 NioEventLoopregister(channel) 方法,你会不会问找不到该方法?提示NioEventLoop 继承SingleThreadEventLoop,所以父类方法:

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    

    可以看到,先创建了一个叫做 ChannelPromise 的东西,它是 ChannelFuture 的子类。[代码行9]又调回了 ChannelUnsaferegister () 方法,这里第一个参数是 this,也就是 NioEventLoop,第二个参数是刚创建的 ChannelPromise

    点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise) 方法进去,代码如下:

     public final void register(EventLoop eventLoop, final ChannelPromise promise) {
         if (eventLoop == null) {
             throw new NullPointerException("eventLoop");
         }
         if (isRegistered()) {
             promise.setFailure(new IllegalStateException("registered to an event loop already"));
             return;
         }
         if (!isCompatible(eventLoop)) {
             promise.setFailure(
                 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
             return;
         }
    
         AbstractChannel.this.eventLoop = eventLoop;
    
         if (eventLoop.inEventLoop()) {
             register0(promise);
         } else {
             try {
                 eventLoop.execute(new Runnable() {
                     @Override
                     public void run() {
                         register0(promise);
                     }
                 });
             } catch (Throwable t) {
                 logger.warn(
                     "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                     AbstractChannel.this, t);
                 closeForcibly();
                 closeFuture.setClosed();
                 safeSetFailure(promise, t);
             }
         }
    }
    

    [代码行15]这行代码是设置 ChanneleventLoop 属性。这行前面的代码主要是在校验传入的 eventLoop 参数非空,校验是否有注册过以及校验 ChanneleventLoop 类型是否匹配。

    [代码18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise) 方法中:

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    

    [代码行9]进入 AbstractNioChannel#doRegister() 方法:

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    

    [代码行5]关键一行代码,将 Java 原生NIO Selector与 Java 原生 NIOChannel 对象(ServerSocketChannel) 绑定在一起,并将当前 Netty 的Channel通过 attachment 的形式绑定到 SelectionKey 上:

    • 调用 #unwrappedSelector() 方法,返回 Java 原生 NIO Selector 对象,而且每个NioEventLoopSelector唯一一对应。
    • 调用 SelectableChannel#register(Selector sel, int ops, Object att) 方法,注册 Java 原生NIOChannel 对象到 NIO Selector对象上。

    通过以上注册channel源码分析,总结流程的时序图如下:

    4. 绑定端口

    绑定端口.png

    注册完Channel最后回到AbstractBootstrap#doBind() 方法,分析 Channel 的端口绑定逻辑。进入doBind0代码如下:

    private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    
    • [代码行7]:在前面Channel 注册成功的条件下,调用 EventLoop执行 Channel 的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语注释,这里作为一个问题记着(Problem-5)。
    • [代码行11]:进入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise),同样立即异步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE监听事件。
    • [代码行13]:如果绑定端口之前的操作并没有成功,自然也就不能进行端口绑定操作了,通过promise记录异常原因。

    AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)方法如下:

     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return pipeline.bind(localAddress, promise);
     }
    

    pipeline是之前创建channel的时候创建的DefaultChannelPipeline,进入该方法:

     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return tail.bind(localAddress, promise);
     }
    

    [在分析初始化流程的时候最后画一个DefaultChannelPipeline内部的结构,能够便于分析后面进入DefaultChannelPipeline一系列bind方法。]

    首先,tail代表TailContext,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法:

     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
         //省略部分代码
         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
         EventExecutor executor = next.executor();
         if (executor.inEventLoop()) {
             next.invokeBind(localAddress, promise);
         } else {
             safeExecute(executor, new Runnable() {
                 @Override
                 public void run() {
                     next.invokeBind(localAddress, promise);
                 }
             }, promise, null);
         }
         return promise;
    }
    

    [代码行3]:findContextOutbound方法里主要是执行ctx = ctx.prev;那么得到的next就是绑定LoggingHandlercontext

    [代码行6]:进入invokeBind(localAddress, promise)方法并直接执行LoggingHandler#bind(this, localAddress, promise),进入后的方法如下:

     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
         if (logger.isEnabled(internalLevel)) {
             logger.log(internalLevel, format(ctx, "BIND", localAddress));
         }
         ctx.bind(localAddress, promise);
     }
    

    设置了LoggingHandler的日志基本级别为默认的INFO后,进行绑定操作的信息打印。接着,继续循环到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法执行ctx = ctx.prev取出HeadContext进入到bind方法:

     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
         unsafe.bind(localAddress, promise);
     }
    

    兜兜转转,最终跳出了pipeline轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise) 方法,Channel 的端口绑定逻辑。代码如下:

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        //此处有省略...
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        //此处有省略...
    }
    

    做实事方法doBind进入后如下:

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    

    到了此处,服务端的 Java 原生 NIO ServerSocketChannel 终于绑定上了端口。

    三、问题归纳

    • Problem-1: 创建Channel流程中AbstractChannel构造函数中为channel分配ID的算法如何实现?
    • Problem-2: AbstractChannel内部类AbstractUnsafe的作用?
    • Problem-3: 初始化channel流程中pipeline 添加ServerBootstrapAcceptor 是通过EventLoop.execute 执行添加的过程,这是为什么呢?
    • Problem-4:注册channel流程中PowerOfTwoEventExecutorChooserGenericEventExecutorChooser的区别和优化原理?
    • Problem-5:绑定端口流程中调用 EventLoop执行 Channel 的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop所在的线程了,为何还要这样操作呢?

    小结

    通过对Netty服务端启动流程源码分析,我们发现了在使用NIO的模式下,服务端启动流程其实就是封装了JDK NIO编程在服务端启动的流程。只不过对原生JDK NIO进行了增强和优化,同时从架构设计上简化了服务端流程的编写。

    最重要的是感谢彤哥、艿艿和俞超-闪电侠这些大佬前期的分享,能够让更多人学习源码的旅途少走很多弯路,谢谢!

    欢迎关注:
    原创干货分享.png

  • 相关阅读:
    【PAT甲级】1043 Is It a Binary Search Tree (25 分)(判断是否为BST的先序遍历并输出后序遍历)
    Educational Codeforces Round 73 (Rated for Div. 2)F(线段树,扫描线)
    【PAT甲级】1042 Shuffling Machine (20 分)
    【PAT甲级】1041 Be Unique (20 分)(多重集)
    【PAT甲级】1040 Longest Symmetric String (25 分)(cin.getline(s,1007))
    【PAT甲级】1039 Course List for Student (25 分)(vector嵌套于map,段错误原因未知)
    Codeforces Round #588 (Div. 2)E(DFS,思维,__gcd,树)
    2017-3-9 SQL server 数据库
    2017-3-8 学生信息展示习题
    2017-3-5 C#基础 函数--递归
  • 原文地址:https://www.cnblogs.com/wang-meng/p/13674137.html
Copyright © 2011-2022 走看看