NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// 获取Selector对象的SelectorProvider对象
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
// 作为Select选择策略以及一次select的个数
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// Selector的包装
final SelectorTuple selectorTuple = openSelector();
// Selector选择器,用来轮询IO事件
this.selector = selectorTuple.selector;
// 没有包装的原始Selector选择器
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 尾部任务队列
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
// 新增任务时是否唤醒线程,默认为false
this.addTaskWakesUp = addTaskWakesUp;
// 最大任务等待数
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 当前执行任务的线程池
this.executor = ThreadExecutorMap.apply(executor, this);
// 任务队列
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
// 任务队列满时的异常处理器
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Netty是怎么实现一个Channel绑定一个线程,一个线程绑定多个Channel的呢?
从上面的代码段可以看出,一个AbstractChannel会持有一个NioEventLoop对象,而一个NioEventLoop持有一个Selector对象。当使用
SelectableChannel#register()
API将Channel注册到Selector选择器中时,由于使用的是NioEventLoop中的Selector对象,所以当NioEventLoop进行轮询时,就可以只轮询当前线程绑定的所有Channel对象。
接下来详细分析一下NioEventLoop#run()
里的流程
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// 获取selector策略
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 获取最近的定时任务开始时间,如果不存在定时任务,则为-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 如果不存在异步任务,直接执行I/O事件轮询
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
// 当发生未知异常时,需重新生成一个新的Selector,替换老的Selector,并把注册到老的Selector上面的channel注册到新的Selector上去
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
// 省略部分代码
if (ioRatio == 100) {
try {
if (strategy > 0) {
// 处理轮询到的I/O事件
processSelectedKeys();
}
} finally {
// 执行异步任务
ranTasks = runAllTasks();
}
}
// 省略部分代码
} catch (CancelledKeyException e) {
// 省略部分代码
} catch (Throwable t) {
handleLoopException(t);
}
// 省略部分代码
}
}
获取SELECT策略时,需要用到一个
hasTasks()
方法,用来判断是否存在异步任务。如果存在异步任务,则会执行selector.selectNow()
方法,该方法不会阻塞,接下来的switch就会跳到default分支。netty通过这种方式可以保证当存在异步任务时,优先执行异步任务。当不存在异步任务时,就会执行到selector.select()
方法,在执行该方法之前,会通过nextScheduledTaskDeadlineNanos()
方法获取最近的定时任务的开始时间curDeadlineNanos,如果不存在定时任务,则为Long类型的最大值。接下来会使用curDeadlineNanos作为select的超时时间,如果不存在定时任务,也就是curDeadlineNanos为Long的最大值,就会执行selector.select()
方法,也就是不设置超时时间,直到有I/O事件为止才会返回。如果存在定时任务,则会通过定时任务的开始时间计算出select操作的超时时间。当有I/O事件响应时,则会通过
processSelectedKeys()
方法处理I/O事件,下面是代码段
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
从代码可以看出,这里处理有两个分支,一个是处理优化过的SelectedKey,另一个分支是处理正常的SelectedKey,什么是优化过的key?这里就需要看一下selectedKeys变量的数据结构。跟踪代码可以看出,selectedKeys的数据类型是
SelectedSelectionKeySet
,SelectedSelectionKeySet
底层存储结构是用的数组,所以它的遍历效率比较高,原生的存储结构是HashSet,其实也就是HashMap,它的遍历效率没有数组高。当通过SelectorProvider#openSelector()
方法获取一个selector时,这里有个特殊操作,就是一个优化开关,如果开关关闭,则使用原生的selectedKeys存储结构,也就是HashSet。如果开关打开,就会使用反射,将SelectorImpl类中的publicKeys与publicSelectedKeys字段替换成SelectedSelectionKeySet
的存储类型。可以看下面的代码片段
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 获取一个Selector实例
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 获取SelectorImpl的class
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
接下来再看一下processSelectedKeysOptimized()
方法的具体逻辑
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
// 获取I/O事件类型
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 处理I/O事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
这里开始处理I/O事件,大家还记得将channel注册到selector时,可以填写一个attach参数。在这就可以通过
attachment()
方法获取事件类型,如果是AbstractNioChannel
类型,则处理I/O事件,否则处理异步任务。接下来有一个needsToSelectAgain标识,表示是否需要重新轮询一次I/O事件,为什么会有这个标识呢?我们先跟踪一下这个标识是在哪里写入的,最终发现,当channel从Selector取消注册超过256次时,就会将这个标志位置为true。
接下来继续看处理I/O事件的详细逻辑processSelectedKey()
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略部分代码
try {
int readyOps = k.readyOps();
// 处理连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 在ChinnelPipeline上传递Connect事件
unsafe.finishConnect();
}
// 处理可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 处理客户端的读事件与接收事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 在ChinnelPipeline上传递Read事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
Connect事件的传递比较简单,感兴趣的同学可以自己去看一下源码,这里我们主要看一下读写事件。
首先看一下写事件,从下面的代码可以看出,主要是把buffer里的数据刷新到socket缓冲区,然后发送到客户端
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// 省略部分代码
try {
// 真正的将数据写到客户端,底层会调用Nio的SocketChannel#write()方法
doWrite(outboundBuffer);
} catch (Throwable t) {
// 省略部分代码
} finally {
inFlush0 = false;
}
}
下面我们再来看一下Netty是怎么处理读事件的
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 获取一个netty自己的内存分配器
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 将数据读取到readBuf中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 增加读的次数
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 在ChannelPipeline上传播read事件
pipeline.fireChannelRead(readBuf.get(i));
}
// 清空读的缓冲区
readBuf.clear();
// 读取完成,记录读取总的字节数
allocHandle.readComplete();
// 传播一次读取完成的事件,一次读取完成可能包含多个SocketChannel
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
// 移除read 事件
removeReadOp();
}
}
}
当Netty处理读事件时,首先会获取一个内存分配处理器,读取消息到分配的内存里(Netty内存分配器后面分析),然后在ChannelPipeline上传播读事件,最终会在channel上移除read事件。
最后我们来看一下Netty是怎么处理异步任务的
protected boolean runAllTasks(long timeoutNanos) {
// 从定时任务队列中取出任务添加到普通任务队列中
fetchFromScheduledTaskQueue();
// 从任务列表中取出一个任务
Runnable task = pollTask();
if (task == null) {
// 如果任务为空,表示没有普通任务可执行,直接执行tailTask任务队列中的任务
afterRunningAllTasks();
return false;
}
// 计算执行任务的截止时间(相对时间,当前时间减去服务启动时间再加上超时时间)
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 安全的执行任务
safeExecute(task);
runTasks ++;
// 执行任务个数和0x3f(0011 1111)进行与运算,这里指的是当任务个数为64个时,进行一次截止时间判断
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
// 如果当前时间大于了截止时间,则中断执行任务,这是为了把时间留出一部分来执行I/O事件
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
// 如果任务为空,终止循环
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 最后执行尾部任务队列中的任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
首选执行异步任务有一个超时,通过这个超时时间来计算任务可以执行多久。首先netty会把定时任务合并到普通任务队列中,然后判断这个任务队列中是否有任务,如果没有任务,则先执行尾部任务队列,然后提前返回。如果有任务,则会计算任务的执行截止时间,每当任务执行了64个时,都会判断一下当前时间是否大于截止时间,如果大于了截止时间,则停止执行普通任务,然后再执行尾部任务队列。