向bossGroup里注册通道
流程图
书接上文
ServerBootstrap.bind(hostname, port)
1 final ChannelFuture initAndRegister() { 2 Channel channel = null; 3 try { 4 channel = channelFactory.newChannel(); 5 init(channel); 6 } catch (Throwable t) { 7 if (channel != null) { 8 // channel can be null if newChannel crashed (eg SocketException("too many open files")) 9 channel.unsafe().closeForcibly(); 10 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor 11 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); 12 } 13 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor 14 return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); 15 } 16 17 ChannelFuture regFuture = config().group().register(channel); 18 if (regFuture.cause() != null) { 19 if (channel.isRegistered()) { 20 channel.close(); 21 } else { 22 channel.unsafe().closeForcibly(); 23 } 24 } 25 26 return regFuture; 27 }
上篇文章说了init方法,接下去就是注册通道
config().group().register(channel)方法
首先调用的是MultithreadEventLoopGroup.register(Channel channel)方法
1 public ChannelFuture register(Channel channel) { 2 return next().register(channel); 3 } 4 5 public EventLoop next() { 6 return (EventLoop) super.next(); 7 }
调用MultithreadEventExecutorGroup的next()方法
1 public EventExecutor next() { 2 return chooser.next(); 3 }
chooser选择器。在MultithreadEventExecutorGroup构造方法中实例化。
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2 EventExecutorChooserFactory chooserFactory, Object... args) { 3 if (nThreads <= 0) { 4 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); 5 } 6 7 if (executor == null) { 8 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 9 } 10 11 children = new EventExecutor[nThreads]; 12 13 for (int i = 0; i < nThreads; i ++) { 14 boolean success = false; 15 try { 16 children[i] = newChild(executor, args); 17 success = true; 18 } catch (Exception e) { 19 // TODO: Think about if this is a good exception type 20 throw new IllegalStateException("failed to create a child event loop", e); 21 } finally { 22 if (!success) { 23 for (int j = 0; j < i; j ++) { 24 children[j].shutdownGracefully(); 25 } 26 27 for (int j = 0; j < i; j ++) { 28 EventExecutor e = children[j]; 29 try { 30 while (!e.isTerminated()) { 31 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 32 } 33 } catch (InterruptedException interrupted) { 34 // Let the caller handle the interruption. 35 Thread.currentThread().interrupt(); 36 break; 37 } 38 } 39 } 40 } 41 } 42 43 chooser = chooserFactory.newChooser(children);//实例化选择器 44 45 final FutureListener<Object> terminationListener = new FutureListener<Object>() { 46 @Override 47 public void operationComplete(Future<Object> future) throws Exception { 48 if (terminatedChildren.incrementAndGet() == children.length) { 49 terminationFuture.setSuccess(null); 50 } 51 } 52 }; 53 54 for (EventExecutor e: children) { 55 e.terminationFuture().addListener(terminationListener); 56 } 57 58 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); 59 Collections.addAll(childrenSet, children); 60 readonlyChildren = Collections.unmodifiableSet(childrenSet); 61 }
chooserFactory.newChooser(children);
其实就前面讲过的选择器工厂DefaultEventExecutorChooserFactory,如何选择一个执行器。
1 public EventExecutorChooser newChooser(EventExecutor[] executors) { 2 if (isPowerOfTwo(executors.length)) { 3 return new PowerOfTwoEventExecutorChooser(executors); 4 } else { 5 return new GenericEventExecutorChooser(executors); 6 } 7 }
chooser.next();
1 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { 2 private final AtomicInteger idx = new AtomicInteger(); 3 private final EventExecutor[] executors; 4 5 PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { 6 this.executors = executors; 7 } 8 9 @Override 10 public EventExecutor next() { 11 return executors[idx.getAndIncrement() & executors.length - 1]; 12 } 13 } 14 15 private static final class GenericEventExecutorChooser implements EventExecutorChooser { 16 // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary. 17 // The 64-bit long solves this by placing the overflow so far into the future, that no system 18 // will encounter this in practice. 19 private final AtomicLong idx = new AtomicLong(); 20 private final EventExecutor[] executors; 21 22 GenericEventExecutorChooser(EventExecutor[] executors) { 23 this.executors = executors; 24 } 25 26 @Override 27 public EventExecutor next() { 28 return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; 29 } 30 }
选到一个NioEventLoop,执行注册,调用的是NioEventLoop的register。子类没有这个方法,由父类SingleThreadEventLoop实现。
1 public ChannelFuture register(Channel channel) { 2 return register(new DefaultChannelPromise(channel, this)); 3 }
创建了一个DefaultChannelPromise把通道和NioEventLoop都封装进去,DefaultChannelPromise是对Future做了加强,也是异步回调,可以设置很多监听,也可以将结果写入。
1 public ChannelFuture register(final ChannelPromise promise) { 2 ObjectUtil.checkNotNull(promise, "promise"); 3 promise.channel().unsafe().register(this, promise); 4 return promise; 5 }
Channel.Unsafe
这里的Unsafe不是JDK
的Unsafe,是netty封装的Channel的内部类,因为他是一些操作NIO底层的方法,不建议外部用的,所有也叫Unsafe。
我们所说的把通道注册进bossGroup实际上就是调用他的实现类。promise.channel()方法返回NioServerSocketChannel,他的unsafe方法由它的父类AbstractNioChannel实现。
1 public NioUnsafe unsafe() { 2 3 return (NioUnsafe) super.unsafe(); 4 } 5 //父类unsafe方法 6 public Unsafe unsafe() { 7 8 return unsafe; 9 }
返回值 在初始化AbstractChannel时 指定
1 protected AbstractChannel(Channel parent) { 2 3 this.parent = parent; 4 id = newId(); 5 unsafe = newUnsafe(); 6 pipeline = newChannelPipeline(); 7 }
newUnsafe(),又由子类AbstractNioMessageChannel实现
1 protected AbstractNioUnsafe newUnsafe() { 2 3 return new NioMessageUnsafe(); 4 }
所以config().group().register(channel)调用的是NioMessageUnsafe.register方法,但是自己没有这个方法,所以最终调用父类AbstractUnsafe的register方法,AbstractUnsafe为AbstractChannel的内部类.
这个就是真正的注册方法:
1 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 2 ObjectUtil.checkNotNull(eventLoop, "eventLoop"); 3 if (isRegistered()) { 4 promise.setFailure(new IllegalStateException("registered to an event loop already")); 5 return; 6 } 7 if (!isCompatible(eventLoop)) { 8 promise.setFailure( 9 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); 10 return; 11 } 12 13 AbstractChannel.this.eventLoop = eventLoop;//设置通道的事件循环,1对1,只设置一次 14 //只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里 15 if (eventLoop.inEventLoop()) { 16 register0(promise); 17 } else {//否则就当做任务提交给eventLoop的任务队列 18 try { 19 eventLoop.execute(new Runnable() { 20 @Override 21 public void run() { 22 register0(promise); 23 } 24 }); 25 } catch (Throwable t) { 26 logger.warn( 27 "Force-closing a channel whose registration task was not accepted by an event loop: {}", 28 AbstractChannel.this, t); 29 closeForcibly(); 30 closeFuture.setClosed(); 31 safeSetFailure(promise, t); 32 } 33 } 34 }
这里有个很巧妙的点,就是eventLoop.inEventLoop()这个的判断,就是判断调用这个方法的是不是eventLoop的线程,如果是,那就是同一个线程调用,直接就注册,否则属于多线程调用,可能会有问题,所以还是提交一个任务给eventLoop的线程去执行,这样就是单线程,不会有线程安全问题。
此时当前线程是main肯定不是事件循环里的线程,事件循环里的线程还没创建呢,所以会提交到队列。eventLoop为NioEventLoop,执行父类SingleThreadEventExecutor的execute方法。
1 public void execute(Runnable task) { 2 3 ObjectUtil.checkNotNull(task, "task"); 4 execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); 5 }
1 private void execute(Runnable task, boolean immediate) { 2 boolean inEventLoop = inEventLoop(); 3 addTask(task);//添加任务到taskQueue 4 if (!inEventLoop) {//当前线程不是事件循环线程 5 startThread();//让线程工厂开启线程 6 if (isShutdown()) { 7 boolean reject = false; 8 try { 9 if (removeTask(task)) { 10 reject = true; 11 } 12 } catch (UnsupportedOperationException e) { 13 // The task queue does not support removal so the best thing we can do is to just move on and 14 // hope we will be able to pick-up the task before its completely terminated. 15 // In worst case we will log on termination. 16 } 17 if (reject) { 18 reject(); 19 } 20 } 21 } 22 23 if (!addTaskWakesUp && immediate) { 24 wakeup(inEventLoop);//唤醒线程,添加了一个空的任务去唤醒 25 } 26 }
task为register0(promise);方法,addTask是给taskqueue添加task,
startThread();
1 private static final int ST_NOT_STARTED = 1;//没启动 2 private static final int ST_STARTED = 2;//启动了 3 private static final int ST_SHUTTING_DOWN = 3;//正在关闭中 4 private static final int ST_SHUTDOWN = 4;//关闭了 5 private static final int ST_TERMINATED = 5;//终止了 6 7 private void startThread() { 8 if (state == ST_NOT_STARTED) { 9 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { 10 boolean success = false; 11 try { 12 doStartThread();//具体的开启线程 13 success = true; 14 } finally { 15 if (!success) { 16 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); 17 } 18 } 19 } 20 } 21 }
doStartThread()
其实里面就是调用SingleThreadEventExecutor.this.run();,这里其实executor.execute已经启动了新的线程来执行new Runnable()里的任务,也就是执行NioEventLoop. run()。
1 private void doStartThread() { 2 assert thread == null; 3 executor.execute(new Runnable() { 4 @Override 5 public void run() { 6 thread = Thread.currentThread(); 7 if (interrupted) { 8 thread.interrupt(); 9 } 10 11 boolean success = false; 12 updateLastExecutionTime(); 13 try { 14 SingleThreadEventExecutor.this.run(); 15 success = true; 16 } catch (Throwable t) { 17 logger.warn("Unexpected exception from an event executor: ", t); 18 } finally { 19 for (;;) { 20 int oldState = state; 21 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( 22 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { 23 break; 24 } 25 } 26 27 // Check if confirmShutdown() was called at the end of the loop. 28 if (success && gracefulShutdownStartTime == 0) { 29 if (logger.isErrorEnabled()) { 30 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + 31 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + 32 "be called before run() implementation terminates."); 33 } 34 } 35 36 try { 37 // Run all remaining tasks and shutdown hooks. At this point the event loop 38 // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for 39 // graceful shutdown with quietPeriod. 40 for (;;) { 41 if (confirmShutdown()) { 42 break; 43 } 44 } 45 46 // Now we want to make sure no more tasks can be added from this point. This is 47 // achieved by switching the state. Any new tasks beyond this point will be rejected. 48 for (;;) { 49 int oldState = state; 50 if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( 51 SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { 52 break; 53 } 54 } 55 56 // We have the final set of tasks in the queue now, no more can be added, run all remaining. 57 // No need to loop here, this is the final pass. 58 confirmShutdown(); 59 } finally { 60 try { 61 cleanup(); 62 } finally { 63 // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify 64 // the future. The user may block on the future and once it unblocks the JVM may terminate 65 // and start unloading classes. 66 // See https://github.com/netty/netty/issues/6596. 67 FastThreadLocal.removeAll(); 68 69 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); 70 threadLock.countDown(); 71 int numUserTasks = drainTasks(); 72 if (numUserTasks > 0 && logger.isWarnEnabled()) { 73 logger.warn("An event executor terminated with " + 74 "non-empty task queue (" + numUserTasks + ')'); 75 } 76 terminationFuture.setSuccess(null); 77 } 78 } 79 } 80 } 81 }); 82 }
executor在SingleThreadEventExecutor构造方法中实例化,this.executor = ThreadExecutorMap.apply(executor, this);
1 public static Executor apply(final Executor executor, final EventExecutor eventExecutor) { 2 ObjectUtil.checkNotNull(executor, "executor"); 3 ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); 4 return new Executor() { 5 @Override 6 public void execute(final Runnable command) { 7 executor.execute(apply(command, eventExecutor)); 8 } 9 }; 10 }
apply方法中的executor为SingleThreadEventExecutor构造方法的入参参数,根据继承关系可知SingleThreadEventExecutor构造方法在NioEventLoop构造方法中调用了,而NioEventLoop构造方法在之前已说明,在
MultithreadEventExecutorGroup的构造方法中调用所以apply方法中的executor为ThreadPerTaskExecutor
ThreadPerTaskExecutor该类的execute比较简单,利用DefaultThreadFactory实话为thread实现异步执行,但是thread为FastThreadLocalThread而不是原生的thread.
apply方法中的eventExecutor为SingleThreadEventExecutor对象eventExecutor
apply方法内部的apply
1 public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) { 2 ObjectUtil.checkNotNull(command, "command"); 3 ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); 4 return new Runnable() { 5 @Override 6 public void run() { 7 setCurrentEventExecutor(eventExecutor); 8 try { 9 command.run(); 10 } finally { 11 setCurrentEventExecutor(null); 12 } 13 } 14 }; 15 }
setCurrentEventExecutor(eventExecutor);设置本地线程变量,利用了FastThreadLocal
command.run()方法调用的为doStartThread()方法内部的异步执行方法体SingleThreadEventExecutor.this.run();下文详解
sync()
main线程如果完成了注册流程后,就会调用sync()尝试阻塞,调用的就是:
ChannelFuture channelFuture = b.bind(hostname, port).sync();
最终到了DefaultPromise的await()方法:
1 public Promise<V> await() throws InterruptedException {
2 if (isDone()) {//完成了就返回
3 return this;
4 }
5
6 if (Thread.interrupted()) {
7 throw new InterruptedException(toString());
8 }
9
10 checkDeadLock();
11
12 synchronized (this) {
13 while (!isDone()) {//判断是否完成
14 incWaiters();
15 try {
16 wait();//阻塞
17 } finally {
18 decWaiters();
19 }
20 }
21 }
22 return this;
23 }
如果这个时候注册和绑定端口完成了,就会返回,否则就会wait();阻塞。
safeSetSuccess(promise);
绑定是事件循环的线程去做的,完成绑定后会调用safeSetSuccess(promise);方法,设置成功的结果,唤醒阻塞的主线程。最终是调用DefaultPromise的checkNotifyWaiters:
1 protected final void safeSetSuccess(ChannelPromise promise) { 2 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { 3 logger.warn("Failed to mark a promise as success because it is done already: {}", promise); 4 } 5 }
调用DefaultChannelPromise的trySuccess()
1 private boolean setValue0(Object objResult) { 2 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || 3 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { 4 if (checkNotifyWaiters()) { 5 notifyListeners(); 6 } 7 return true; 8 } 9 return false; 10 }
方法checkNotifyWaiters()
1 private synchronized boolean checkNotifyWaiters() { 2 if (waiters > 0) { 3 notifyAll(); 4 } 5 return listeners != null; 6 }
方法notifyListeners()
1 private void notifyListeners() { 2 EventExecutor executor = executor(); 3 if (executor.inEventLoop()) { 4 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); 5 final int stackDepth = threadLocals.futureListenerStackDepth(); 6 if (stackDepth < MAX_LISTENER_STACK_DEPTH) { 7 threadLocals.setFutureListenerStackDepth(stackDepth + 1); 8 try { 9 notifyListenersNow(); 10 } finally { 11 threadLocals.setFutureListenerStackDepth(stackDepth); 12 } 13 return; 14 } 15 } 16 17 safeExecute(executor, new Runnable() { 18 @Override 19 public void run() { 20 notifyListenersNow(); 21 } 22 }); 23 }
注册绑定监听并阻塞
阻塞被唤醒后,就注册一个监听的回调,然后阻塞关闭事件,其实就意味着不关闭就永远运行着了:
1 ChannelFuture cf = bootstrap.bind(8888).sync(); 2 cf.addListener((ChannelFutureListener) future -> { 3 if (cf.isSuccess()) { 4 System.out.println("监听端口 8888 成功"); 5 } else { 6 System.out.println("监听端口 8888 失败"); 7 } 8 }); 9 System.out.println("服务器开始提供服务"); 10 cf.channel().closeFuture().sync();
当然也可以这样写,先添加监听,然后阻塞,反正结果都是一样的:
1 ChannelFuture cf = bootstrap.bind(8888); 2 cf.addListener((ChannelFutureListener) future -> { 3 if (cf.isSuccess()) { 4 System.out.println("监听端口 8888 成功"); 5 } else { 6 System.out.println("监听端口 8888 失败"); 7 } 8 }); 9 System.out.println("服务器开始提供服务"); 10 cf.sync();
如果addListener是添加在同步后面的,此时可能事件循环线程已经完成了注册和绑定,addListener里面就会提交一个任务,然后事件循环会去执行这个任务。
控制台打印结果
服务器开始提供服务
监听端口 8888 成功
至此注册流程基本结束,后续就看事件循环中的新线程如何运行了。