zoukankan      html  css  js  c++  java
  • Netty源码解析 事件循环机制实现原理

    本文主要分享Netty中事件循环机制的实现。
    源码分析基于Netty 4.1

    EventLoop

    前面分享服务端和客户端启动过程的文章中说过,Netty通过事件循环机制(EventLoop)处理IO事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的IO事件和待处理的异步任务。示例如下

    while(true) {
    	process(selector.select());
    
    	process(getTask());
    }
    

    这种事件循环机制也是一种常用的IO事件处理机制,包括Redis,Mysql都使用了类似的机制。

    关于异步任务,前面文章说过,EventLoop实现了(jvm)Executor的接口,execute方法可以提供异步任务。
    register,bind,connect等操作,都会提交一个任务给EventLoop处理。如

    if (eventLoop.inEventLoop()) {
    	register0(promise);	
    } else {
    	eventLoop.execute(new Runnable() {	
    		public void run() {
    			register0(promise);
    		}
    	});
    }
    

    下面看一下Netty中事件循环机制相关的类。

    EventExecutor,事件执行器,负责处理事件。
    EventExecutorGroup维护了一个EventExecutor链表,它继承了ScheduledExecutorService,execute方法通过next方法选择一个EventExecutor,并调用EventLoop#execute处理事件。
    (EventExecutor继承了EventExecutorGroup,可以看做一个特殊的EventExecutorGroup,其execute方法可以提交一个任务任务)

    EventLoop,事件循环器,继承了EventExecutor,通过循环不断处理注册于其上的Channel的IO事件。
    EventLoopGroup接口则继承了EventExecutorGroup,负责调度EventLoop。

    SingleThreadEventExecutor实现了EventExecutor,它会创建一个新线程,并在该线程上处理事件,可以理解为单线程处理器。
    MultithreadEventExecutorGroup实现EventExecutorGroup,可以理解为多线程处理器(实际上是维护了多个EventExecutor,一个EventExecutor可以理解为一个线程),newChild方法构造具体的EventExecutor。
    MultithreadEventExecutorGroup可以配置EventExecutor数量,即线程数量。
    EventExecutorChooserFactory.EventExecutorChooser负责选择一个EventExecutor执行实际操作。

    NioEventLoop继承了SingleThreadEventExecutor,负责处理NIO事件。所以,一个NioEventLoop对象可以看做是一个线程。
    NioEventLoop也实现了EventLoop接口,它实现了事件循环机制,是Netty核心类。

    MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup,并实现了EventLoopGroup,其newChild方法构造具体的EventLoop。
    NioEventLoopGroup#newChild会构建NioEventLoop。

    EventLoop各实现类关系如下

    启动

    SingleThreadEventExecutor关键字段

    private final Queue<Runnable> taskQueue;	// 待处理异步任务
    private volatile Thread thread;				// EventLoop执行线程,即SingleThreadEventExecutor创建的新线程
    private final Executor executor;			// java.util.concurrent.Executor,负责创建线程
    

    当我们通过execute方法提交任务时,如果还没有创建执行新线程,会通过SingleThreadEventExecutor#executor一个新线程,并在新线程中调用run方法(run方法由子类实现,负责实现事件循环机制,新线程是EventLoop真正执行线程)。

    SingleThreadEventExecutor#execute

    public void execute(Runnable task) {
    	...
    
    	boolean inEventLoop = inEventLoop();
    	// #1
    	addTask(task);
    	// #2
    	if (!inEventLoop) {
    		startThread();
    		// #3
    		if (isShutdown()) {
    			...
    		}
    	}
    	// #4
    	if (!addTaskWakesUp && wakesUpForTask(task)) {
    		wakeup(inEventLoop);
    	}
    }
    

    #1 添加任务到待处理列表
    #2
    inEventLoop方法,判断当前线程是否为EventLoop执行线程
    若当前线程非EventLoop执行线程,调用startThread方法启动一个新的线程,执行run方法。
    这里可以理解为启动EventLoop。
    #3 如果当前EventLoop已关闭,拒绝任务
    #4 若当前EventLoop线程阻塞正等待IO事件(Selector#select方法),调用wakeup方法唤醒线程执行该新增任务

    循环机制

    NioEventLoop#run方法负责实现NIO事件处理机制。

    protected void run() {
    	int selectCnt = 0;
    	// #1
    	for (;;) {
    
    			int strategy;
    			
    				// #2
    				strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
    				switch (strategy) {
    				case SelectStrategy.CONTINUE:
    					continue;
    
    				case SelectStrategy.BUSY_WAIT:
    					// fall-through to SELECT since the busy-wait is not supported with NIO
    
    				case SelectStrategy.SELECT:
    					// #3
    					long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
    					if (curDeadlineNanos == -1L) {
    						curDeadlineNanos = NONE; // nothing on the calendar
    					}
    					nextWakeupNanos.set(curDeadlineNanos);
    					try {
    						// #4
    						if (!hasTasks()) {
    							strategy = select(curDeadlineNanos);
    						}
    					} finally {
    						// #5
    						nextWakeupNanos.lazySet(AWAKE);
    					}
    					// fall through
    				default:
    				}
    				...
    			
    			// #6
    			selectCnt++;
    			cancelledKeys = 0;
    			needsToSelectAgain = false;
    			final int ioRatio = this.ioRatio;
    			boolean ranTasks;
    			// #7
    			if (ioRatio == 100) {
    				try {
    					if (strategy > 0) {
    						processSelectedKeys();
    					}
    				} finally {
    					// Ensure we always run tasks.
    					ranTasks = runAllTasks();
    				}
    			} else if (strategy > 0) {
    				final long ioStartTime = System.nanoTime();
    				try {
    					processSelectedKeys();
    				} finally {
    					// Ensure we always run tasks.
    					final long ioTime = System.nanoTime() - ioStartTime;
    					ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    				}
    			} else {
    				ranTasks = runAllTasks(0); // This will run the minimum number of tasks
    			}
    			// #8
    			if (ranTasks || strategy > 0) {
    				if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
    					logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
    							selectCnt - 1, selector);
    				}
    				selectCnt = 0;
    			} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
    				selectCnt = 0;
    			}
    		
    		
    			// #9
    			if (isShuttingDown()) {
    				closeAll();
    				if (confirmShutdown()) {
    					return;
    				}
    			}
    		
    	}
    }
    

    为了版面整洁,这里删除了异常处理代码。
    #1 可以看到,这里通过一个死循环不断处理IO事件和异步任务。
    #2 如果当前存在待处理的任务,调用selector.selectNow(),这时会跳出switch语句,往下处理事件和任务,否则返回SelectStrategy.SELECT。
    #3 curDeadlineNanos,计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),没有任务则返回-1。
    更新nextWakeupNanos为阻塞时间。
    由于频繁调用(jvm)Selector.wakeup会造成性能消耗,NioEventLoop维护了一个唤醒标识nextWakeupNanos。nextWakeupNanos有三种值
    NONE -- 执行线程被阻塞;
    AWAKE -- 执行线程未阻塞;
    其他值 -- 执行线程被超时阻塞,在指定的时间后唤醒;
    NioEventLoop#wakeup方法中,只有nextWakeupNanos.getAndSet(AWAKE) != AWAKE成功才调用selector.wakeup()方法。
    #4
    这时如果还没有任务加入,则执行select,阻塞线程。select方法返回结果作为新的strategy。
    #5
    lazySet方法,设置值之后其他线程在短期内还是可能读到旧值
    这里将nextWakeupNanos设置为AWAKE,主要是减少wakeup方法中不必要的wakeup操作。
    所以使用lazySet方法也没有问题。
    #6 selectCnt增加
    旧版本的Java NIO在Linux Epoll实现上存在bug,(jvm)Selector.select方法可能在没有任何就绪事件的情况下返回,导致CPU空转,利用率飙升到100%。
    于是,Netty计算select方法重复调用次数selectCnt,并在selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD配置(默认512)时,重建selector,从而规避该问题。
    幸好在JDK6_6u4,JDK7_b12已修复该Bug。
    #7 processSelectedKeys方法处理IO事件,runAllTasks方法处理任务。
    ioRatio表示执行IO事件所占CPU时间百分比,默认50,
    ioTime * (100 - ioRatio) / ioRatio,通过ioTime,ioRatio计算处理任务的CPU时间。
    #8 如果执行了任务或者select方法返回有效值,直接重置selectCnt。
    unexpectedSelectorWakeup方法中会在selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD时重建selector。
    #9 如果是正在关闭状态,则要关闭所有的Channel

    IO事件

    下面看一下Eventloop中如何处理IO事件。
    NioEventLoop关键字段

    Selector unwrappedSelector;				// JVM中的Selector
    Selector selector;						// 优化后的SelectedSelectionKeySetSelector
    SelectedSelectionKeySet selectedKeys;	// 对(jvm)Selector#selectedKeys进行优化
    

    SelectedSelectionKeySetSelector每次调用select前都清除SelectedSelectionKeySet
    SelectedSelectionKeySet使用数组代替原Selector的中的HashSet,提高性能。数组默认大小为1024,不够用时扩展为原大小的2倍。

    NioEventLoop#构造方法 -> NioEventLoop#openSelector

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
    		// #1
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
    
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }
    
        ...
    
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            
            public Object run() {
                try {
    				// #2
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                    ...
    
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } ...
            }
        });
    
        ...
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    	// #3
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
    

    #1 通过(jvm)SelectorProvider打开一个Selector
    #2 构造了selectedKeySet,并通过反射将该对象设置到Selector的selectedKeys,publicSelectedKeys属性中,这样Selector监听到的事件就会存储到selectedKeySet。
    #3 构造了SelectedSelectionKeySetSelector对象

    NioEventLoop#select负责阻塞线程,等待IO事件

    private int select(long deadlineNanos) throws IOException {
    	// #1
    	if (deadlineNanos == NONE) {
    		return selector.select();
    	}
    
    	// #2
    	long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    	return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }
    

    #1 一直阻塞,知道发生IO事件或加入了新任务
    #2 计算阻塞时间,在原阻塞时间加上995微秒后转化为毫秒。
    如果原阻塞时间在5微秒内,就不阻塞了。

    IO事件的处理流程为
    NioEventLoop#processSelectedKeys -> (没有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    	...
    
    	try {
    		int readyOps = k.readyOps();
    		// #1
    		if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    			int ops = k.interestOps();
    			ops &= ~SelectionKey.OP_CONNECT;
    			k.interestOps(ops);
    
    			unsafe.finishConnect();
    		}
    
    		// #2
    		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    			// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    			ch.unsafe().forceFlush();
    		}
    
    		// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    		// to a spin loop
    		// #3
    		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    			unsafe.read();
    		}
    	} catch (CancelledKeyException ignored) {
    		unsafe.close(unsafe.voidPromise());
    	}
    }
    

    #1 处理OP_CONNECT
    移除关注事件OP_CONNECT,否则Selector.select(..)将不断返回
    前面分享客户端启动过程的文章说过了,这里会调用AbstractNioUnsafe#finishConnect,完成客户端Connect操作,可回顾《客户端启动过程解析》。
    #2 先处理OP_WRITE事件,能够尽早写入数据释放内存,这里涉及flush操作,后面有文章解析。
    #3 处理OP_READ或OP_ACCEPT事件。
    对于ServerChannel,这里会调用NioMessageUnsafe#read,处理OP_ACCEPT事件,可回顾《客户端启动过程解析》。
    对于SocketChannel,这里会调用NioByteUnsafe#read,进行读写操作,后面有文章解析。

    异步任务

    下面看一下Eventloop中如何处理异步任务。
    run方法#4步骤 -> SingleThreadEventExecutor#runAllTasks

    protected boolean runAllTasks(long timeoutNanos) {
        // #1
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
    
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
        	// #2
            safeExecute(task);
    
            runTasks ++;
    
            // #3
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            // #4
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        // #5
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
    

    #1 AbstractScheduledEventExecutor#scheduledTaskQueue中存放的是定时任务,
    SingleThreadEventExecutor#taskQueue中存放的是待处理的任务。
    fetchFromScheduledTaskQueue方法会获取已到期的定时任务,移动到SingleThreadEventExecutor#taskQueue。
    #2 执行获取的任务
    #3 每个64个任务检查一次是否超时,因为nanoTime()方法也是一个相对昂贵的操作。
    #4 取下一个任务,继续处理
    #5 预留的扩展方法。

    EventLoop在4.1.44版本被优化,代码做了较大改动,删除了原来的wakeup标志,改用nextWakeupNanos,代码更清晰。
    请参考 -- Clean up NioEventLoop

    Netty是由事件驱动的,服务端register,bind,客户端connect等操作都是提交异步任务给EventLoop处理的
    ,而Accept,Read/Writ,Connect等IO事件都都需要EventLoop的处理。
    大家可以结合前面分析服务端和客户端启动过程的文章,理解EventLoop是如何驱动Netty工作的。

    如果您觉得本文不错,欢迎关注我的微信公众号。您的关注是我坚持的动力!

  • 相关阅读:
    redis限流器的设计
    使用HystrixCommand封装http请求
    自定义的最简单的可回调的线程任务CallbackableFeatureTask(模仿google的ListenableFutureTask)
    通用的规则匹配算法(原创)(java+.net)
    redis缓存切面实现(支持缓存key的spel表达式)
    使用join和CountDownLatch来等待线程结束
    shell日常实战练习——通过监视用户登陆找到入侵者
    Nginx web服务器
    nginx——防盗链功能
    nginx 隐藏nginx版本号
  • 原文地址:https://www.cnblogs.com/binecy/p/13922558.html
Copyright © 2011-2022 走看看