本文是基于 Netty 4.1.6.Final 的源码来分析的。
在分析源码之前做一些准备工作:
先熟悉一下 IDEA 的几个快捷键,能极大的提高我们查看源码的效率:
- Ctrl + Alt + B:用鼠标点击指定的方法,然后按下快捷键,IDEA 就会跳转到该方法的定义的地方,如果是重写的方法,则会列出该方法的所有实现;
- Ctrl + Alt + ←/→:跳转至前/后一次鼠标点击的地方,方便我们来回查看源码;
- Ctrl + F12:弹出当前类的所有方法,可以直接敲字母来过滤方法;
- Shift + F7:Debug 的时候,当一行代码中链式的调用了多个方法,按下该快捷键会弹出改行所有的方法,然后选择要进入的方法,查看源码。
1. 创建过程
- 创建 1 个 executor,后续用来创建并执行线程;
- 创建指定数量的 EventLoop;
- 为当前 EventLoop 创建 1 个 selector;
- 根据 EventLoop 的数量创建指定类型的 chooser,后续用来分配线程。
2. 代码
1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 2 EventLoopGroup workerGroup = new NioEventLoopGroup();
这两行代码创建的 EventLoopGroup 分别用来处理新连接的接入和已接入连接的事件处理。
3. 源码分析
NioEventLoopGroup 的构造方法,最终调用的是 MultithreadEventExecutorGroup 的构造方法:
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2 EventExecutorChooserFactory chooserFactory, Object... args) { 3 //... 4 //1. 创建 executor 5 if (executor == null) { 6 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 7 } 8 // EventLoopGroup 中的 EventLoop 数组 9 children = new EventExecutor[nThreads]; 10 11 for (int i = 0; i < nThreads; i ++) { 12 boolean success = false; 13 try { 14 //2. 创建 EventLoop 15 children[i] = newChild(executor, args); 16 success = true; 17 } catch (Exception e) { 18 // TODO: Think about if this is a good exception type 19 throw new IllegalStateException("failed to create a child event loop", e); 20 } finally { 21 //... 22 } 23 } 24 //3. 创建 chooser 25 chooser = chooserFactory.newChooser(children); 26 //.. 27 }
3.1 executor 的创建
创建 executor 的构造方法中传入了 1 个 DefaultThreadFactory:
1 public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { 2 //... poolName 的值是 EventLoopGroup 的类名,首字母小写 3 prefix = poolName + '-' + poolId.incrementAndGet() + '-'; 4 this.daemon = daemon; 5 this.priority = priority; 6 this.threadGroup = threadGroup; 7 }
ThreadPerTaskExecutor 类:
1 public final class ThreadPerTaskExecutor implements Executor { 2 private final ThreadFactory threadFactory; 3 4 public ThreadPerTaskExecutor(ThreadFactory threadFactory) { 5 if (threadFactory == null) { 6 throw new NullPointerException("threadFactory"); 7 } 8 this.threadFactory = threadFactory; 9 } 10 11 @Override 12 public void execute(Runnable command) { 13 //通过线程工厂创建并启动线程 14 threadFactory.newThread(command).start(); 15 } 16 }
DefaultThreadFactory 的 newThread(Runnable command)方法:
1 @Override 2 public Thread newThread(Runnable r) { 3 //调用了后面的方法,最终创建的是 Netty 封装的 FastThreadLocalThread 4 Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); 5 try {//线程相关的设置 6 if (t.isDaemon()) { 7 if (!daemon) { 8 t.setDaemon(false); 9 } 10 } else { 11 if (daemon) { 12 t.setDaemon(true); 13 } 14 } 15 16 if (t.getPriority() != priority) { 17 t.setPriority(priority); 18 } 19 } catch (Exception ignored) { 20 // Doesn't matter even if failed to set. 21 } 22 return t; 23 } 24 protected Thread newThread(Runnable r, String name) { 25 return new FastThreadLocalThread(threadGroup, r, name); 26 }
注意:这里只是分析了 executor 的创建,以及它创建线程的方法,这一阶段并没有创建和运行新线程。
3.2 EventLoop 的创建
newChild()方法将 executor 传了进去,这里以 NioEventLoop 举例,所以最终调用了 NioEventLoop 的构造方法:
1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, 2 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { 3 //executor 最终传递给父类 SingleThreadEventExecutor 4 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); 5 if (selectorProvider == null) { 6 throw new NullPointerException("selectorProvider"); 7 } 8 if (strategy == null) { 9 throw new NullPointerException("selectStrategy"); 10 } 11 provider = selectorProvider; 12 //创建 selector 13 selector = openSelector(); 14 selectStrategy = strategy; 15 }
在 NioEventLoop 几个父类的构造方法中,创建了任务队列,暂时不做分析。
3.3 chooser 的创建
chooser 也是通过工厂模式创建的,参数 children 是前面创建的 EventLoop 数组,chooserFactory 会根据数组的长度是否为 2 的幂来创建 chooser。
1 @SuppressWarnings("unchecked") 2 @Override 3 public EventExecutorChooser newChooser(EventExecutor[] executors) { 4 if (isPowerOfTwo(executors.length)) { 5 return new PowerOfTowEventExecutorChooser(executors); 6 } else { 7 return new GenericEventExecutorChooser(executors); 8 } 9 } 10 11 private static boolean isPowerOfTwo(int val) { 12 //判断数组长度是否为 2 的幂 13 //有符号数的计算:以 Byte 为例 14 // 3 ---> 0000 0011 0000 0011 15 // -3 ---> 1000 0000 - 0000 0011 = 0111 1101 ===> 1111 1101 & 16 // 0000 0000 17 // 2 ---> 0000 0010 0000 0010 18 // -2 ---> 1000 0000 - 0000 0010 = 0111 1110 ===> 1111 1110 & 19 // 0000 0010 20 return (val & -val) == val; 21 } 22 23 private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { 24 private final AtomicInteger idx = new AtomicInteger(); 25 private final EventExecutor[] executors; 26 27 PowerOfTowEventExecutorChooser(EventExecutor[] executors) { 28 this.executors = executors; 29 } 30 31 @Override 32 public EventExecutor next() { 33 //因为 length 是 2 的幂,减去 1,退一位,二进制就全是 1 34 //比如 8 是 1000,减 1 是 0111,将 idx 自增后和前面的值相与 35 //相当于是循环取值 36 return executors[idx.getAndIncrement() & executors.length - 1]; 37 } 38 } 39 40 private static final class GenericEventExecutorChooser implements EventExecutorChooser { 41 private final AtomicInteger idx = new AtomicInteger(); 42 private final EventExecutor[] executors; 43 44 GenericEventExecutorChooser(EventExecutor[] executors) { 45 this.executors = executors; 46 } 47 48 @Override 49 public EventExecutor next() { 50 //普通的就是直接取模的绝对值 51 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; 52 } 53 }
至此,EventLoopGroup 就创建完成了,boosGroup 和 wrokerGroup 的创建是一样的。