zoukankan      html  css  js  c++  java
  • 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

    前言

    前面小飞已经讲解了NIONetty服务端启动,这一讲是Client的启动过程。

    源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Final

    本篇是以NettyClient启动为切入点,带大家一步步进入Netty源码的世界。

    Client启动流程揭秘

    1、探秘的入口:netty-client demo

    这里用netty-exmaple中的EchoClient来作为例子:

    public final class EchoClient {
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         p.addLast(new EchoClientHandler());
                     }
                 });
    
                ChannelFuture f = b.connect(HOST, PORT).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    

    代码没有什么独特的地方,我们上一篇文章时也梳理过Netty网络编程的一些套路,这里就不再赘述了。
    (忘记的小朋友可以查看Netty系列文章中查找~)

    上面的客户端代码虽然简单, 但是却展示了Netty 客户端初始化时所需的所有内容:

    • EventLoopGroupNetty服务端或者客户端,都必须指定EventLoopGroup,客户端指定的是NioEventLoopGroup
    • Bootstrap: Netty客户端启动类,负责客户端的启动和初始化过程
    • channel()类型:指定Channel的类型,因为这里是客户端,所以使用的是NioSocketChannel,服务端会使用NioServerSocketChannel
    • Handler:设置数据的处理器
    • bootstrap.connect(): 客户端连接netty服务的方法

    2、NioEventLoopGroup 流程解析

    我们先从NioEventLoopGroup开始,一行行代码解析,先看看其类结构:

    NioEventLoopGroup类结构.png

    上面是大致的类结构,而 EventLoop 又继承自EventLoopGroup,所以类的大致结构我们可想而知。这里一些核心逻辑会在MultithreadEventExecutorGroup中,包含EventLoopGroup的创建和初始化操作等。

    接着从NioEventLoopGroup构造方法开始看起,一步步往下跟(代码都只展示重点的部分,省去很多暂时不需要关心的代码,以下代码都遵循这个原则):

    EventLoopGroup group = new NioEventLoopGroup();
    
    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    这里通过调用this()super()方法一路往下传递,期间会构造一些默认属性,一直传递到MultithreadEventExecutorGroup类中,接着往西看。

    2.1、MultithreadEventExecutorGroup

    上面构造函数有一个重要的参数传递:DEFAULT_EVENT_LOOP_THREADS,这个值默认是CPU核数 * 2

    为什么要传递这个参数呢?我们之前说过EventLoopGroup可以理解成一个线程池,MultithreadEventExecutorGroup有一个线程数组EventExecutor[] children属性,而传递过来的DEFAULT_EVENT_LOOP_THREADS就是数组的长度。

    先看下MultithreadEventExecutorGroup中的构造方法:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        
        children = new EventExecutor[nThreads];
        
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor, args);
        }
        
        // ... 省略
    }
    

    这段代码执行逻辑可以理解为:

    • 通过ThreadPerTaskExecutor构造一个Executor执行器,后面会细说,里面包含了线程执行的execute()方法
    • 接着创建一个EventExecutor数组对象,大小为传递进来的threads数量,这个所谓的EventExecutor可以理解为我们的EventLoop,在这个demo中就是NioEventLoop对象
    • 最后调用 newChild 方法逐个初始化EventLoopGroup中的EventLoop对象

    上面只是大概说了下MultithreadEventExecutorGroup中的构造方法做的事情,后面还会一个个详细展开,先不用着急,我们先有个整体的认知就好。

    再回到MultithreadEventExecutorGroup中的构造方法入参中,有个EventExecutorChooserFactory对象,这里面是有个很亮眼的细节设计,通过它我们来洞悉Netty的良苦用心。

    2.1、亮点设计:DefaultEventExecutorChooserFactory

    EventExecutorChooserFactory.png

    EventExecutorChooserFactory这个类的作用是用来选择EventLoop执行器的,我们知道EventLoopGroup是一个包含了CPU * 2个数量的EventLoop数组对象,那每次选择EventLoop来执行任务是选择数组中的哪一个呢?

    我们看一下这个类的具体实现,红框中都是需要重点查看的地方:

    轮询算法实现器.png

    DefaultEventExecutorChooserFactory是一个选择器工厂类,调用里面的next()方法达到一个轮询选择的目的。

    数组的长度是length,执行第n次,取数组中的哪个元素就是对length取余

    w9llHf.png

    继续回到代码的实现,这里的优化就是在于先通过isPowerOfTwo()方法判断数组的长度是否为2的n次幂,判断的方式很巧妙,使用val & -val == val,这里我不做过多的解释,网上还有很多判断2的n次幂的优秀解法,我就不班门弄斧了。(可参考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/)

    当然我认为这里还有更容易理解的一个算法:x & (x - 1) == 0 大家可以看下面的图就懂了,这里就不延展了:

    2的幂次方算法.png

    BUT!!! 这里为什么要去煞费苦心的判断数组的长度是2的n次幂?

    不知道小伙伴们是否还记得大明湖畔HashMap?一般我们要求HashMap数组的长度需要是2的n次幂,因为在key值寻找数组位置的方法:(n - 1) & hash n是数组长度,这里如果数组长度是2的n次幂就可以通过位运算来提升性能,当length为2的n次幂时下面公式是等价的:

    n & (length - 1) <=> n % length

    还记得上面说过,数组的长度默认都是CPU * 2,而一般服务器CPU核心数都是2、4、8、16等等,所以这一个小优化就很实用了,再仔细想想,原来数组长度的初始化也是很讲究的。

    这里位运算的好处就是效率远远高于与运算,Netty针对于这个小细节都做了优化,真是太棒了。

    2.3、线程执行器:ThreadPerTaskExecutor

    接着看下ThreadPerTaskExecutor线程执行器,每次执行任务都会通过它来创建一个线程实体。

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    传递进来的threadFactoryDefaultThreadFactory,这里面会构造NioEventLoop线程命名规则为nioEventLoop-1-xxx,我们就不细看这个了。当线程执行的时候会调用execute()方法,这里会创建一个FastThreadLocalThread线程,具体看代码:

    public class DefaultThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            return t;
        }
    
        protected Thread newThread(Runnable r, String name) {
            return new FastThreadLocalThread(threadGroup, r, name);
        }
    }
    

    这里通过newThread()来创建一个线程,然后初始化线程对象数据,最终会调用到Thread.init()中。

    2.4、EventLoop初始化

    接着继续看MultithreadEventExecutorGroup构造方法:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor, args);
            // .... 省略部分代码
        }
    }
    

    上面代码的最后一部分是 newChild 方法, 这个是一个抽象方法, 它的任务是实例化 EventLoop 对象. 我们跟踪一下它的代码, 可以发现, 这个方法在 NioEventLoopGroup 类中实现了, 其内容很简单:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    

    其实就是实例化一个 NioEventLoop 对象, 然后返回。NioEventLoop构造函数中会保存provider和事件轮询器selector,在其父类中还会创建一个MpscQueue队列,然后保存线程执行器executor

    再回过头来想一想,MultithreadEventExecutorGroup 内部维护了一个 EventExecutor[] children数组, NettyEventLoopGroup 的实现机制其实就建立在 MultithreadEventExecutorGroup 之上。

    每当 Netty 需要一个 EventLoop 时, 会调用 next() 方法从EventLoopGroup数组中获取一个可用的 EventLoop对象。其中next方法的实现是通过NioEventLoopGroup.next()来完成的,就是用的上面有过讲解的通过轮询算法来计算得出的。

    最后总结一下整个 EventLoopGroup 的初始化过程:

    EventLoopGroup构造流程.png

    • EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组,数组长度是nThreads
    • 如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2
    • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组
    • 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.
    • NioEventLoop 属性:
      • SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
      • Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

    2.5、NioSocketChannel

    Netty中,Channel是对Socket的抽象,每当Netty建立一个连接后,都会有一个与其对应的Channel实例。

    我们在开头的Demo中,设置了channel(NioSocketChannel.class)NioSocketChannel的类结构如下:

    NioSocketChannel类结构.png

    接着分析代码,当我们调用b.channel()时实际上会进入AbstractBootstrap.channel()逻辑,接着看AbstractBootstrap中代码:

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
    
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
    
        this.channelFactory = channelFactory;
        return self();
    }
    

    可以看到,这里ReflectiveChannelFactory其实就是返回我们指定的channelClass:NioSocketChannel, 然后指定AbstractBootstrap中的channelFactory = new ReflectiveChannelFactory()

    2.6、Channel初始化流程

    到了这一步,我们已经知道NioEventLoopGroupchannel()的流程,接着来看看Channel的 初始化流程,这也是Netty客户端启动的的核心流程之一:

    ChannelFuture f = b.connect(HOST, PORT).sync();
    

    接着就开始从b.connect()为入口一步步往后跟,先看下NioSocketChannel构造的整体流程:

    NioSocketChannel构造流程.png

    connet往后梳理下整体流程:

    Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister

    final ChannelFuture initAndRegister() {
        Channel channel = channelFactory.newChannel();
        init(channel);
        
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }
    

    为了更易读,这里代码都做了简化,只保留了一些重要的代码。

    紧接着我们看看channelFactory.newChannel()做了什么,这里channelFactoryReflectiveChannelFactory,我们在上面的章节分析过:

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
    

    这里的clazzNioSocketChannel,同样是在上面章节讲到过,这里是调用NioSocketChannel的构造函数然后初始化一个Channel实例。

    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
        public NioSocketChannel() {
            this(DEFAULT_SELECTOR_PROVIDER);
        }
    
        public NioSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
    
        private static SocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openSocketChannel();
            } catch (IOException e) {
                throw new ChannelException("Failed to open a socket.", e);
            }
        }
    }
    

    这里其实也很简单,就是创建一个Java NIO SocketChannel而已,接着看看NioSocketChannel的父类还做了哪些事情,这里梳理下类的关系:

    NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel

    public abstract class AbstractNioChannel extends AbstractChannel {
        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
        }
    
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            ch.configureBlocking(false);
        }
    }
    

    这里会调用父类的构造参数,并且传递readInterestOp = SelectionKey.OP_READ:,这里还有一个很重要的点,配置 Java NIO SocketChannel 为非阻塞的,我们之前在NIO章节的时候讲解过,这里也不再赘述。

    接着继续看AbstractChannel的构造函数:

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    }
    

    这里创建一个ChannelId,创建一个Unsafe对象,这里的Unsafe并不是Java中的Unsafe,后面也会讲到。然后创建一个ChannelPipeline,后面也会讲到,到了这里,一个完整的NioSocketChannel 就初始化完成了,我们再来总结一下:

    • NettySocketChannel 会与 Java 原生的 SocketChannel 绑定在一起;
    • 会注册 Read 事件;
    • 会为每一个 Channel 分配一个 channelId
    • 会为每一个 Channel 创建一个Unsafe对象;
    • 会为每一个 Channel 分配一个 ChannelPipeline

    2.7、Channel 注册流程

    还是回到最上面initAndRegister方法,我们上面都是在分析里面newChannel的操作,这个方法是NioSocketChannel创建的一个流程,接着我们在继续跟init()register()的过程:

     public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        final ChannelFuture initAndRegister() {
            Channel channel = channelFactory.newChannel();
            init(channel);
            ChannelFuture regFuture = config().group().register(channel);
        }
    }
    

    init()就是将一些参数optionsattrs设置到channel中,我们重点需要看的是register方法,其调用链为:

    AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register

    这里最后到了unsaferegister()方法,最终调用到AbstractNioChannel.doRegister():

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        }
    }
    

    javaChannel()就是Java NIO中的SocketChannel,这里是将SocketChannel注册到与eventLoop相关联的selector上。

    Channel注册流程.png

    最后我们整理一下服务启动的整体流程:

    1. initAndRegister()初始化并注册什么呢?
    • channelFactory.newChannel()
    • 通过反射创建一个 NioSocketChannel
    • Java 原生 Channel 绑定到 NettyChannel
    • 注册 Read 事件
    • Channel 分配 id
    • Channel 创建 unsafe对象
    • Channel 创建 ChannelPipeline(默认是 head<=>tail 的双向链表)
    1. `init(channel)``
    • Bootstrap 中的配置设置到 Channel
    1. register(channel)
    • Channel 绑定到一个 EventLoop
    • Java 原生 Channel、NettyChannel、Selector 绑定到 SelectionKey
    • 触发 Register 相关的事件

    2.8 unsafe初始化

    上面有提到过在初始化Channel的过程中会创建一个Unsafe的对象,然后绑定到Channel上:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    

    newUnsafe直接调用到了NioSocketChannel中的方法:

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }
    

    NioSocketChannelUnsafeNioSocketChannel中的一个内部类,然后向上还有几个父类继承,这里主要是对应到相关Java底层的Socket操作。

    2.9 pipeline初始化

    我们还是回到pipeline初始化的过程,来看一下newChannelPipeline()的具体实现:

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        head.next = tail;
        tail.prev = head;
    }
    

    我们调用 DefaultChannelPipeline 的构造器, 传入了一个 channel, 而这个 channel 其实就是我们实例化的 NioSocketChannel

    DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在channel 字段中. DefaultChannelPipeline 中, 还有两个特殊的字段, 即 headtail, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键.

    关于 DefaultChannelPipeline 中的双向链表以及它所起的作用, 我们会在后续章节详细讲解。这里只是对pipeline做个初步的认识。

    HeadContext 的继承层次结构如下所示:

    HeadContext继承结构.png

    TailContext 的继承层次结构如下所示:

    TailContext继承结构.png

    我们可以看到, 链表中 head 是一个 ChannelOutboundHandler, 而 tail 则是一个 ChannelInboundHandler.

    3.0、客户端connect过程

    客户端连接的入口方法还是在Bootstrap.connect()中,上面也分析过一部分内容,请求的具体流程是:

    Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()

    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
                throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
    

    看到这里,还是用Java NIO SocketChannel发送的connect请求进行客户端连接请求。

    总结

    本篇文章以一个Netty Client demo为入口,然后解析了NioEventLoopGroup创建的流程、Channel的创建和注册的流程,以及客户端发起connect的具体流程,这里对于很多细节并没有很深的深入下去,这些会放到后续的源码分析文章,敬请期待~

    原创干货分享.png

  • 相关阅读:
    jvisualm 结合 visualGC 进行jvm监控,并分析垃圾回收
    linux 查看服务器cpu 与内存配置
    arthas 使用总结
    selinux contexts 安全上下文的临时更改
    Android 8.1 Doze模式分析(五) Doze白名单及Debug方式
    Window 任意窗口置顶软件Window TopMost Control
    Android ApkToolPlus一个可视化的跨平台 apk 分析工具
    SVN Please execute the 'Cleanup' command.
    Android 如何在64位安卓系统中使用32位SO库
    Android cmd命令查看apk是32位还是64位?
  • 原文地址:https://www.cnblogs.com/wang-meng/p/13711756.html
Copyright © 2011-2022 走看看