zoukankan      html  css  js  c++  java
  • Netty 中 EventLoopGroup 的创建

      本文是基于 Netty 4.1.6.Final 的源码来分析的。

      在分析源码之前做一些准备工作:

      先熟悉一下 IDEA 的几个快捷键,能极大的提高我们查看源码的效率:

    1. Ctrl + Alt + B:用鼠标点击指定的方法,然后按下快捷键,IDEA 就会跳转到该方法的定义的地方,如果是重写的方法,则会列出该方法的所有实现;
    2. Ctrl + Alt + ←/→:跳转至前/后一次鼠标点击的地方,方便我们来回查看源码;
    3. Ctrl + F12:弹出当前类的所有方法,可以直接敲字母来过滤方法;
    4. Shift + F7:Debug 的时候,当一行代码中链式的调用了多个方法,按下该快捷键会弹出改行所有的方法,然后选择要进入的方法,查看源码。

    1. 创建过程

    1. 创建 1 个 executor,后续用来创建并执行线程;
    2. 创建指定数量的 EventLoop;
      1. 为当前 EventLoop 创建 1 个 selector;
    3. 根据 EventLoop 的数量创建指定类型的 chooser,后续用来分配线程。

    2. 代码

    1  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    2  EventLoopGroup workerGroup = new NioEventLoopGroup();

      这两行代码创建的 EventLoopGroup 分别用来处理新连接的接入和已接入连接的事件处理。

    3. 源码分析

       NioEventLoopGroup 的构造方法,最终调用的是 MultithreadEventExecutorGroup 的构造方法:

     1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
     2                                         EventExecutorChooserFactory chooserFactory, Object... args) {
     3     //...
     4     //1. 创建 executor
     5     if (executor == null) {
     6         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
     7     }
     8     // EventLoopGroup 中的 EventLoop 数组
     9     children = new EventExecutor[nThreads];
    10 
    11     for (int i = 0; i < nThreads; i ++) {
    12         boolean success = false;
    13         try {
    14             //2. 创建 EventLoop
    15             children[i] = newChild(executor, args);
    16             success = true;
    17         } catch (Exception e) {
    18             // TODO: Think about if this is a good exception type
    19             throw new IllegalStateException("failed to create a child event loop", e);
    20         } finally {
    21             //...
    22         }
    23     }
    24     //3. 创建 chooser
    25     chooser = chooserFactory.newChooser(children);
    26     //..
    27 }

    3.1 executor 的创建

      创建 executor 的构造方法中传入了 1 个 DefaultThreadFactory:

    1 public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
    2     //... poolName 的值是 EventLoopGroup 的类名,首字母小写
    3     prefix = poolName + '-' + poolId.incrementAndGet() + '-';
    4     this.daemon = daemon;
    5     this.priority = priority;
    6     this.threadGroup = threadGroup;
    7 }

      ThreadPerTaskExecutor  类:

     1 public final class ThreadPerTaskExecutor implements Executor {
     2     private final ThreadFactory threadFactory;
     3 
     4     public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
     5         if (threadFactory == null) {
     6             throw new NullPointerException("threadFactory");
     7         }
     8         this.threadFactory = threadFactory;
     9     }
    10 
    11     @Override
    12     public void execute(Runnable command) {
    13         //通过线程工厂创建并启动线程
    14         threadFactory.newThread(command).start();
    15     }
    16 }

      DefaultThreadFactory 的 newThread(Runnable command)方法:

     1 @Override
     2 public Thread newThread(Runnable r) {
     3     //调用了后面的方法,最终创建的是 Netty 封装的 FastThreadLocalThread
     4     Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
     5     try {//线程相关的设置
     6         if (t.isDaemon()) {
     7             if (!daemon) {
     8                 t.setDaemon(false);
     9             }
    10         } else {
    11             if (daemon) {
    12                 t.setDaemon(true);
    13             }
    14         }
    15 
    16         if (t.getPriority() != priority) {
    17             t.setPriority(priority);
    18         }
    19     } catch (Exception ignored) {
    20         // Doesn't matter even if failed to set.
    21     }
    22     return t;
    23 }
    24 protected Thread newThread(Runnable r, String name) {
    25     return new FastThreadLocalThread(threadGroup, r, name);
    26 }

      注意:这里只是分析了 executor 的创建,以及它创建线程的方法,这一阶段并没有创建和运行新线程。

    3.2 EventLoop 的创建

      newChild()方法将 executor 传了进去,这里以 NioEventLoop 举例,所以最终调用了 NioEventLoop 的构造方法:

     1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
     2              SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
     3     //executor 最终传递给父类 SingleThreadEventExecutor
     4     super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
     5     if (selectorProvider == null) {
     6         throw new NullPointerException("selectorProvider");
     7     }
     8     if (strategy == null) {
     9         throw new NullPointerException("selectStrategy");
    10     }
    11     provider = selectorProvider;
    12     //创建 selector
    13     selector = openSelector();
    14     selectStrategy = strategy;
    15 }

      在 NioEventLoop 几个父类的构造方法中,创建了任务队列,暂时不做分析。

    3.3 chooser 的创建

      chooser 也是通过工厂模式创建的,参数 children 是前面创建的 EventLoop 数组,chooserFactory 会根据数组的长度是否为 2 的幂来创建 chooser。

     1 @SuppressWarnings("unchecked")
     2 @Override
     3 public EventExecutorChooser newChooser(EventExecutor[] executors) {
     4     if (isPowerOfTwo(executors.length)) {
     5         return new PowerOfTowEventExecutorChooser(executors);
     6     } else {
     7         return new GenericEventExecutorChooser(executors);
     8     }
     9 }
    10 
    11 private static boolean isPowerOfTwo(int val) {
    12     //判断数组长度是否为 2 的幂
    13     //有符号数的计算:以 Byte 为例
    14     // 3  ---> 0000 0011                              0000 0011
    15     // -3 ---> 1000 0000 - 0000 0011 = 0111 1101 ===> 1111 1101  &
    16     //                                                0000 0000 
    17     // 2  ---> 0000 0010                              0000 0010
    18     // -2 ---> 1000 0000 - 0000 0010 = 0111 1110 ===> 1111 1110  &
    19     //                                                0000 0010
    20     return (val & -val) == val;
    21 }
    22 
    23 private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
    24     private final AtomicInteger idx = new AtomicInteger();
    25     private final EventExecutor[] executors;
    26 
    27     PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
    28         this.executors = executors;
    29     }
    30 
    31     @Override
    32     public EventExecutor next() {
    33         //因为 length 是 2 的幂,减去 1,退一位,二进制就全是 1
    34         //比如 8 是 1000,减 1 是 0111,将 idx 自增后和前面的值相与
    35         //相当于是循环取值
    36         return executors[idx.getAndIncrement() & executors.length - 1];
    37     }
    38 }
    39 
    40 private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    41     private final AtomicInteger idx = new AtomicInteger();
    42     private final EventExecutor[] executors;
    43 
    44     GenericEventExecutorChooser(EventExecutor[] executors) {
    45         this.executors = executors;
    46     }
    47 
    48     @Override
    49     public EventExecutor next() {
    50         //普通的就是直接取模的绝对值
    51         return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    52     }
    53 }

       至此,EventLoopGroup 就创建完成了,boosGroup 和 wrokerGroup 的创建是一样的。

  • 相关阅读:
    pytest文档29-allure-pytest(最新最全,保证能搞成功!)
    使用 JMeter 进行压力测试
    web自动化针对PO模式进行二次封装之basepage
    关于面试总结-http协议相关面试题 -----转载
    移动APP测试基础分享
    基于python+requests+unittest框架接口自动化测试设计开发
    jmeter断言接口响应字段大小
    csv文件转换为xlsx文件
    钉钉机器人发群消息笔记
    docker学习笔记
  • 原文地址:https://www.cnblogs.com/magexi/p/10222644.html
Copyright © 2011-2022 走看看