zoukankan      html  css  js  c++  java
  • Netty源码分析第2章(NioEventLoop)---->第4节: NioEventLoop线程的启动

     

    Netty源码分析第二章: NioEventLoop

     

    第四节: NioEventLoop线程的启动

     

    之前的小节我们学习了NioEventLoop的创建以及线程分配器的初始化, 那么NioEventLoop是如何开启的呢, 我们这一小节继续学习

    NioEventLoop的开启方法在其父类SingleThreadEventExecutor中的execute(Runnable task)方法中, 我们跟到这个方法:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        //判断当前线程是不是eventLoop线程
        boolean inEventLoop = inEventLoop();
        //如果是eventLoop线程
        if (inEventLoop) {
            addTask(task);
        } else {
            //不是eventLoop线程启动线程
            startThread();
            //添加task
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    这个方法传入一个Runnble对象, 也就是一个任务

    首先boolean inEventLoop = inEventLoop()方法会判断是不是NioEventLoop线程

    跟进 inEventLoop()方法:

    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    这里inEventLoop(Thread.currentThread())方法传入了当前线程对象, 这个方法会调用当前类的inEventLoop(Thread thread)方法

    跟进inEventLoop(Thread thread)方法:

    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

    我们看到判断的依据是当前线程对象是不是NioEventLoop绑定的线程对象, 这里我们会想到开启线程肯定会为NioEventLoop绑定一个线程对象, 如果判断当前线程对象不是当前NioEventLoop绑定的线程对象, 说明执行此方法的线程不是当前NioEventLoop线程, 那么这个线程如何初始化的, 后面我们会讲到, 我们继续看execute(Runnable task)方法:

    如果是NioEventLoop线程, 则会通过addTask(task)添加任务, 通过NioEventLoop异步执行, 那么这个task是什么时候执行的, 同样后面会讲到

    跟一下addTask(task):

    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        //如果添加不成功
        if (!offerTask(task)) {
            reject(task);
        }
    }

    这里offerTask(task)代表添加一个task, 跟进去:

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        //往taskQ中添加一个task
        return taskQueue.offer(task);
    }

    我们看到taskQueue.offer(task)将一个task添加到任务队列, 而这个任务队列taskQueue就是我们NioEventLoop初始化的时候与NioEventLoop唯一绑定的任务队列

    回顾一下初始构造方法:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, 
                                        boolean addTaskWakesUp, int maxPendingTasks, 
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

    在这里通过 taskQueue = newTaskQueue(this.maxPendingTasks) 创建了taskQueue

    回到execute(Runnable task)方法中, 我们继续往下看:

    如果不是NioEventLoop线程我们通过startThread()开启一个NioEventLoop线程

    跟到startThread()之前, 我们先继续往下走:

    开启NioEventLoop线程之后, 又通过addTask(task)taskQueue添加任务

     

    最后我们注意有这么一段代码:

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }

    addTaskWakesUp代表添加task之后, NioEventLoopselect()操作是不是要唤醒, 这个属性是在初始化NioEventLoop的时候传入的, 大家可以回顾下, 默认是false, 这里!addTaskWakesUp就是需要唤醒, wakesUpForTask(task)addTaskWakesUp意义相同, 默认是true, 可以看代码:

    protected boolean wakesUpForTask(Runnable task) {
        return true;
    }

    这里恒为true, 所以这段代码就是添加task时需要通过wakeup(inEventLoop)唤醒, 这样NioEventLoop在做select()操作时如果正在阻塞则立刻唤醒, 然后执行任务队列的task

    回到execute(Runnable task)方法中我们跟进开启线程的startThread()方法中:

    private void startThread() {
        //判断线程是否启动, 未启动则启动
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                //当前线程未启动, 则启动
                doStartThread();
            }
        }
    }

    前面的判断是判断当前NioEventLoop线程是否启动, 如果未启动, 则通过doStartThread()方法启动, 我们第一次执行execute(Runnable task)线程是未启动的, 所以会执行doStartThread(), 后续该线程则不会再执行doStartThread()方法

    我们跟进doStartThread()方法中:

    private void doStartThread() { 
        assert thread == null;
        //线程执行器执行线程(所有的eventLoop共用一个线程执行器)
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //将当前线程复制给属性
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    //开始轮询
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                   //代码省略
                }
            }
        });
    }

    我们重点关注executor.execute()这个方法, 其中executor就是我们创建NioEventLoop的线程器, execute()就是开启一个线程

    回顾下execute()方法:

    public void execute(Runnable command) {
        //起一个线程
        threadFactory.newThread(command).start();
    }

    我们看到通过线程工厂开启一个线程, 由于前面的小节已经剖析, 这里不再赘述

     

    开启线程则执行Runnble类中的run()方法, 我们看到在run()方法里通过 thread = Thread.currentThread() 将新开启的线程对象赋值NioEventLoopthread的属性, 这样就可以通过线程对象的判断, 来确定是不是NioEventLoop线程了

     

    后面我们看到 SingleThreadEventExecutor.this.run() , 这里this, 就是当前NioEventLoop对象, 而这里的run()方法, 就是NioEventLoop中的run()方法, 在这个run()方法中, 真正开始了selector的轮询工作, 对于run()方法的详细剖析, 我们会在之后的小节中进行

     

    刚才我们剖析了NioEventLoop的启动方法, 那么根据我们的分析, 就是第一次调用NioEventLoopexecute(Runnable task)方法的时候, 则会开启NioEventLoop线程, 之后的调用只是往taskQueue中添加任务, 那么第一次是什么时候开启的呢?这里我们要回顾上一章讲过的内容

     

    上一章中我们讲过在AbstractServerBootstrap中有个initAndRegister()方法, 这个方法主要用于channel的初始化和注册, 其中注册的代码为:

    ChannelFuture regFuture = config().group().register(channel);

    其中group()我们剖析过是Boss线程的group, 我们剖析过其中的register(channel)方法:

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    首先跟到next()方法:

    public EventLoop next() {
        return (EventLoop) super.next();
    }

    首先调用了其父类MultithreadEventExecutorGroupnext方法, 跟进去:

    public EventExecutor next() {
        return chooser.next();
    }

    这里chooser, 就是初始化NioEventLoopGroup的线程选择器, 为此分配了不同的策略, 这里不再赘述, 通过这个方法, 返回一个NioEventLoop线程

     

    回到MultithreadEventLoopGroup类的register()方法中, next().register(channel)代表分配后的NioEventLoopregister()方法, 这里会调用NioEventLoop的父类SingleThreadEventLoop类中的register()方法

    跟到SingleThreadEventLoop类中的register()方法:

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    DefaultChannelPromise是一个监听器, 它会跟随channel的读写进行监听, 绑定传入的channelNioEventLoop, 有关Promise后面的章节会讲到

    这里我们继续跟进register(new DefaultChannelPromise(channel, this))

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

    unsafe()方法返回创建channel初始化的unsafe()对象, 如果是NioSeverSocketChannel, 则绑定NioMessageUnsafe对象, 上一小节进行剖析过这里不再赘述

     

    最终这个unsafe对象会调用到AbstractChannel的内部类AbstractUnsafe中的register()方法, 这里register(), 无论是客户端channel和服务器channel都会通过这个一个register注册, 在以后的客户端接入章节中我们会看到

    这里我们继续看register方法:

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        //代码省略
        //所有的复制操作, 都交给eventLoop处理(1)
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        //做实际主注册(2)
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                //代码省略
            }
        }
    }

    这里我们上一小节分析过, 不再陌生, 这里只分析有关NioEventLoop相关的内容

    我们首先看到 AbstractChannel.this.eventLoop = eventLoop , 获取当前channelNioEventLoop, 通过上一章的学习, 我们知道每个channel创建的时候会绑定一个NioEventLoop

    这里通过eventLoop.inEventLoop()判断当前线程是否是NioEventLoop线程, inEventLoop()方法在前面的小节剖析过, 这里不再赘述

     

    如果是NioEventLoop线程则通过register0(promise)方法做实际的注册, 但是我们第一次执行注册方法的时候, 如果是服务器channel是则是由server的用户线程执行的, 如果是客户端channel, 则是由Boss线程执行的, 所以走到这里均不是当前channelNioEventLoop的线程, 于是会走到下面的eventLoop.execute()方法中

     

    eventLoop.execute()上一小节剖析过, 就是将task添加到taskQueue中并且开启器NioEventLoop线程, 所以, 在这里就开启了NioEventLoop线程, 有关开启步骤, 可以通过上一小节内容进行回顾

     

    这里注意一点, 有的资料会讲第一次开启NioEventLoop线程是在AbstractBootstrapdoBind0(regFuture, channel, localAddress, promise)方法中开启的, 个人经过debug和分析, 实际上并不是那样的, 希望大家不要被误导

     

     

    简单看下doBind0(regFuture, channel, localAddress, promise)方法:

    private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { 
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    //绑定端口
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    这里虽然调用了eventLoopexecute()方法, 但是eventLoop线程在注册期间已经启动, 所以这里不会重复启动, 只会将任务添加到taskQueue

     

    其实这里我们也能够看出, 其实绑定端口的相关操作, 同样是也是eventLoop线程中执行的

     

    上一节: 初始化线程选择器

    下一节: 优化selector

  • 相关阅读:
    Codeforces Round #256 (Div. 2/B)/Codeforces448B_Suffix Structures(字符串处理)
    【android】优秀的UI资源站点集合
    升级iOS8系统后,保险箱Pro、私人保险箱、私密相冊打开就闪退的官方解决方式
    js产生随机数
    java实现各种数据统计图(柱形图,饼图,折线图)
    Matlab画图-非常具体,非常全面
    Lucene教程具体解释
    NAND FLASH
    Jenkins(二)
    iOS 本地通知
  • 原文地址:https://www.cnblogs.com/xiangnan6122/p/10203016.html
Copyright © 2011-2022 走看看