zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-6 线程模型-IO线程EventLoopGroup和NIO实现(一)

    接口定义
    io.netty.channel.EventLoopGroup extends EventExecutorGroup
    方法
    说明
    ChannelFuture register(Channel channel)
    把一个channel注册到一个EventLoop
    ChannelFuture register(Channel channel, ChannelPromise promise);
    同上
    io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup
    方法
    说明
    EventLoopGroup parent()
    得到创建这个eventLoop的EventLoopGroup
    EventLoopGroup定义的主要方法是register, 这个方法的语义是把channel和eventLoop绑定在一起。一个channel对应一个eventLoop, 一个eventLoop会持有多个channel。
     
     
    I/O线程EventLoopGroup的抽象实现
    io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
    io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
    两个类主功能都是实现了EventLoopGroup定义的register方法
    MultithreadEventLoopGroup
    public ChannelFuture register(Channel channel) {
    return next().register(channel);
    }
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
    return next().register(channel, promise);
    }
     
    SingleThreadEventLoop
    public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
    }
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    channel.unsafe().register(this, promise);
    return promise;
    }
    register的实现主要是为了调用Channel.Unsafe实例的register方法。
     
    NIO实现
    io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup
    io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop
    NioEventLoopGroup是在MultithreadEventLoopGroup基础上实现了对JDK NIO Selector的封装, 它实现以下几个功能:
    • 创建selector
    • 在selector上注册channel感兴趣的NIO事件
    • 实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程。
    • 把NIO事件转换成对channel unsafe的调用或NioTask的调用
    • 控制线程执行I/O操作和排队任务的用时比例
    • 处理epoll selector cpu 100%的bug
     
    下面来具体分析这几个功能的实现。
     
    创建Selector
    NioEventLoop#openSelector()实现了创建selector的功能,默认情况下,使用SelectorProvider#openSelector()方法创建一个新个selector:
    final Selector unwrappedSelector = provider.openSelector();
    如果设置环境变量io.netty.noKeySetOptimization=true, 会创建一个selectedKeySet = new SelectedSelectionKeySet(), 然后使用java的反射机制把selector的selectedKeys和publicSelectedKeys替换成selectedKeySet,具体步骤是:
    1.得到selector的真正类型: sun.nio.ch.SelectorImpl
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
    try {
    return Class.forName(
    "sun.nio.ch.SelectorImpl",
    false,
    PlatformDependent.getSystemClassLoader());
    } catch (Throwable cause) {
    return cause;
    }
    }
    });
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    2.替换selector是属性unwrappedSelector
    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    selectedKeysField.set(unwrappedSelector, selectedKeySet);
    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
    之所以会设计一个这样的优化选项,是因为一般情况下调用完selector的select或selectNow方法后需要调用Selector#selectedKeys()得到触发NIO事件的的SelectableChannel,这样优化之后,可以直接从selectedKeySet中得到已经触发了NIO事件的SelectableChannel。
     
     
    在selector上注册channel感兴趣的NIO事件
    NioEventLoop提供了unwrappedSelector方法,这个方法返回了它创建好的Selector实例。这样任何的外部类都可以把任意的SelectableChannel注册到这selector上。在AbstractNioChannel中, doRegister方法的实现就是使用了这个方法:
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    另外,它还提供了一个register方法:
    public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
    这个方法会把task当成SelectableChannel的附件注册到selector上:
    ch.register(selector, interestOps, task);
     
     
    实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程
    在NioEventLoop的run方法中实现NIO事件和EventExecutor的任务处理逻辑,这个run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定义。在上一章中,我们看到了DefaultEventExecutor中是如何实现这个run方法的,这里我们将要看到这run方法的另一个实现。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不仅要及时地执行taskQueue中的任务,还要能及时地处理NIO事件,因此它会同时检查selector中的NIO事件和和taskQueue队列,任何一个中有事件需要处理或有任务需要执行,它不会阻塞线程。同时它也保证了在没有NIO事件和任务的情况下线程不会无谓的空转浪费CUP资源。
    run主要实现如下,为了更清晰的说明它的主要功能,我对原来的代码进行了一些删减。
    for(;;){
    try{
    //phase1: 同时检查NIO事件和任务
    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
    continue;
    case SelectStrategy.SELECT:
    select(wakenUp.getAndSet(false)); //在taskQueue中没有任务的时候执行select
    }
     
    //phase2: 进入处理NIO事件,执行executor任务
    try{
    //处理NIO事件
    processSelectedKeys();
    }finally{
    //处理taskQueu中的任务
    runAllTasks();
    }
    }catch(Throwable t){
    handleLoopException(t);
    }
    }
     
    run方法有两个阶段构成:
     
    phase1: 检查NIO事件或executor任务,如果有任何的NIO事件或executor任务进入phase2。
    这样阶段的主要工作在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。
     
    selectStrategy.calculateStrategy实现
    selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
    这行代码的含义是: 如果hasTasks() == true, 调用以下selector#selectNow, 然后进入phase2。 否则调用select。这里使用了strategy模式,默认的strategy实现是io.netty.channe.DefaultSelectStrategy implements SelectStrategy
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    DefaultSelectStrategy实现了SelectStrategy接口,这接口定义了两个常量:
    int SELECT = -1;
    int CONTINUE = -2;
    运行时selectSuppler参数传入的是selectNowSupplier, 它的实现如下:
    private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
    return selectNow();
    }
    };
    这里的get方法调用了selectNow, selectNow调用的是Selector#selectNew方法,这个方法的返回值是>=0。
    hashTasks的传入的参数是hasTask()的返回值: return !taskQueue.isEmpty();
    代码读到这里就会发现,使用默认的的SelectStrategy实现,calculateStrategy在hasTasks()==true时返回值>=0, hasTasks() == false时返回值是SelectStrategy.SELECT,不会返回SelectStrategy.CONTINUE。
     
    select实现
    select的执行逻辑是:
    1. 计算超select方法的结束时间selectDeadLineNanos
    long currentTimeNanos = System.nanoTime();
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    2. 进入循环,检查超时--超时跳出循环。
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
    if (selectCnt == 0) {
    selector.selectNow();
    selectCnt = 1;
    }
    break;
    }
    3. 如果在select执行过程中有executor任务提交或可以当前的wakeUp由false变成true, 跳出循环
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
    selector.selectNow();
    selectCnt = 1;
    break;
    }
    4. 调用selector#select等待NIO事件。
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    5. 如果满足这些条件的任何一个,跳出循环: 有NIO事件、wakeUp的新旧值都是true、taskQueue中有任务、有定时任务到期。
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    break;
    }
    6. 如果线程被中断,跳出循环。
    if (Thread.interrupted()) {
    break;
    }
    7. 如果selector.select超时,没有检查到任何NIO事件, 会在下次循环开始时跳出循环。 如果每次超时,跳到第2步继续下一次循环。
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    selectCnt = 1;
    }
    currentTimeNanos = time;
     
    select 最迟会在当前时间>= selectDeadLineNanos时返回,这个时间是最近一个到期的定时任务执行的时间,换言之如果没有任何的NIO事件或executor任务,select会在定时任务到期时返回。如果没有定时任务,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select会在检查到任何NIO事件或executor任务时返回,为了保证这点,在selector.select(timeoutMillis)前后都会调用hasTasks检查executor任务,为了能在调用executet提交任务时唤醒selector.select,NioEventLoop覆盖了SingleThreadEventExecutor的wake方法:
    protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
    selector.wakeup();
    }
    }
    这个方法会及时的唤醒selector.select, 保证新提交的任务可以得到及时的执行。
     
    phase2: 进入处理NIO事件,执行executor任务
    这个阶段是先调用processSelectedKeys()处理NIO事件,然后掉用 runAllTasks()处理所有已经到期的定时任务和已经在排队的任务。这个阶段还实现了NIO事件和executor任务的用时比例管理,这个特性稍后会详细分析。
     
  • 相关阅读:
    linux查看CPU性能及工作状态的指令mpstat,vmstat,iostat,sar,top
    Linux vmstat命令实战详解
    dstat 性能监测工具
    sysstat 工具
    Linux命令详解----iostat
    Linux CPU实时监控mpstat命令详解
    Linux Top 命令解析 比较详细
    Linux统计/监控工具SAR详细介绍
    ubuntu 添加用户到已存在的组
    Ubuntu 14.04 使用速度极快的Genymotion 取代蜗牛速度的原生AVD模拟器
  • 原文地址:https://www.cnblogs.com/brandonli/p/10100139.html
Copyright © 2011-2022 走看看