zoukankan      html  css  js  c++  java
  • Netty源码—三、select

    NioEventLoop功能

    前面channel已经准备好了,可以接收来自客户端的请求了,NioEventLoop作为一个线程池,只有一个线程,但是有一个queue存储了待执行的task,由于只有一个线程,所以run方法是死循环,除非线程池shutdown。

    这个run方法的主要作用:

    1. 执行selector.select,监听IO事件,并处理IO事件
    2. 由于NioEventLoop兼有线程池的功能,执行线程池中任务
    // io.netty.channel.nio.NioEventLoop#run
    protected void run() {
        // loop,循环处理IO事件或者处理线程池中的task任务
        for (;;) {
            try {
                // 判断接下来是是执行select还是直接处理IO事件和执行队列中的task
                // hasTask判断当前线程的queue中是否还有待执行的任务
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        // NioEventLoop默认不会有这种状态
                        continue;
                    case SelectStrategy.SELECT:
                        // 说明当前queue中没有task待执行
                        select(wakenUp.getAndSet(false));
    					// 唤醒epoll_wait
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                // 这个比例是处理IO事件所需的时间和花费在处理task时间的比例
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    // 如果比例是100,表示每次都处理完IO事件后,执行所有的task
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // 保证能执行所有的task
                        runAllTasks();
                    }
                } else {
                    // 记录处理IO事件开始的时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        // 处理IO事件
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // 当前时间减去处理IO事件开始的时间就是处理IO事件花费的时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 执行task的时间taskTime就是ioTime * (100 - ioRatio) / ioRatio
                        // 如果taskTime时间到了还有未执行的task,runAllTasks也会返回
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                // 如果已经shutdown则关闭所有资源
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    
    // io.netty.channel.DefaultSelectStrategy#calculateStrategy
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        // 如果还有task待执行则先执行selectNow,selectNow是立即返回的,不是阻塞等待
        // 如果没有待执行的task则执行select,有可能是阻塞等待IO事件
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    
    // io.netty.channel.nio.NioEventLoop#selectNowSupplier
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            // epoll_wait的参数timeout可以指定超时时间,selectNow传入的参数是0,也就是不超时等待立即返回
            return selectNow();
        }
    };
    

    select过程

    epoll模型中最重要的一部分来了,Java把epoll_wait封装成了一个selector,可以理解为多路复用选择器,所以在调用selector.select过程中最后都是通过epoll_wait实现的,下面先看看SelectorImpl的两个select方法

    public int select(long timeout)
        throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        // timeout = 0,传递给epoll_wait的参数是-1,表示阻塞等待
        // timeout > 0,表示超时等待timeout时间后返回
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }
    
    // 调用epoll_wait阻塞等待
    public int select() throws IOException {
        return select(0);
    }
    
    // 调用epoll_wait立即返回
    public int selectNow() throws IOException {
        return lockAndDoSelect(0);
    }
    

    上面三个select方法都调用了lockAndDoSelect,只是timeout参数不同,其实最后就是调用epoll_wait参数不同,epoll_wait有一个timeout参数,表示超时时间

    • -1:阻塞
    • 0:立即返回,非阻塞
    • 大于0:指定微秒
    // sun.nio.ch.EPollSelectorImpl#doSelect
    protected int doSelect(long timeout) throws IOException {
        if (closed)
            throw new ClosedSelectorException();
        // 省略中间代码...
        // 开始poll,这里的pollWrapper是EPollArrayWrapper
        pollWrapper.poll(timeout);
        // 省略中间代码...
    
        int numKeysUpdated = updateSelectedKeys();
        // 如果epoll_wait是因为wakeup pipe解除阻塞返回
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            // 清除中断文件描述符接收到的IO事件
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                // 读取完管道中的数据
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
    
    int poll(long timeout) throws IOException {
        // 这里会向epoll注册每个socket需要监听的事件
        updateRegistrations();
        // 调用epollWait,这是一个native方法
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
    

    看看epollWait的native实现

    // jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                                jlong address, jint numfds,
                                                jlong timeout, jint epfd)
    {
        struct epoll_event *events = jlong_to_ptr(address);
        int res;
    
        if (timeout <= 0) {           /* Indefinite or no wait */
            // epoll_wait参数的含义是
            // epfd,创建的epoll句柄
            // events是一个结构体指针,如果有IO事件发生,linux会将事件放在这个结构体中返回
            // numfds是上面指针指向的结构体的个数,也就是最多能接收的IO事件的个数
            // timeout是超时时间
            RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
        } else {                      /* Bounded wait; bounded restarts */
            res = iepoll(epfd, events, numfds, timeout);
        }
    
        if (res < 0) {
            JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
        }
        return res;
    }
    

    从native实现上可以看出最终调用了epoll_wait(在timeout >= 0),接着看看epoll_wait的里一个参数events的来源。在上一篇文章里面我们说了channel注册的时候会将自己需要监听的事件类型保存在sun.nio.ch.EPollArrayWrapper#eventsLow中,而上面EPollArrayWrapper#poll中又调用了updateSelectedKeys来注册每个socket监听的事件

    // sun.nio.ch.EPollArrayWrapper#getUpdateEvents
    // 获取需要监听的文件描述符对应的事件
    private byte getUpdateEvents(int fd) {
        // 如果没有超出预定义的数组大小则直接从数组中获取
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            return eventsLow[fd];
        } else {
            // 超出预订单数组大小的部分从map中获取
            Byte result = eventsHigh.get(Integer.valueOf(fd));
            // result should never be null
            return result.byteValue();
        }
    }
    
    // sun.nio.ch.EPollArrayWrapper#updateRegistrations
    // 这个方法是在epoll_wait前把需要监听的文件描述符及其需要监听的事件注册到epoll上
    private void updateRegistrations() {
        synchronized (updateLock) {
            int j = 0;
            // 每调用一次setInterest,updateCount加1
            while (j < updateCount) {
                // 需要监听的文件描述符
                int fd = updateDescriptors[j];
                // 需要监听的事件,比如channel注册之后的事件是
                short events = getUpdateEvents(fd);
                boolean isRegistered = registered.get(fd);
                int opcode = 0;
    
                if (events != KILLED) {
                    if (isRegistered) {
                        opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                    } else {
                        opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                    }
                    if (opcode != 0) {
                        // 调用epollCtl来add、update或者delete对应文件描述符坚挺的事件
                        epollCtl(epfd, opcode, fd, events);
                        if (opcode == EPOLL_CTL_ADD) {
                            registered.set(fd);
                        } else if (opcode == EPOLL_CTL_DEL) {
                            registered.clear(fd);
                        }
                    }
                }
                j++;
            }
            updateCount = 0;
        }
    }
    
    
    

    上面epollCtl又是一个native方法

    // jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
    JNIEXPORT void JNICALL
    Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                               jint opcode, jint fd, jint events)
    {
        struct epoll_event event;
        int res;
    
        event.events = events;
        event.data.fd = fd;
    
        // opcode,EPOLL_CTL_ADD(注册新的fd到epfd), EPOLL_CTL_MOD(修改已经注册的fd的监听事件), EPOLL_CTL_DEL(从epfd删除一个fd);
        // fd,需要监听的socket对应的文件描述符
        // event,该文件描述符监听的事件
        RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
    	// 省略中间代码...
    }
    

    关于select过程中的中断说明

    这里的中断是什么?

    这里中断并不是操作系统层面的中断,只是中断epoll_wait。由于epoll_wait可能会阻塞等待IO事件(timeout = -1),这里的中断就是指中断epoll_wait,即时返回。也就是让select即时返回

    这里的中断是怎么实现的?

    由于epoll_wait处在等待的情况下的时候,如果有文件描述符上有事件发生,epoll_wait就会返回,所以基本思路就是在epoll监控的文件描述符上产生IO事件,具体实现原理就是使用管道创建两个文件描述符fd0,fd1(EPollSelectorImpl),fd0用来作为读描述符,fd1作为写描述符,然后将读描述符注册到epoll上,如果向fd1写内容,epoll发现fd0有IO事件就会返回,起到了让epoll_wait及时返回的作用。

    什么时候会中断

    1. 调用Thread.interrupt()
    2. selector关闭的时候
    3. 可以直接调用sun.nio.ch.EPollSelectorImpl#wakeup,这是一个public方法

    中断的方法是sun.nio.ch.EPollArrayWrapper#interrupt(),这个方法会调用一个native方法

    // jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
    JNIEXPORT void JNICALL
    Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
    {
        int fakebuf[1];
        fakebuf[0] = 1;
        // 传入的文件描述符是sun.nio.ch.EPollArrayWrapper#outgoingInterruptFD,也就是创建的pipe的写文件描述符fd1,向pipe的fd1写入一个字节的1
        if (write(fd, fakebuf, 1) < 0) {
            JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
        }
    }
    

    这个时候epoll_wait就收到中断文件描述符sun.nio.ch.EPollArrayWrapper#incomingInterruptFD,也就是创建的pipe的读文件描述符上有IO事件产生,epoll_wait可以返回。

    所以调用到方法EPollArrayWrapper#interrupt()就可以中断文件描述符,而方法EPollSelectorImpl#wakeup调用了EPollArrayWrapper#interrupt()。

    那么为什么调用Thread.interrupt()的时候也会中断epoll_wait呢?

    因为在Thread.interrupt()方法中

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
    
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                // 上面设置完中断标志位后,会调用当前线程的blocker的interrupt方法
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    

    而在sun.nio.ch.EPollSelectorImpl#doSelect方法中,开始poll之前会调用begin方法

    protected final void begin() {
        if (interruptor == null) {
    		// 新建一个interruptor
            interruptor = new Interruptible() {
                public void interrupt(Thread ignore) {
                    // 此时Thread.interrupt()中的blocker就是这个匿名内部类,也就是调用的这个interrupt方法
                    AbstractSelector.this.wakeup();
                }};
        }
        // 设置当前线程的interruptor
        AbstractInterruptibleChannel.blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }
    

    所以Thread.interrupt()会调用到EPollSelectorImpl#wakeup方法,也就可以起到中断select的作用。

    什么时候清除中断标志?

    可以不止一次的中断select,为了实现这个功能,每次在中断之后斗湖清除相关的中断标志。在sun.nio.ch.EPollSelectorImpl#doSelect方法中pollWrapper.poll完成之后

    int poll(long timeout) throws IOException {
        updateRegistrations();
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            // epollWait返回之后会判断有没有中断文件描述符
            if (getDescriptor(i) == incomingInterruptFD) {
                // 设置中断的文件描述符处于返回的pollArray的index
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
    protected int doSelect(long timeout) throws IOException {
        // 省略中间代码...
            pollWrapper.poll(timeout);
    	// // 省略中间代码...
        // 如果
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            // 将返回的中断文件描述符的IO事件清空,也就是用户不会读取到用做中断的文件描述符
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                // sun.nio.ch.EPollArrayWrapper#interrupted设置为false,可再次中断
                pollWrapper.clearInterrupted();
                // 读取用以中断的文件描述符上所有的数据
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
    

    总结

    到目前为止已经看清了Java对应linux中epoll相关api的封装

    // 创建epoll文件描述符
    // 对应到Java就是创建selector
    int epoll_create(int size);
    
    // 打开一个网络通讯端口,也就是创建一个socket,创建返回一个文件描述符
    // 对应到Java就是创建一个socketChannel
    int socket(int domain, int type, int protocol);
    
    // 将socket对应的文件描述符和ip:port绑定在一起
    // 对应于Java中绑定ip:port
    int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
    
    // 表明socket对应的文件描述符处于监听状态,并且最多允许有backlog个客户端处于连接等待状态
    // 对应于Java中bind中调用listen方法
    int listen(int sockfd, int backlog);
    
    // 控制某个文件描述符上的事件:add、update、delete事件
    // 对应于Java中调用select过程中添加每个channel关注的事件
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
    // 等待监控的所有描述符有事件发生
    // 对应于Java中select的时候等待有IO事件发生
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
    
  • 相关阅读:
    docker私有仓库harbor的安装及使用
    docker 搭建 zipkin
    docker安装redis并以配置文件方式启动
    docker 安装 mysql5.7.25
    什么是反射?可以解决什么问题?
    什么是死锁?如何防止死锁?
    说说TCP和UDP的区别
    什么是XSS攻击?
    怎么更改Visual Studio项目名字
    windows上使用getopt和getopt_long
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/9363593.html
Copyright © 2011-2022 走看看