zoukankan      html  css  js  c++  java
  • 4.不完全的 NioEventLoopGroup & NioEventLoop 源码分析

    创建服务端时需要先实例化两个 NioEventLoopGroup 分别为 bossGroupworkerGroup.

    • bossGroup: 只是处理连接请求.
    • workerGroup: 处理客户端业务逻辑.

    上图是该类的继承关系, 我个人觉得没必要去了解完全这些类的作用, 只需要记住几个重点类就可以了.

    EventExecutorGroup

    该接口虽然继承了 java.util.concurrent.ScheduledExecutorService, 但是只表示它支持定时任务.

    同时标记了 ExecutorService 中的 shutdown()shutdownNow() 方法为过时方法, 并提供了 shutdownGracefully() 方法代替它们.

    还有就是最重要的两个方法 next()iterator(), 来管理 EventExecutor:

    • next(): 返回这个 EventExecutorGroup 管理的一个 EventExecutor.
    • iterator(): 返回这个 EventExecutorGroup 管理的所有 EventExecutor 的集合.

    最后 submitschedule 方法的返回值改为了, io.netty.util.concurrent.Futureio.netty.util.concurrent.ScheduledFuture.

    MultithreadEventExecutorGroup

    可以把该类当做是 EventExecutorGroup 的最终实现, 虽然它是一个抽象类. 可以把它看做一个线程池.

    主要实现了一下两个方面的功能:

    • EventExecutor 管理(创建/结束): EventExecutor 的数据是固定的, 由传入的参数决定.
    • 任务派发策略: 创建 EventExecutor 选择器, next 方法使选择器选中一个 Executor.

    EventExecutor 选择器是通过 DefaultEventExecutorChooserFactory 对象的 newChooser 方法, 根据 EventExecutor 的数量创建不同的选择器实现 PowerOfTwoEventExecutorChooserGenericEventExecutorChooser.

    PowerOfTwoEventExecutorChooser: executors[idx.getAndIncrement() & executors.length - 1];
    
    GenericEventExecutorChooser: executors[Math.abs(idx.getAndIncrement() % executors.length)];
    

    NioEventLoop

    SingleThreadEventExecutor

    该抽象类中保存了 private volatile Thread thread;, 用来执行一些特定任务. 也就是说一个 NioEventLoop 会和一个线程绑定.

    SingleThreadEventLoop

    主要实现了 ChannelChannelPromise 的注册.

    NioEventLoop

    只要实现了获取和调用 Selector.

    NioEventLoopGroup 实例化过程

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            // 1. 如果没有指定 executor, 就创建 ThreadPerTaskExecutor.
            // ThreadPerTaskExecutor 实例, 主要就是用来创建线程的.
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            // 2. 根据指定的线程数量, 创建 EventExecutor 数组, 并实例化每个元素.
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    // 如果构造失败, 就清理资源
                }
            }
    
            // 3. 创建 EventLoop 选择器.
            chooser = chooserFactory.newChooser(children);
    
            // 4. 为每个 EventLoop 线程添加 线程终止监听器
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            // 5. 将第二步创建的数组 添加到对应的 set 集合中去重, 表示只可读.
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    可以通过 NioEventLoopGroup 的有参构造, 来指定线程数量. 如果没有指定线程数量或指定线程数量为 0 则线程数量会设置为 CPU核心数 * 2.

    NioEventLoop 实例化过程

    它是在创建完 EventExecutor 数组后, 调用 newChild 方法来进行实例化, newChild 方法在该类中实现.

    1. 如果没有创建任务队列, 就会先创建任务队列.
    2. 利用 JDK 提供的 SelectorProvider 直接创建一个 Selector.
    3. 强制将 Selector 中的 selectedKeys 和 publicSelectedKeys 替换为优化版的 SelectedSelectionKeySet 对象.
    4. 最后将这个 Selector 包装为 SelectedSelectionKeySetSelector.
      SelectedSelectionKeySet 继承 java.util.AbstractSet.

    通常来说, NioEventLoop 肩负着两种任务:

    1. 作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;
    2. 作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

    讨论

    1.为什么要替换为 SelectedSelectionKeySet
    因为默认的域是 Set 类型, 插入元素的开销是o(log n), 而优化版的 SelectedSelectionKeySet 继承了 AbstractSet, 具有 Set 的功能, 但是内部是用数组实现, 只具有 add 功能, 而且其开销为o(1).

    参考资料

    EventExecutorGroup
    Netty源码分析之服务启动
    Netty: DefaultPromise源码解读
    认真的 Netty 源码解析(一)
    Netty系列(一):NioEventLoopGroup源码解析
    netty源码分析--EventLoopGroup与EventLoop 分析netty的线程模型
    Netty源码学习(三)NioEventLoop

  • 相关阅读:
    【Mysql+shell】查询结果导出到文件,文件数据导入到数据库
    【mysql】IP地址整数int和varchar的转换
    【JVM】Class结构之常量池
    【Java】Java初始化过程总结
    【转】探索 ConcurrentHashMap 高并发性的实现机制
    【并发编程】使用BlockingQueue实现<多生产者,多消费者>
    【FTP】FTP文件上传下载-支持断点续传
    【SFTP】使用Jsch实现Sftp文件下载-支持断点续传和进程监控
    【大数据】基本概念--01(转)
    【Java】Java环境变量配置
  • 原文地址:https://www.cnblogs.com/scikstack/p/13524002.html
Copyright © 2011-2022 走看看