zoukankan      html  css  js  c++  java
  • Netty源码分析之NioEventLoop(一)—NioEventLoop的创建

    一、NioEventLoop的概述

    NioEventLoop做为Netty线程模型的核心部分,从本质上讲是一个事件循环执行器,每个NioEventLoop都会绑定一个对应的线程通过一个for(;;)循环来处理与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;其次作为任务队列, 执行 taskQueue 中的任务, 例如eventLoop.schedule 提交的定时任务也是这个线程执行的。而NioEventLoopGroup顾名思义,它是维护了一组这样的事件循环器,这也是Netty基于Reactor模式的具体设计体现。

    接下来我们就结合具体的代码,对NioEventLoop的整个创建流程进行一个说明与总结

    二、NioEventLoop的创建

    我们基于Netty构建服务端还是客户端时,都首先需要创建NioEventLoopGroup 实例

            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();

     NioEventLoopGroup 做为基于NIO的处理channle相关IO操作的事件循环器组,它的类层次结构如下

    通过NioEventLoopGroup构造函数传入线程数量

        /**
         * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
         * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
         */
        public NioEventLoopGroup(int nThreads) {
            this(nThreads, (Executor) null);
        }

     NioEventLoopGroup最终的构造函数中会包含以下几个函数

    1、nThreads:传入的线程数量

    2、executor :线程执行器Executor接口,默认为空

    3、selectorProvider:用于创建Selector的SelectorProvider 

    4、selectStrategyFactory:传入DefaultSelectStrategyFactory.INSTANCE,  一个使用默认选择策略的工厂。

    5、RejectedExecutionHandlers.reject():Netty自定义线程拒绝策略

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }

     在父类MultithreadEventLoopGroup中,会根据你传入nThreads大小,确定初始化的线程数量,为0且没有设置io.netty.eventLoopThreads参数项,则会以当前系统的核心线程数*2做为默认的线程数量

        static {
            //如果没有设置io.netty.eventLoopThreads参数项,则会以当前运行系统的核心线程数*2作为线程数
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }

    接下来在MultithreadEventExecutorGroup的构造函数中我们会根据传入的线程数,去初始化和创建一组NioEventLoop

    首先我们看下NioEventLoop的类层次结构

    下面在MultithreadEventExecutorGroup构造函数中主要完成以下几个功能:

    1、初始化ThreadPerTaskExecutor线程执行器,并传入一个线程创建工厂,用于NioEventLoop对应线程的创建

    2、根据传入的线程数,初始化一个EventExecutor数组,用于放置创建的NioEventLoop对象

    3、循环数组,通过newChild方法创建NioEventLoop对象。

        /**  
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param executor          the Executor to use, or {@code null} if the default should be used.
         * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
         * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                // 创建线程工厂,netty根据需要指定了线程的命名方式、优先级、是否是守护线程等属性
                // 该线程池没有任何队列,提交任务后,创建任何线程类型都是 FastThreadLocalRunnable, 并且立即start。
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
            //初始化一组事件循环执行器
            children = new EventExecutor[nThreads];
    
            //根据传入的线程数,初始化一个线程数组
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    // 创建 new NioEventLoop
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }

     继续跟踪进入newChild(executor, args)内部,看到它会返回一个NioEventLoop对象

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }

    继续查看NioEventLoop构造函数和他的父类构造函数

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }

    父类构造函数

        /**
         * Create a new instance
         *
         * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
         * @param executor          the {@link Executor} which will be used for executing
         * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
         *                          executor thread
         * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
         * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
         */
        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = Math.max(16, maxPendingTasks);
            this.executor = ObjectUtil.checkNotNull(executor, "executor");
            taskQueue = newTaskQueue(this.maxPendingTasks);
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }

    通过上面的代码我们可以看到,初始化NioEventLoop主要完成了以下的功能

    1、保存线程执行器ThreadPerTaskExecutor

    2、创建一个selector 

    3、基于LinkedBlockingQueue创建一个taskQueue任务队列,用于保存要执行的任务

    这些都是为了后续的循环执行Channel 相关事件所做准备。

    到这里其实我们创建了一组NioEventLoop,也就是一组事件循环执行器,每个NioEventLoop中都有对应的一个线程和一个selector ;创建完毕之后,自然就是要为每一个连接分配对应的NioEventLoop。Netty中通过

    实现EventLoopGroup接口中的next()方法来返回一个可以使用的的NioEventLoop

    public interface EventLoopGroup extends EventExecutorGroup {
        /**
         * Return the next {@link EventLoop} to use
         */
        @Override
        EventLoop next();
    }

    在MultithreadEventExecutorGroup中我们可以查看它的具体实现方式

       chooser = chooserFactory.newChooser(children);
        
       @Override
        public EventExecutor next() {
            return chooser.next();
        }

    进入代码内部我们可以看到Netty针对数组大小,对数组下标的计算方式进行了优化

    /**
     * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
     */
    @UnstableApi
    public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    
        public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    
        private DefaultEventExecutorChooserFactory() { }
    
        @SuppressWarnings("unchecked")
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            //判断是否是二的次幂,如果为true返回PowerOfTwoEventExecutorChooser,反之GenericEventExecutorChooser
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
    
        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            //通过&运算的方式循环获取数组下标
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            //通过取模的方式循环获取数组下标
            @Override
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }
        }
    }

    到此我们基本把Netty中NioEventLoop及NioEventLoopGroup的创建流程及核心代码梳理了一遍。NioEventLoop做为Netty线程模型的核心部分包含的内容比较多,上面只是初始化及创建的一部分内容,后续的部分我会陆续的补齐,其中有错误和不足之处还请指正与海涵。

    关注微信公众号,查看更多技术文章。

  • 相关阅读:
    Redis21:客户端与服务器端的通信与redis管道
    Redis20:keys、scan、bigkeys、查看key的存储方式
    Redis19:限流
    Redis18:分布式锁
    Redis17:cluster集群
    Redis16:两种redis集群解决方案:codis和cluster
    Android : 获取声卡信息的测试代码
    Android : 基于alsa库的音乐播放
    Android system :灯光系统_HAL_lights
    Android system :led_class驱动
  • 原文地址:https://www.cnblogs.com/dafanjoy/p/10486019.html
Copyright © 2011-2022 走看看