zoukankan      html  css  js  c++  java
  • 2.NioEventLoop的创建

    NioEventLoop的创建

    NioEventLoop是netty及其重要的组成部件,它的首要职责就是为注册在它上的channels服务,发现这些channels上发生的新连接、读写等I/O事件,然后将事件转交 channel 流水线处理。使用netty时,我们首先要做的就是创建NioEventLoopGroup,这是一组NioEventLoop的集合,类似线程与线程池。通常,服务端会创建2个group,一个叫做bossGroup,一个叫做workerGroup。bossGroup负责监听绑定的端口,接受请求并创建新连接,初始化后交由workerGroup处理后续IO事件。

    NioEventLoop和NioEventLoopGroup的类图

    首先看看NioEventLoop和NioEventLoopGroup的类关系图

    NioEventLoop
    NioEventLoopGroup
    类多但不乱,可以发现三个特点:

    1. 两者都继承了ExecutorService,从而与线程池建立了联系
    2. NioEventLoop继承的都是SingleThread,NioEventLoop继承的是MultiThread
    3. NioEventLoop还继承了AbstractScheduledEventExecutor,不难猜出这是个和定时任务调度有关的线程池

    NioEventLoopGroup的创建

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    我们先看看bossGroup和workerGroup的构造方法。

    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    除此之外,还有多达8种构造方法,这些构造方法可以指定5种参数:
    1、最大线程数量。如果指定为0,那么Netty会将线程数量设置为CPU逻辑处理器数量的2倍
    2、线程工厂。要求线程工厂类必须实现java.util.concurrent.ThreadFactory接口。如果没有指定线程工厂,那么默认DefaultThreadFactory。
    3、SelectorProvider。如果没有指定SelectorProvider,那么默认的SelectorProvider为SelectorProvider.provider()。
    4、SelectStrategyFactory。如果没有指定则默认为DefaultSelectStrategyFactory.INSTANCE
    5、RejectedExecutionHandler。拒绝策略处理类,如果这个EventLoopGroup已被关闭,那么之后提交的Runnable任务会默认调用RejectedExecutionHandler的reject方法进行处理。如果没有指定,则默认调用拒绝策略。
    

    最终,NioEventLoopGroup会重载到父类MultiThreadEventExecutorGroup的构造方法上,这里省略了一些健壮性代码。

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
        // 步骤1
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        
        // 步骤2
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor, args);
        }    
    
        // 步骤3
        chooser = chooserFactory.newChooser(children);
    
        // 步骤4
        final FutureListener<Object> terminationListener = future -> {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        // 步骤5
        Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    这里可以分解为5个步骤,下面一步步讲解

    步骤1

    第一个步骤是创建线程池executor。从workerGroup构造方法可知,默认传进来的executor为null,所以首先创建executor。newDefaultThreadFactory的作用是设置线程的前缀名和线程优先级,默认情况下,前缀名是nioEventLoopGroup-x-y这样的命名规则,而线程优先级则是5,处于中间位置。
    创建完newDefaultThreadFactory后,进入到ThreadPerTaskExecutor。它直接实现了juc包的线程池顶级接口,从构造方法可以看到它只是简单的把factory赋值给自己的成员变量。而它实现的接口方法调用了threadFactory的newThread方法。从名字可以看出,它构造了一个thread,并立即启动thread。

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }    
    

    那么我们回过头来看下DefaultThreadFactory的newThread方法,发现他创建了一个FastThreadLocalThread。这是netty自定义的一个线程类,顾名思义,netty认为它的性能更快。关于它的解析留待以后。这里步骤1创建线程池就完成了。总的来说他与我们通常使用的线程池不太一样,不设置线程池的线程数和任务队列,而是来一个任务启动一个线程。(问题:那任务一多岂不是直接线程爆炸?)

    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());        
        return t;
    }
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
    

    步骤2

    步骤2是创建workerGroup中的NioEventLoop。在示例代码中,传进来的线程数是0,显然不可能真的只创建0个nioEventLoop线程。在调用父类MultithreadEventLoopGroup构造函数时,对线程数进行了判断,若为0,则传入默认线程数,该值默认为2倍CPU核心数

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    // 静态代码块初始化DEFAULT_EVENT_LOOP_THREADS
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads",     NettyRuntime.availableProcessors() * 2));
    }    
    

    接下来是通过newChild方法为每一个EventExecutor创建一个对应的NioEventLoop。这个方法传入了一些args到NioEventLoop中,前三个是在NioEventLoopGroup创建时传过来的参数。默认值见上文

    1. SlectorProvider.provider, 用于创建 Java NIO Selector 对象;
    2. SelectStrategyFactory, 选择策略工厂;
    3. RejectedExecutionHandlers, 拒绝执行处理器;
    4. EventLoopTaskQueueFactory,任务队列工厂,默认为null;

    进入NioEventLoop的构造函数,如下:

    NioEventLoop构造函数
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    // 父类构造函数    
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue");
        rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler");
    }    
    

    首先调用一个newTaskQueue方法创建一个任务队列。这是一个mpsc即多生产者单消费者的无锁队列。之后调用父类的构造函数,在父类的构造函数中,将NioEventLoopGroup设置为自己的parent,并通过匿名内部类创建了这样一个Executor————通过ThreadPerTaskExecutor执行传进来的任务,并且在执行时将当前线程与NioEventLoop绑定。其他属性也一一设置。
    在nioEventLoop构造函数中,我们发现创建了一个selector,不妨看一看netty对它的包装。

    unwrappedSelector = provider.openSelector();
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }
    

    首先看到netty定义了一个常量DISABLE_KEY_SET_OPTIMIZATION,如果这个常量设置为true,也即不对keyset进行优化,则直接返回未包装的selector。那么netty对selector进行了哪些优化?

    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    } 
    

    往下,我们看到了一个叫做selectedSelectionKeySet的类,点进去可以看到,它继承了AbstractSet,然而它的成员变量却让我们想到了ArrayList,再看看它定义的方法,除了不支持remove和contains外,活脱脱一个简化版的ArrayList,甚至也支持扩容。
    没错,netty确实通过反射的方式,将selectionKey从Set替换为了ArrayList。仔细一想,却又觉得此番做法有些道理。众所周知,虽然HashSet和ArrayList随机查找的时间复杂度都是o(1),但相比数组直接通过偏移量定位,HashSet由于需要Hash运算,时间消耗上又稍稍逊色了些。再加上使用场景上,都是获取selectionKey集合然后遍历,Set去重的特性完全用不上,也无怪乎追求性能的netty想要替换它了。

    步骤3

    创建完workerGroup的NioEventLoop后,如何挑选一个nioEventLoop进行工作是netty接下来想要做的事。一般来说轮询是一个很容易想到的方案,为此需要创建一个类似负载均衡作用的线程选择器。当然追求性能到丧心病狂的netty是不会轻易满足的。我们看看netty在这样常见的方案里又做了哪些操作。

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    // PowerOfTwo
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
    // Generic
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
    

    可以看到,netty根据workerGroup内线程的数量采取了2种不同的线程选择器,当线程数x是2的幂次方时,可以通过&(x-1)来达到对x取模的效果,其他情况则需要直接取模。这与hashmap强制设置容量为2的幂次方有异曲同工之妙。

    步骤4

    步骤4就是添加一些保证健壮性而添加的监听器了,这些监听器会在EventLoop被关闭后得到通知。

    步骤5

    创建一个只读的NioEventLoop线程组

    到此NioEventLoopGroup及其包含的NioEventLoop组就创建完成了

  • 相关阅读:
    golang实现并发爬虫一(单任务版本爬虫功能)
    golang实现rabbitmq的五种模式
    KNN笔记
    Observer Pattern
    关于data-属性
    Python中的装饰器
    Python中的高阶函数与匿名函数
    Python中的列表生成式和多层表达式
    Thymeleaf读取国际化文本时出现??xxxxxx_zh_CN??问题
    Java8新特性(1):Lambda表达式
  • 原文地址:https://www.cnblogs.com/spiritsx/p/11900541.html
Copyright © 2011-2022 走看看