zoukankan      html  css  js  c++  java
  • 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

    前言

    前面小飞已经讲解了NIONetty服务端启动,这一讲是Client的启动过程。

    源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Final

    本篇是以NettyClient启动为切入点,带大家一步步进入Netty源码的世界。

    Client启动流程揭秘

    1、探秘的入口:netty-client demo

    这里用netty-exmaple中的EchoClient来作为例子:

    public final class EchoClient {
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         p.addLast(new EchoClientHandler());
                     }
                 });
    
                ChannelFuture f = b.connect(HOST, PORT).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    

    代码没有什么独特的地方,我们上一篇文章时也梳理过Netty网络编程的一些套路,这里就不再赘述了。
    (忘记的小朋友可以查看Netty系列文章中查找~)

    上面的客户端代码虽然简单, 但是却展示了Netty 客户端初始化时所需的所有内容:

    • EventLoopGroupNetty服务端或者客户端,都必须指定EventLoopGroup,客户端指定的是NioEventLoopGroup
    • Bootstrap: Netty客户端启动类,负责客户端的启动和初始化过程
    • channel()类型:指定Channel的类型,因为这里是客户端,所以使用的是NioSocketChannel,服务端会使用NioServerSocketChannel
    • Handler:设置数据的处理器
    • bootstrap.connect(): 客户端连接netty服务的方法

    2、NioEventLoopGroup 流程解析

    我们先从NioEventLoopGroup开始,一行行代码解析,先看看其类结构:

    NioEventLoopGroup类结构.png

    上面是大致的类结构,而 EventLoop 又继承自EventLoopGroup,所以类的大致结构我们可想而知。这里一些核心逻辑会在MultithreadEventExecutorGroup中,包含EventLoopGroup的创建和初始化操作等。

    接着从NioEventLoopGroup构造方法开始看起,一步步往下跟(代码都只展示重点的部分,省去很多暂时不需要关心的代码,以下代码都遵循这个原则):

    EventLoopGroup group = new NioEventLoopGroup();
    
    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    这里通过调用this()super()方法一路往下传递,期间会构造一些默认属性,一直传递到MultithreadEventExecutorGroup类中,接着往西看。

    2.1、MultithreadEventExecutorGroup

    上面构造函数有一个重要的参数传递:DEFAULT_EVENT_LOOP_THREADS,这个值默认是CPU核数 * 2

    为什么要传递这个参数呢?我们之前说过EventLoopGroup可以理解成一个线程池,MultithreadEventExecutorGroup有一个线程数组EventExecutor[] children属性,而传递过来的DEFAULT_EVENT_LOOP_THREADS就是数组的长度。

    先看下MultithreadEventExecutorGroup中的构造方法:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        
        children = new EventExecutor[nThreads];
        
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor, args);
        }
        
        // ... 省略
    }
    

    这段代码执行逻辑可以理解为:

    • 通过ThreadPerTaskExecutor构造一个Executor执行器,后面会细说,里面包含了线程执行的execute()方法
    • 接着创建一个EventExecutor数组对象,大小为传递进来的threads数量,这个所谓的EventExecutor可以理解为我们的EventLoop,在这个demo中就是NioEventLoop对象
    • 最后调用 newChild 方法逐个初始化EventLoopGroup中的EventLoop对象

    上面只是大概说了下MultithreadEventExecutorGroup中的构造方法做的事情,后面还会一个个详细展开,先不用着急,我们先有个整体的认知就好。

    再回到MultithreadEventExecutorGroup中的构造方法入参中,有个EventExecutorChooserFactory对象,这里面是有个很亮眼的细节设计,通过它我们来洞悉Netty的良苦用心。

    2.1、亮点设计:DefaultEventExecutorChooserFactory

    EventExecutorChooserFactory.png

    EventExecutorChooserFactory这个类的作用是用来选择EventLoop执行器的,我们知道EventLoopGroup是一个包含了CPU * 2个数量的EventLoop数组对象,那每次选择EventLoop来执行任务是选择数组中的哪一个呢?

    我们看一下这个类的具体实现,红框中都是需要重点查看的地方:

    轮询算法实现器.png

    DefaultEventExecutorChooserFactory是一个选择器工厂类,调用里面的next()方法达到一个轮询选择的目的。

    数组的长度是length,执行第n次,取数组中的哪个元素就是对length取余

    w9llHf.png

    继续回到代码的实现,这里的优化就是在于先通过isPowerOfTwo()方法判断数组的长度是否为2的n次幂,判断的方式很巧妙,使用val & -val == val,这里我不做过多的解释,网上还有很多判断2的n次幂的优秀解法,我就不班门弄斧了。(可参考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/)

    当然我认为这里还有更容易理解的一个算法:x & (x - 1) == 0 大家可以看下面的图就懂了,这里就不延展了:

    2的幂次方算法.png

    BUT!!! 这里为什么要去煞费苦心的判断数组的长度是2的n次幂?

    不知道小伙伴们是否还记得大明湖畔HashMap?一般我们要求HashMap数组的长度需要是2的n次幂,因为在key值寻找数组位置的方法:(n - 1) & hash n是数组长度,这里如果数组长度是2的n次幂就可以通过位运算来提升性能,当length为2的n次幂时下面公式是等价的:

    n & (length - 1) <=> n % length

    还记得上面说过,数组的长度默认都是CPU * 2,而一般服务器CPU核心数都是2、4、8、16等等,所以这一个小优化就很实用了,再仔细想想,原来数组长度的初始化也是很讲究的。

    这里位运算的好处就是效率远远高于与运算,Netty针对于这个小细节都做了优化,真是太棒了。

    2.3、线程执行器:ThreadPerTaskExecutor

    接着看下ThreadPerTaskExecutor线程执行器,每次执行任务都会通过它来创建一个线程实体。

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    传递进来的threadFactoryDefaultThreadFactory,这里面会构造NioEventLoop线程命名规则为nioEventLoop-1-xxx,我们就不细看这个了。当线程执行的时候会调用execute()方法,这里会创建一个FastThreadLocalThread线程,具体看代码:

    public class DefaultThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            return t;
        }
    
        protected Thread newThread(Runnable r, String name) {
            return new FastThreadLocalThread(threadGroup, r, name);
        }
    }
    

    这里通过newThread()来创建一个线程,然后初始化线程对象数据,最终会调用到Thread.init()中。

    2.4、EventLoop初始化

    接着继续看MultithreadEventExecutorGroup构造方法:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor, args);
            // .... 省略部分代码
        }
    }
    

    上面代码的最后一部分是 newChild 方法, 这个是一个抽象方法, 它的任务是实例化 EventLoop 对象. 我们跟踪一下它的代码, 可以发现, 这个方法在 NioEventLoopGroup 类中实现了, 其内容很简单:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    

    其实就是实例化一个 NioEventLoop 对象, 然后返回。NioEventLoop构造函数中会保存provider和事件轮询器selector,在其父类中还会创建一个MpscQueue队列,然后保存线程执行器executor

    再回过头来想一想,MultithreadEventExecutorGroup 内部维护了一个 EventExecutor[] children数组, NettyEventLoopGroup 的实现机制其实就建立在 MultithreadEventExecutorGroup 之上。

    每当 Netty 需要一个 EventLoop 时, 会调用 next() 方法从EventLoopGroup数组中获取一个可用的 EventLoop对象。其中next方法的实现是通过NioEventLoopGroup.next()来完成的,就是用的上面有过讲解的通过轮询算法来计算得出的。

    最后总结一下整个 EventLoopGroup 的初始化过程:

    EventLoopGroup构造流程.png

    • EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组,数组长度是nThreads
    • 如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2
    • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组
    • 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.
    • NioEventLoop 属性:
      • SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
      • Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

    2.5、NioSocketChannel

    Netty中,Channel是对Socket的抽象,每当Netty建立一个连接后,都会有一个与其对应的Channel实例。

    我们在开头的Demo中,设置了channel(NioSocketChannel.class)NioSocketChannel的类结构如下:

    NioSocketChannel类结构.png

    接着分析代码,当我们调用b.channel()时实际上会进入AbstractBootstrap.channel()逻辑,接着看AbstractBootstrap中代码:

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
    
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
    
        this.channelFactory = channelFactory;
        return self();
    }
    

    可以看到,这里ReflectiveChannelFactory其实就是返回我们指定的channelClass:NioSocketChannel, 然后指定AbstractBootstrap中的channelFactory = new ReflectiveChannelFactory()

    2.6、Channel初始化流程

    到了这一步,我们已经知道NioEventLoopGroupchannel()的流程,接着来看看Channel的 初始化流程,这也是Netty客户端启动的的核心流程之一:

    ChannelFuture f = b.connect(HOST, PORT).sync();
    

    接着就开始从b.connect()为入口一步步往后跟,先看下NioSocketChannel构造的整体流程:

    NioSocketChannel构造流程.png

    connet往后梳理下整体流程:

    Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister

    final ChannelFuture initAndRegister() {
        Channel channel = channelFactory.newChannel();
        init(channel);
        
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }
    

    为了更易读,这里代码都做了简化,只保留了一些重要的代码。

    紧接着我们看看channelFactory.newChannel()做了什么,这里channelFactoryReflectiveChannelFactory,我们在上面的章节分析过:

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
    

    这里的clazzNioSocketChannel,同样是在上面章节讲到过,这里是调用NioSocketChannel的构造函数然后初始化一个Channel实例。

    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
        public NioSocketChannel() {
            this(DEFAULT_SELECTOR_PROVIDER);
        }
    
        public NioSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
    
        private static SocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openSocketChannel();
            } catch (IOException e) {
                throw new ChannelException("Failed to open a socket.", e);
            }
        }
    }
    

    这里其实也很简单,就是创建一个Java NIO SocketChannel而已,接着看看NioSocketChannel的父类还做了哪些事情,这里梳理下类的关系:

    NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel

    public abstract class AbstractNioChannel extends AbstractChannel {
        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
        }
    
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            ch.configureBlocking(false);
        }
    }
    

    这里会调用父类的构造参数,并且传递readInterestOp = SelectionKey.OP_READ:,这里还有一个很重要的点,配置 Java NIO SocketChannel 为非阻塞的,我们之前在NIO章节的时候讲解过,这里也不再赘述。

    接着继续看AbstractChannel的构造函数:

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    }
    

    这里创建一个ChannelId,创建一个Unsafe对象,这里的Unsafe并不是Java中的Unsafe,后面也会讲到。然后创建一个ChannelPipeline,后面也会讲到,到了这里,一个完整的NioSocketChannel 就初始化完成了,我们再来总结一下:

    • NettySocketChannel 会与 Java 原生的 SocketChannel 绑定在一起;
    • 会注册 Read 事件;
    • 会为每一个 Channel 分配一个 channelId
    • 会为每一个 Channel 创建一个Unsafe对象;
    • 会为每一个 Channel 分配一个 ChannelPipeline

    2.7、Channel 注册流程

    还是回到最上面initAndRegister方法,我们上面都是在分析里面newChannel的操作,这个方法是NioSocketChannel创建的一个流程,接着我们在继续跟init()register()的过程:

     public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        final ChannelFuture initAndRegister() {
            Channel channel = channelFactory.newChannel();
            init(channel);
            ChannelFuture regFuture = config().group().register(channel);
        }
    }
    

    init()就是将一些参数optionsattrs设置到channel中,我们重点需要看的是register方法,其调用链为:

    AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register

    这里最后到了unsaferegister()方法,最终调用到AbstractNioChannel.doRegister():

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        }
    }
    

    javaChannel()就是Java NIO中的SocketChannel,这里是将SocketChannel注册到与eventLoop相关联的selector上。

    Channel注册流程.png

    最后我们整理一下服务启动的整体流程:

    1. initAndRegister()初始化并注册什么呢?
    • channelFactory.newChannel()
    • 通过反射创建一个 NioSocketChannel
    • Java 原生 Channel 绑定到 NettyChannel
    • 注册 Read 事件
    • Channel 分配 id
    • Channel 创建 unsafe对象
    • Channel 创建 ChannelPipeline(默认是 head<=>tail 的双向链表)
    1. `init(channel)``
    • Bootstrap 中的配置设置到 Channel
    1. register(channel)
    • Channel 绑定到一个 EventLoop
    • Java 原生 Channel、NettyChannel、Selector 绑定到 SelectionKey
    • 触发 Register 相关的事件

    2.8 unsafe初始化

    上面有提到过在初始化Channel的过程中会创建一个Unsafe的对象,然后绑定到Channel上:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    

    newUnsafe直接调用到了NioSocketChannel中的方法:

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }
    

    NioSocketChannelUnsafeNioSocketChannel中的一个内部类,然后向上还有几个父类继承,这里主要是对应到相关Java底层的Socket操作。

    2.9 pipeline初始化

    我们还是回到pipeline初始化的过程,来看一下newChannelPipeline()的具体实现:

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        head.next = tail;
        tail.prev = head;
    }
    

    我们调用 DefaultChannelPipeline 的构造器, 传入了一个 channel, 而这个 channel 其实就是我们实例化的 NioSocketChannel

    DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在channel 字段中. DefaultChannelPipeline 中, 还有两个特殊的字段, 即 headtail, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键.

    关于 DefaultChannelPipeline 中的双向链表以及它所起的作用, 我们会在后续章节详细讲解。这里只是对pipeline做个初步的认识。

    HeadContext 的继承层次结构如下所示:

    HeadContext继承结构.png

    TailContext 的继承层次结构如下所示:

    TailContext继承结构.png

    我们可以看到, 链表中 head 是一个 ChannelOutboundHandler, 而 tail 则是一个 ChannelInboundHandler.

    3.0、客户端connect过程

    客户端连接的入口方法还是在Bootstrap.connect()中,上面也分析过一部分内容,请求的具体流程是:

    Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()

    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
                throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
    

    看到这里,还是用Java NIO SocketChannel发送的connect请求进行客户端连接请求。

    总结

    本篇文章以一个Netty Client demo为入口,然后解析了NioEventLoopGroup创建的流程、Channel的创建和注册的流程,以及客户端发起connect的具体流程,这里对于很多细节并没有很深的深入下去,这些会放到后续的源码分析文章,敬请期待~

    原创干货分享.png

  • 相关阅读:
    EF+MVC+Bootstrap 项目实践 Day7
    JS---数组
    OS---华硕笔记本从U盘启动安装系统
    PHP--分页类
    PHP--数据库操作类
    OS---net start mysql 发生系统错误5
    MYSQL---远程连接mysql数据库提示:ERROR 1130的解决办法
    CSS-小谈LV,HA!
    MYSQL---设置存储引擎
    MYSQL---存储引擎
  • 原文地址:https://www.cnblogs.com/wang-meng/p/13711756.html
Copyright © 2011-2022 走看看