本文来源:http://blog.csdn.net/zxhoo/article/details/17709765
感谢博主。本文仅供参考学习。
前一篇文章画了一张巨大的类图,但是只重点分析了NioEventLoopGroup的继承层次,这篇文章来看看NioEventLoop的继承层次。
从Executor接口说起
- public interface Executor {
- void execute(Runnable command);
- }
ExecutorService接口
ExecutorService接口扩展了Executor接口,添加了两组方法,一组用来终止Executor的执行:
- void shutdown();
- List<Runnable> shutdownNow();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
- <T> Future<T> submit(Callable<T> task);
- <T> Future<T> submit(Runnable task, T result);
- Future<?> submit(Runnable task);
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
ScheduledExecutorService接口
ScheduledExecutorService接口扩展ExecutorService接口,添加了调度能力:
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
AbstractExecutorService抽象类
AbstractExecutorService实现了ExecutorService接口,实现了ExecutorService接口里定义的任务执行方法。
EventExecutorGroup接口
前面几个是java.util.concurrent包里头的类或接口,从EventExecutorGroup开始,进入Netty的世界。
- public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
- boolean isShuttingDown();
- Future<?> shutdownGracefully();
- Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
- Future<?> terminationFuture();
- EventExecutor next();
- }
EventExecutorGroup还实现了Iterable接口,并其提供了next()方法返回其中的一个EventExecutor。这样来看,EventExecutorGroup这个类名起的真是太恰当了:它逻辑上实际上就是一组EventExecutor:
EventExecutor接口
EventExecutor给EventExecutorGroup添加了下面这些方法:
- public interface EventExecutor extends EventExecutorGroup {
- EventExecutorGroup parent();
- boolean inEventLoop();
- boolean inEventLoop(Thread thread);
- <V> Promise<V> newPromise();
- <V> ProgressivePromise<V> newProgressivePromise();
- <V> Future<V> newSucceededFuture(V result);
- <V> Future<V> newFailedFuture(Throwable cause);
- }
EventLoopGroup接口
EventLoopGroup接口比较简单,只定义了两个方法,用来注册Channel:
- public interface EventLoopGroup extends EventExecutorGroup {
- ChannelFuture register(Channel channel);
- ChannelFuture register(Channel channel, ChannelPromise promise);
- }
AbstractEventExecutor
AbstractEventExecutor为EventExecutor接口里的一些方法提供了默认实现,这些默认实现大概可以分为下面几组:
- 无参数版方法默认实现(调用有参数版)
- inEventLoop()
- shutdownGracefully()
- 转发给AbstractExecutorService
- submit(Runnable task)
- submit(Runnable task, T result)
- submit(Callable<T> task)
- 直接抛出UnsupportedOperationException
- schedule(Callable<V> callable, long delay, TimeUnit unit)
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- 覆盖(override)AbstractExecutorService中的方法实现
- newTaskFor(Runnable runnable, T value)
- newTaskFor(Callable<T> callable)
最后next()方法只是返回自己:
- @Override
- public EventExecutor next() {
- return this;
- }
SingleThreadEventExecutor
再往下看是SingleThreadEventExecutor,从名字上看,SingleThreadEventExecutor肯定是在单线程里执行task。SingleThreadEventExecutor类的代码有800多行,但是只要大概看一下就可以了解它是如何工作的:
- 它内部有两个task队列,taskQueue和delayedTaskQueue,普通任务会进入taskQueue,调度任务进入delayedTaskQueue
- 默认的taskQueue是一个LinkedBlockingQueue
- 实现了很多任务执行相关的方法,但核心的run()方法留给子类来实现,子类需要在run()方法里取出并执行task
下面是类图:
SingleThreadEventLoop
SingleThreadEventLoop比较简单,下面是它的全部代码:
- public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
- protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
- super(parent, threadFactory, addTaskWakesUp);
- }
- @Override
- public EventLoopGroup parent() {
- return (EventLoopGroup) super.parent();
- }
- @Override
- public EventLoop next() {
- return (EventLoop) super.next();
- }
- @Override
- public ChannelFuture register(Channel channel) {
- return register(channel, channel.newPromise());
- }
- @Override
- public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
- if (channel == null) {
- throw new NullPointerException("channel");
- }
- if (promise == null) {
- throw new NullPointerException("promise");
- }
- channel.unsafe().register(this, promise);
- return promise;
- }
- }
NioEventLoop
最后终于轮到NioEventLoop了,不过遗憾的是,NioEventLoop太复杂了,我还没有完全看明白。下面是完整的继承层次结构图:
NioEventLoop的run()方法
最后我想简单分析一下NioEventLoop的run()方法,下面是run()方法的部分代码:- @Override
- protected void run() {
- for (;;) {
- // process io events
- ...
- final long ioTime = System.nanoTime() - ioStartTime;
- final int ioRatio = this.ioRatio;
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
- ...
- }
- }
也就是说,NioEventLoop在单线程里同时处理IO事件和其他任务,NioEventLoop尽量(但不能保 证)按照给定的比率(默认为50%)来分配花在这两种事情上的时间。换句话说,我们不应该在NioEventLoop里执行耗时的操作(比如数据库操 作),这样会卡死NioEventLoop,降低程序的响应性。
- 顶
- 1
- 踩
- 0