zoukankan      html  css  js  c++  java
  • Netty线程模型及EventLoop

    转自https://www.jianshu.com/p/128ddc36e713

    线程模型与并发

    什么是线程模型呢?线程模型指定了线程管理的模型。在进行并发编程的过程中,我们需要小心的处理多个线程之间的同步关系,而一个好的线程模型可以大大减少管理多个线程的成本。在阅读本文之前,你可以选择性的阅读下面列出的文章,来快速了解和回顾java中的并发编程内容:

    Reactor线程模型

    Reactor是一种经典的线程模型,Reactor线程模型分为单线程模型、多线程模型以及主从多线程模型。下面分别分析一下各个Reactor线程模型的优缺点。首先是Reactor单线程模型,下面的图片展示了这个线程模型的结构:

     
    Reactor单线程模型

    Reactor单线程模型仅使用一个线程来处理所有的事情,包括客户端的连接和到服务器的连接,以及所有连接产生的读写事件,这种线程模型需要使用异步非阻塞I/O,使得每一个操作都不会发生阻塞,Handler为具体的处理事件的处理器,而Acceptor为连接的接收者,作为服务端接收来自客户端的链接请求。这样的线程模型理论上可以仅仅使用一个线程就完成所有的事件处理,显得线程的利用率非常高,而且因为只有一个线程在工作,所有不会产生在多线程环境下会发生的各种多线程之间的并发问题,架构简单明了,线程模型的简单性决定了线程管理工作的简单性。但是这样的线程模型存在很多不足,比如:

    • 仅利用一个线程来处理事件,对于目前普遍多核心的机器来说太过浪费资源
    • 一个线程同时处理N个连接,管理起来较为复杂,而且性能也无法得到保证,这是以线程管理的简洁换取来的事件管理的复杂性,而且是在性能无 法得到保证的前提下换取的,在大流量的应用场景下根本没有实用性
    • 根据第二条,当处理的这个线程负载过重之后,处理速度会变慢,会有大量的事件堆积,甚至超时,而超时的情况下,客户端往往会重新发送请求,这样的情况下,这个单线程的模型就会成为整个系统的瓶颈
    • 单线程模型的一个致命缺钱就是可靠性问题,因为仅有一个线程在工作,如果这个线程出错了无法正常执行任务了,那么整个系统就会停止响应,也就是系统会因为这个单线程模型而变得不可用,这在绝大部分场景(所有)下是不允许出现的

    介于上面的种种缺陷,Reactor演变出了第二种模型,也就是Reactor多线程模型,下面展示了这种模型:

     
    Reactor多线程模型

    可以发现,多线程模型下,接收链接和处理请求作为两部分分离了,而Acceptor使用单独的线程来接收请求,做好准备后就交给事件处理的handler来处理,而handler使用了一个线程池来实现,这个线程池可以使用Executor框架实现的线程池来实现,所以,一个连接会交给一个handler线程来复杂其上面的所有事件,需要注意,一个连接只会由一个线程来处理,而多个连接可能会由一个handler线程来处理,关键在于一个连接上的所有事件都只会由一个线程来处理,这样的好处就是消除了不必要的并发同步的麻烦。Reactor多线程模型似乎已经可以很好的工作在我们的项目中了,但是还有一个问题没有解决,那就是,多线程模型下任然只有一个线程来处理客户端的连接请求,那如果这个线程挂了,那整个系统任然会变为不可用,而且,因为仅仅由一个线程来负责客户端的连接请求,如果连接之后要做一些验证之类复杂耗时操作再提交给handler线程来处理的话,就会出现性能问题。

    Reactor多线程模型对Reactor单线程模型做了一些改进,但是在某些场景下任然有所缺陷,所以就有了第三种Reactor模型,Reactor主从多线程模型,下面展示了这种模型的架构:

     
    Reactor主从多线程模型

    Reactor多线程模型解决了Reactor单线程模型和Reactor多线程模型中存在的问题,解决了handler的性能问题,以及Acceptor的安全以及性能问题,Netty就使用了这种线程模型来处理事件。

    Netty线程模型

    在了解了线程模型以及Reactor线程模型之后,我们来看一下Netty的线程模型是怎么样的。首先,Netty使用EventLoop来处理连接上的读写事件,而一个连接上的所有请求都保证在一个EventLoop中被处理,一个EventLoop中只有一个Thread,所以也就实现了一个连接上的所有事件只会在一个线程中被执行。一个EventLoopGroup包含多个EventLoop,可以把一个EventLoop当做是Reactor线程模型中的一个线程,而一个EventLoopGroup类似于一个ExecutorService,当然,这只是为了更好的理解Netty的线程模型,它们之间是没有等价关系的,后面的分析中会详细讲到。下面的图片展示了Netty的线程模型:

     
    Netty线程模型

    首先看一下Netty服务端启动的代码:

    
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         p.addLast(your_handler_name, your_handler_instance);
                     }
                 });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
    

    Netty的服务端使用了两个EventLoopGroup,而第一个EventLoopGroup通常只有一个EventLoop,通常叫做bossGroup,负责客户端的连接请求,然后打开Channel,交给后面的EventLoopGroup中的一个EventLoop来负责这个Channel上的所有读写事件,一个Channel只会被一个EventLoop处理,而一个EventLoop可能会被分配给多个Channel来负责上面的事件,当然,Netty不仅支持NI/O,还支持OI/O,所以两者的EventLoop分配方式有所区别,下面分别展示了NI/O和OI/O的分配方式:

     
    Netty NIO分配EventLoop模型
     
    Netty OIO分配EventLoop模型

    在NI/O非阻塞模式下,Netty将负责为每个Channel分配一个EventLoop,一旦一个EventLoop呗分配给了一个Channel,那么在它的整个生命周期中都使用这个EventLoop,但是多个Channel将可能共享一个EventLoop,所以和Thread相关的ThreadLocal的使用就要特别注意,因为有多个Channel在使用该Thread来处理读写时间。在阻塞IO模式下,考虑到一个Channel将会阻塞,所以不太可能将一个EventLoop共用于多个Channel之间,所以,每一个Channel都将被分配一个EventLoop,并且反过来也成立,也就是一个EventLoop将只会被绑定到一个Channel上来处理这个Channel上的读写事件。无论是非阻塞模式还是阻塞模式,一个Channel都将会保证一个Channel上的所有读写事件都只会在一个EventLoop上被处理。

    Netty EventLoop

    上文中分析了Reactor线程模型以及Netty的线程模型,在Netty中,EventLoop是一个极为重要的组件,它翻译过来称为事件循环,一个EventLoop将被分配给一个Channel,来负责这个Channel的整个生命周期之内的所有事件,下面来分析一下EventLoop的结构和实现细节。首先展示了EventLoop的类图:

     
    EventLoop类图

    从EventLoop的类图中可以发现,其实EventLoop继承了Java的ScheduledExecutorService,也就是调度线程池,所以,EventLoop应当有ScheduledExecutorService提供的所有功能。那为什么需要继承ScheduledExecutorService呢,也就是为什么需要延时调度功能,那是因为,在Netty中,有可能用户线程和Netty的I/O线程同时操作网络资源,而为了减少并发锁竞争,Netty将用户线程的任务包装成Netty的task,然后向Netty的I/O任务一样去执行它们。有些时候我们需要延时执行任务,或者周期性执行任务,那么就需要调度功能。这是Netty在设计上的考虑,为我们极大的简化的编程方法。

    EventLoop是一个接口,它在继承了ScheduledExecutorService等多个类的同时,仅仅提供了一个方法parent,这个方法返回它属于哪个EventLoopGroup。本文只分析非阻塞模式,而阻塞模式留到未来某个合适的时候再做分析总结。在上文中展示的服务端启动的代码中我们发现我们使用的EventLoop是一个子类NioEventLoopGroup,下面就来分析一下NioEventLoopGroup这个类。首先展示一下NioEventLoopGroup的类图:

     
    NioEventLoopGroup类图

    可以发现,NioEventLoopGroup的实现非常的复杂,但是只要我们清楚了Netty的线程模型,我们就可以有入口去分析它的代码。首先,我们知道每个EventLoop只要一个Thread来处理事件,那我们就来找到那个Thread在什么地方。可以在SingleThreadEventExecutor类中找到thread,它的初始化在doStartThread这个方法中,而这个方法被startThread方法调用,而startThread 这个方法被execute方法调用,也就是提交任务的入口,这个方法是Executor接口的唯一方法。也就是说,所有我们通过EventLoop的execute方法提交的任务都将被这个Thread线程来执行。我们还知道一个事实,EventLoop是一个循环执行来消耗Channel事件的类,那么它必然会有一个类似循环的方法来作为任务,来提交给这个Thread来执行,而这可以在doStartThread方法中被发现,因为这个方法非常重要,所以下面展示了它的实现细节,但是去掉了一些代码来减少代码量:

    
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
    
    

    上面所提到的事件循环就是通过SingleThreadEventExecutor.this.run()这句话来触发的。这个run方法的具体实现在NioEventLoop中,下面展示了它的实现代码:

    
        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    
    

    首先,我们来分析一下NioEventLoop的相关细节,在一个无限循环里面,只有在遇到shutdown的情况下才会停止循环。然后在循环里会询问是否有事件,如果没有,则继续循环,如果有事件,那么就开始处理时间。上文中我们提到,在事件循环中我们不仅要处理IO事件,还要处理非I/O事件。Netty中可以设置用于I/O操作和非I/O操作的时间占比,默认各位50%,也就是说,如果某次I/O操作的时间花了100ms,那么这次循环中非I/O得任务也可以花费100ms。Netty中的I/O时间处理通过processSelectedKeys方法来进行,而非I/O操作通过runAllTasks反复来进行,首先来看runAllTasks方法,虽然设定了一个可以运行的时间参数,但是实际上Netty并不保证能精确的确保非I/O任务只运行设定的毫秒,下面来看下runAllTasks的代码:

    
        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    
        // 将任务运行起来
        protected static void safeExecute(Runnable task) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception. Task: {}", task, t);
            }
        }
    

    可以看到,这个方法是在每运行了64个任务之后再进行比较的,如果超出了设定的运行时间则退出,否则再运行64个任务再比较。所以,Netty强烈要求不要在I/O线程中运行阻塞任务,因为阻塞任务将会阻塞住Netty的事件循环,从而造成事件堆积的现象。现在回头看处理I/O任务的processSelectedKeys方法,跟踪代码之后发现最后实际处理I/O事件的一个方法为processSelectedKey,下面展示了它的代码:

    
        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
    
    

    这个方法运行的流程为:

    1. 从Channel上获取一个unsafe对象,这个对象 是用来进行NIO操作的一系列系统级API,关于Netty的Channel的深层次分析将在另外的篇章中进行
    2. 从Channel上获取了eventLoop,而这个eventLoop是什么时候分配给Channel的细节在后文中进行分析
    3. 根据事件调用底层API来处理事件

    下面,我们分析一下是什么时候将一个EventLoop分配给一个Channel的,并且这个EventLoop的那个唯一的Thread是什么时候被赋值的。在这个问题上,服务端的流程和客户端的流程可能不太一样,对于服务端来说,首先需要bind一个端口,然后在进行Accept进来的连接,而客户端需要进行connect到服务端。先来分析一下服务端。

    还是看上面提供的服务端的示例代码,其中启动的代码为下面这句代码:

    
     // Start the server.
     ChannelFuture f = b.bind(PORT).sync();
    
    

    也就是我们网络编程中的bind操作,这个操作会发生什么呢?追踪代码如下:

     
     -> AbstractBootstrap.bind(port)
     -> AbstractBootstrap.bind(address) 
     -> AbstractBootstrap.doBind(final SocketAddress localAddress) 
     -> AbstractBootstrap.initAndRegister 
     -> AbstractBootstrap.doBind0 
     -> SingleThreadEventExecutor.execute 
     -> SingleThreadEventExecutor.startThread()
     -> SingleThreadEventExecutor.doStartThread
    
    

    EventLoop在AbstractBootstrap.initAndRegister中获得了一个新的Channel,然后在AbstractBootstrap.doBind0 方法里面调用接下来的方法来初始化EventLoop的Thread的工作,并且将EventLoop的时间循环打开了,可以开始接收客户端的连接请求了。下面来分析一下客户端的流程。

    一个客户端的启动代码示例:

    
            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(new EchoClientHandler());
                     }
                 });
    
                // Start the client.
                ChannelFuture f = b.connect(HOST, PORT).sync();
    
                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
    
    

    其中启动的关键代码为:

    
     // Start the client.
     ChannelFuture f = b.connect(HOST, PORT).sync();
    
    

    下面是connect的调用流程:

     -> Bootstrap.doResolveAndConnect
     -> AbstractBootstrap.initAndRegister 
     -> Bootstrap.doResolveAndConnect0
     -> Bootstrap.doConnect
     -> SingleThreadEventExecutor.execute 
     -> SingleThreadEventExecutor.startThread()
     -> SingleThreadEventExecutor.doStartThread
    
    

    后半部分和服务端的启动过程是一致的,而区别在于服务端是通过bind操作来启动的,而客户端是通过connect操作来启动的。执行到此,客户端和服务端的EventLoop都已经启动起来,服务端可以接受客户端的连接并且处理Channel上的读写事件,而客户端可以去连接远程服务端来请求数据。

    EventLoopGroup

    到目前为止,我们已经知道了Reactor多个线程模型,并且知道了一个EventLoop会负责一个Channel的生命周期内的所有事件,并且知道了服务端和客户端是如何启动这个EventLoop得,但是还有一个问题没有解决,那就是一个EventLoop是如何被分配给一个Channel的。下文就来分析这个分配的原理和过程。而对于阻塞I/O模型的分配和非阻塞I/O模型的分配是不一样的,在上文中也提到这个内容,所以本文只分析对于非阻塞I/O模型的分配。

    EventLoopGroup是用来管理EventLoop的对象,一个EventLoopGroup里面有多个EventLoop,下面展示了EventLoopGroup的类图:

     
    EventLoopGroup类图

    我们从实际的代码出发来分析EventLoopGroup。上文中已经展示了客户端和服务端的启动代码,其中有类似的代码如下:

    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    

    上文中我们分析了EventLoop被启动的过程,我们肯定,EventLoop是在分配之后启动的,因为对于服务端而言,bind是一个最开始的网络操作,对于客户端来说,connect也是最开始的网络操作,在这之前是没有关于网络I/O的操作的,所以,EventLoop的分配和启动是在这两个过程或者之后的流程中进行的,但是EventLoop的分配肯定是在启动之前的,但是EventLoop的分配和启动在bind和connect中进行,那么我们可以肯定,EventLoop的分配也是在这两个方法中进行的。为了证明这个假设,回头再看一下服务端的EventLoop的启动过程,其中有一个方法值得我们注意:AbstractBootstrap.initAndRegister,我们进行了init部分的分析,而register部分我们还没有分析,下面就对服务端来进行register部分的分析,下面展示了register的调用链路:

    
     -> Bootstrap.doResolveAndConnect
     -> AbstractBootstrap.initAndRegister 
     -> EventLoopGroup.register
     -> MultithreadEventLoopGroup.register
     -> SingleThreadEventLoop.register
     -> Channel.register
     -> AbstractUnsafe.register
     
             public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    
    

    最后展示了AbstractUnsafe.register这个方法,在这里初始化了一个EventLoop,需要记住的一点是,EventLoopGroup中的是EventLoop,不然在追踪代码的时候会迷失。现在来正式看一下NioEventLoopGroup这个类,它的它继承了MultithreadEventExecutorGroup这个类,而我们在初始化EventLoopGroup的时候传递进去的参数,也就是我们希望这个EventLoopGroup拥有的EventLoop数量,会在MultithreadEventExecutorGroup这个类中初始化,并且是在构造函数中初始化的,如果在new EventLoopGroup的时候没有任何参数,那么默认的EventLoop的数量是机器CPU数量的两倍。现在我们来看一下MultithreadEventExecutorGroup这个类的一个重要的构造函数,这个构造函数初始化了EventLoopGroup的EventLoop。

    
     protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
    

    一个较为重要的方法为newChild,这是初始化一个EventLoop的方法,下面是它的具体实现,假设我们使用NioEventLoop:

    
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    
    

    我们现在知道了EventLoopGroup管理着很多的EventLoop,上文中我们仅仅分析了分配的流程,但是分配的策略还没有分析,现在来分析一下EventLoopGroup是如何分配EventLoop给Channel的,我们仅分析非阻塞I/O下的分配策略,阻塞模式下的分配策略可以参考非阻塞下的分配策略。

    在MultithreadEventLoopGroup.register方法中,调用了next()方法,我们来看一下这个流程:

    
      -> MultithreadEventExecutorGroup.next()
      
        public EventExecutor next() {
            return chooser.next();
        }
    
    

    chooser是什么东西?

    
     private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    
    

    它是怎么初始化的呢?

    
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
    

    这是它初始化最后调用的方法,这个方法在DefaultEventExecutorChooserFactory中被实现,这个参数是MultithreadEventExecutorGroup类中的children,也就是EventLoopGroup中的所有EventLoop,那这个newChooser得分配方法就是如果EventLoop的数量是2的n次方,那么就使用PowerOfTwoEventExecutorChooser来分配,否则使用GenericEventExecutorChooser来分配。这两个策略类的分配方法实现分别如下:

         
         1、PowerOfTwoEventExecutorChooser
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
    
        2、GenericEventExecutorChooser
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }
    

    所以,到此为止,我们可以解决为什么一个EventLoop会被分配给多个Channel的疑惑。本文到此也就结束了。篇幅较长,内容涉及到Reactor的三种线程模型,然后分析了Netty的线程模型,然后分析了Netty的EventLoop,以及EventLoopGroup,以及分析了EventLoop是怎么被分配给一个Channel的,和一个EventLoop是如何启动起来来处理事件的。最后分析了EventLoopGroup分配EventLoop的策略,对于本文涉及的内容的更为深入的分析总结,将在未来的某个适宜的时刻进行。

     转自https://www.cnblogs.com/heavenhome/articles/6554262.html

    摘要: Infoq有篇文章提到通过Netty4+Thrift压缩二进制编码技术有人实现了10W TPS(1K的复杂POJO对象)跨节点远程服务调用,对于RPC应用来说高性能的三个主题永远是IO模型、数据协议、线程模型,10W TPS的测试结果一方面归功于Thrift方面压缩二进制编码技术的高效(这里有protobuf和thrift相关测试数据)。另一方面还要归功于Netty精心设计的高效线程模型。本文主要对Netty线程模型的设计进行分析,结合对比JavaScript或Node单线程模型的设计与实现,以及对IO密集型和计算密集型应用方面线程模型的设计进行一些思考。转https://my.oschina.net/andylucc/blog/618179

    Infoq有篇文章提到通过Netty4+Thrift压缩二进制编码技术有人实现了10W TPS(1K的复杂POJO对象)跨节点远程服务调用,对于RPC应用来说高性能的三个主题永远是IO模型、数据协议、线程模型,10W TPS的测试结果一方面归功于Thrift方面压缩二进制编码技术的高效(这里有protobuf和thrift相关测试数据)。另一方面还要归功于Netty精心设计的高效线程模型。本文主要对Netty线程模型的设计进行分析,结合对比JavaScript或Node单线程模型的设计与实现,以及对IO密集型和计算密集型应用方面线程模型的设计进行一些思考。

    单线程Reactor模式

    Netty线程模型总体上可以说是Reactor模式的一种变种,我们先看看什么是Reactor模式。这里主要参考维基百科上对Ractor的定义与描述。

    Reactor模式是一种事件处理模式,单个或多个事件(Event)并发地投递到事件处理服务(Service Handler),事件处理服务将事件进行分离,同步的将他们分发到对应的事件处理器中去处理。Reactor模式有下面几种参与者:

    1. 资源:任何提供系统的输入或者消费系统的输出的资源,如:Socket句柄。

    2. 同步事件分离器:通常使用event loop来进行对资源的阻塞等待,当有资源就绪的时候事件分离器将资源传递给事件分发器。

    3. 事件分发器:处理请求处理器的注册或者反注册,将资源从时间分离器分发到资源对应的请求处理器中同步执行。

    4. 请求处理器:应用定义的对相关资源的请求处理。 

    下面用一张图表示通用Reactor模式的示意图:

    Reactor模式的优点与缺点:

    Reactor模式使得应用代码和Reactor实现相分离,这使得用户可以将应用代码设计成最大程度可复用的模块,由于对于请求处理器的调用的是同步的,用户不需要去考虑并发问题,同时也减少了多线程对系统资源的消耗。另一方面,相比于过程化模式的程序,Reactor模式下的程序相对比较难于Debug,同时单线程的设计在多核时代不能够充分利用多核处理器资源,影响了系统的扩展性。

    这是最简单的单线程Reactor模式,网上也有对于多线程Reactor模式的一些介绍(这里),本文不做过多介绍,多线程Reactor模式也是在原有的模型基础上进行的变种。

    Netty线程模型

    Netty是一款高效的NIO框架和工具,基于JAVA NIO提供的API实现。在JAVA NIO方面Selector给Reactor模式提供了基础,Netty结合Selector和Reactor模式设计了高效的线程模型,Reactor模式的参与者主要有下面一些组件:

    1. Selector

    2. EventLoopGroup/EventLoop

    3. ChannelPipeline

    下面对其功能和其在Netty之Reactor模式中扮演的角色进行介绍。

    Selector

    Selector是JAVA NIO提供的SelectableChannel多路复用器,它内部维护着三个SelectionKey集合,负责配合select操作将就绪的IO事件分离出来,落地为SelectionKey,我前面有一篇文章的一部分对Selector进行了相对详细的介绍(这里)。在Netty线程模型中,我认为Selector充当着demultiplexer的角色,而对于SelectionKey我们可以将它看成Reactor模式中的资源。

    EventLoopGroup/EventLoop

    EventLoopGroup是一组EventLoop的抽象,由于Netty对Reactor模式进行了变种,实际上为更好的利用多核CPU资源,Netty实例中一般会有多个EventLoop同时工作,每个EventLoop维护着一个Selector实例,类似单线程Reactor模式地工作着。至于多少线程可有用户决定,Netty也根据实际上的处理器核数提供了一个默认的数字,我们也建议使用这个数字:

    复制代码
    private static final int DEFAULT_EVENT_LOOP_THREADS;
    
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    复制代码

    EventLoopGroup提供next接口,可以总一组EventLoop里面按照一定规则获取其中一个EventLoop来处理任务,对于EventLoopGroup这里需要了解的是在Netty中,在Netty服务器编程中我们需要BossEventLoopGroup和WorkerEventLoopGroup两个EventLoopGroup来进行工作。通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程,也就是我们建议BossEventLoopGroup的线程数参数这是为1。BossEventLoop负责接收客户端的连接并将SocketChannel交给WorkerEventLoopGroup来进行IO处理。下面是他们的工作示意图:

    如上图,BossEventLoopGroup通常是一个单线程的EventLoop,EventLoop维护着一个注册了ServerSocketChannel的Selector实例,BoosEventLoop不断轮询Selector将连接事件分离出来,通常是OP_ACCEPT事件,然后将accept得到的SocketChannel交给WorkerEventLoopGroup,WorkerEventLoopGroup会由next选择其中一个EventLoopGroup来将这个SocketChannel注册到其维护的Selector并对其后续的IO事件进行处理。在Reactor模式中BossEventLoopGroup主要是对多线程的扩展,而每个EventLoop的实现涵盖IO事件的分离,和分发(Dispatcher)。

    ChannelPipeline

    在Netty中ChannelPipeline维护着一个ChannelHandler的链表队列,每个SocketChannel都有一个维护着一个ChannelPipeline实例,而每个ChannelPipeline实例通常维护着一个ChannelHandler链表队列,由于SocketChannel是和SelectionKey关联的,也就是Reactor模式中的资源,当EventLoop将SelectionKey分离出来的时候会将SelectionKey关联的Channel交给Channel关联的ChannelHandler链来处理,那么ChannelPipeline其实是担任着Reactor模式中的请求处理器这个角色。既然提到ChannelPipeline,这里对其也进行一些简单的介绍吧。

    ChannelPipeline的默认实现是DefaultChannelPipeline,DefaultChannelPipeline本身维护着一个用户不可见的tail和head的ChannelHandler,他们分别位于链表队列的头部和尾部。tail在更上从的部分,而head在靠近网络层的方向。在Netty中关于ChannelHandler有两个重要的接口,ChannelInBoundHandler和ChannelOutBoundHandler。inbound可以理解为网络数据从外部流向系统内部,而outbound可以理解为网络数据从系统内部流向系统外部。用户实现的ChannelHandler可以根据需要实现其中一个或多个接口,将其放入Pipeline中的链表队列中,ChannelPipeline会根据不同的IO事件类型来找到相应的Handler来处理,同时链表队列是责任链模式的一种变种,自上而下或自下而上所有满足事件关联的Handler都会对事件进行处理。

    上面部分主要是对比Reactor模式对Netty的线程模型进行相应的对比介绍,下面主要会结合JavaScript单线程模型多介绍一些Netty对EventLoop的实现及相应的思考。

    JavaScript单线程模型

    众所周知,JavaScript是单线程的,也就是任何时刻同时只能有一个线程堆栈在执行,那么对于下面这段代码可能有同学会疑惑这,这个是怎么执行的:

    复制代码
    console.log("A");
    setTimeout(function timeout() {
        console.log("B");
    }, 10);
    console.log("C");
    ....//biz code
    console.log("D");
    复制代码

    最初的想法是我们设置了一个定时任务,10ms之后执行,如果在biz code处的code需要执行20ms以上,那么timeout怎么能够顺利执行呢,而且单线程是如何做到既执行下面的biz code又执行timeout的呢。事实上如果biz code的部分如果执行时间大于10ms,那么timeout并不会立即准时执行的。要明白其中的原因,我们可以从一张图来理解JavaScript的单线程模型:

    首先简单理解下eventloop机制,即一个线程在执行完主线程后会不断轮询callback队列,取出就绪任务执行,每个循环称为一个tick。因为JavaScript只有一个线程执行,因此也只有一个线程堆栈,结合上面的code实例接单说明一下对应堆栈的变动:

    console.log("A")入栈执行,输出"A",console.log("A")出栈。setTimeout入栈,WebAPIs后台不断检查timeout对象的超时时间是否已经到达,如果到达则会将对于的callback也即timeout放入callback队列。接下来console.log("C")会入栈执行,输出"C",然后出栈。...最后console.log("D")会入栈执行,输出"D",然后出栈。主区域代码执行完毕线程会不断轮询callback队列来查询是否有就绪callback,如果有则取出执行,如果没有则继续轮询。而对于超时或者是我们使用ajax的callback,后台会根据IO操作或超时时间是否完毕来决定是否将callback放入callback队列,这就是EventLoop机制。Node的单线程EventLoop模型相比于JavaScript的单线程EventLoop模型类似,但是更复杂一些,整体模型可以作为参考去理解。

    Netty EventLoop

    理解完JavaScript的EventLoop机制之后我们再回过头来看看Netty EventLoop机制的具体实现。对比JavaScript单线程模型图,我画了一张Netty的单线程模型图:

    在Netty的EventLoop线程中,这个线程主要需要处理IO事件和其他两种任务,分别为定时任务和一般任务。Netty提供可一个参数ioRatio用于用户调整单线程对于IO处理时间和任务处理时间的分配的比率。这样根据实际应用场景用户可以对这个值进行调整,默认值是50,也就是这个线程会将处理IO的时间和处理任务的时间控制为1:1。

    final long ioStartTime = System.nanoTime();
    
    processSelectedKeys();//处理IO事件
    
    final long ioTime = System.nanoTime() - ioStartTime;//处理IO事件的时间
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//计算用于处理任务的时间

    这样尽管一个EventLoop会关联多个Channel,这些Channel在单个线程下并不会出现并发问题,同时对于异步任务的处理也一样,Netty这样设计即免去了并发问题的烦恼,有减少了多线程上下文切换带来的性能损耗,同时基于EventLoopGroup实现的有限的线程数能够充分利用CPU处理能力。

    关于IO密集型和CPU密集型的思考

    Netty基于单线程设计的EventLoop能够同时处理成千上万的客户端连接的IO事件,缺点是单线程不能够处理时间过长的任务,这样会阻塞使得IO事件的处理被阻塞,严重的时候回造成IO事件堆积,服务不能够高效响应客户端请求。所谓时间过长的任务通常是占用CPU资源比较长的任务,也即CPU密集型,对于业务应用也可能是业务代码的耗时。这点和Node是极其相似的,我可以认为这是基于单线程的EventLoop模型的通病,我们不能够将过长的任务交给这个单线程来处理,也就是不适合CPU密集型应用。那么问题怎么解决呢,参照Node的解决方案,当我们遇到需要处理时间很长的任务的时候,我们可以将它交给子线程来处理,主线程继续去EventLoop,当子线程计算完毕再讲结果交给主线程。这也是通常基于Netty的应用的解决方案,通常业务代码执行时间比较长,我们不能够把业务逻辑交给这个单线程来处理,因此我们需要额外的线程池来分配线程资源来专门处理耗时较长的业务逻辑,这是比较通用的设计方案。

     
  • 相关阅读:
    JAVA环境安装配置
    使用apache设置绑定多个域名或网站
    C#资源释放及Dispose、Close和析构方法
    【数据库】百万级数据库SQL优化大总结
    【ABP】Abp的AspNetZero5.0版本无法使用ctrl+f5调式
    【.Net】vs2017 自带发布工具 ClickOnce发布包遇到的问题
    【Python】python 反射机制在实际的应用场景讲解
    【.Net】从字符串数组中寻找数字的元素
    【.Net】输出的字符靠右对齐
    【.Net+数据库】sqlserver的四种分页方式
  • 原文地址:https://www.cnblogs.com/ffaiss/p/10833026.html
Copyright © 2011-2022 走看看