zoukankan      html  css  js  c++  java
  • epoll(2) 源码分析

    epoll(2) 源码分析

    文本内核代码取自 5.0.18 版本,和上一篇文章中的版本不同是因为另一个电脑出了问题,但是总体差异不大。

    引子留下的问题

    上一篇文章中留下了几个问题,本文将针对这几个问题进行分析:

    1. epoll(2) 得到就绪事件的复杂度为何是 (O(1))
    2. epoll(2) 和普通的文件相比的区别在哪里,比如和 eventfd(2) 比较
    3. epoll(2) 相对 poll(2)/select(2) 多提供了 EPOLLET 的触发模式,现象在上面可以看到区别,实现是如何做到的。
    4. epoll(2) 相互关注时,有就绪事件到来会产生相互唤醒的问题,为何会出现这样的问题
    5. 对于问题 4,内核是如何解决这种相互唤醒的问题。

    解答在文末.

    关键的数据结构

    在第一次阅读代码时,优先掌握该功能的核心数据结构有利于对于全局的把控。

    struct eventpoll

    struct eventpoll 对应一个 epoll 实例的结构,包含所有的文件事件,作为 epoll 的接口使用。

    /*
     * This structure is stored inside the "private_data" member of the file
     * structure and represents the main data structure for the eventpoll
     * interface.
     */
    struct eventpoll {
            spinlock_t lock;              // 保护整个数据结构
            struct mutex mtx;             // 保护正在操作的文件
    
            wait_queue_head_t wq;         // sys_epoll_wait() 使用的等待队列
            wait_queue_head_t poll_wait;  // epoll 作为被监视文件时 file->poll() 使用的等待队列,使用较少
                                          // poll_wait 队列作用和 eventfd 文件中的 wqh 队列相同
    
            struct list_head rdllist;     // 就绪的文件链表,连接 epitem 上的 rdllink
            struct epitem *ovflist;       // 也是用来串联就绪的事件,作为 rdlist 的备胎使用
    
            struct rb_root_cached rbr;    // 所有关注的文件事件的红黑树,在内核空间维护
    
            /* wakeup_source used when ep_scan_ready_list is running */
            struct wakeup_source *ws;     // 不分析该功能,只知道为唤醒源就行
            struct user_struct *user;     // epoll 创建操作所属的用户
            struct file *file;            // epoll 关联的文件结构
    
            /* used to optimize loop detection check */
            int visited;
            struct list_head visited_list_link;
    
    #ifdef CONFIG_NET_RX_BUSY_POLL
            /* used to track busy poll napi_id */
            unsigned int napi_id;
    #endif
    };
    

    struct epitem

    struct epitem 每个文件描述符添加到 eventpoll 接口将产生一个 epitem项 被链接到 eventpoll 中的红黑树上。

    /*
     * Each file descriptor added to the eventpoll interface will
     * have an entry of this type linked to the "rbr" RB tree.
     * Avoid increasing the size of this struct, there can be many thousands
     * of these on a server and we do not want this to take another cache line.
     */
    struct epitem {
            union {
                    /* RB tree node links this structure to the eventpoll RB tree */
                    struct rb_node rbn;
                    /* Used to free the struct epitem */
                    struct rcu_head rcu;
            };
    
            struct list_head rdllink; // 用于连接到 eventpoll->rdllist 的链表,和 rdllist 一起使用
            struct epitem *next;      // 连接到 eventpoll->ovflist 的指针,和 ovflist 一起使用
            struct epoll_filefd ffd;  // 文件 file 结构 + fd,作为红黑树的节点
    
            int nwait;                // 附加在 poll 操作上活跃的等待队列的数量
            /* List containing poll wait queues */
            struct list_head pwqlist; // 注释是包含轮询等待队列的链表,但是实际上个人更倾向为这个链表只是为了连接 eppoll_entry 结构。
                                      // 和上面那个 nwait 一样,这两个变量的添加操作只会发生一次,就是调用 ep_insert() 的时候,但是 epitem 在一个 epoll 实例中只会调用一次。
    
            struct eventpoll *ep;     // 当前 epitem 的所有者
            struct list_head fllink;  // 连接文件结构的链表
    
            /* wakeup_source used when EPOLLWAKEUP is set */
            struct wakeup_source __rcu *ws;  // 唤醒源,不考虑
    
            /* The structure that describe the interested events and the source fd */
            struct epoll_event event;  // 用户传入的 event 结构
    };
    

    struct eppoll_entry

    struct eppoll_entry 为文件的等待队列项回调和epoll相关联的结构. 类似为poll(2) 中的 poll_table_entry

    /* Wait structure used by the poll hooks */
    struct eppoll_entry {
            struct list_head llink;   // 连接至 epitem 中的 pwqlist 链表中
            struct epitem *base;      // epitem 所属者
    
            wait_queue_entry_t wait;  // 等待队列项
            wait_queue_head_t *whead; // 等待队列头,关注文件的等待队列,如 eventfd->pwh
    };
    

    epoll(2) 相关的系统调用

    整个 fs/eventpoll.c 的代码量较多(2000+), 所以这里节选部分主要的代码进行分析, 一些对于参数的合法性的校验就不放出来了.

    epoll 的实现做了两种区分: 关注的文件是否为 epoll 类型, 我们先对非epoll文件进行分析, 这个部分代码比较直观易懂, 对epoll文件的处理考虑了多种情况, 留作之后分析.

    epoll_create(2)

    创建一个新的文件描述符, 对应一个 epoll 实例.

    1. 为 eventpoll 结构分配内存并且初始化
    2. 获取一个新的文件并且与 eventpoll 结构相关联.
    /*
     * Open an eventpoll file descriptor.
     */
    static int do_epoll_create(int flags)
    {
            int error, fd;
            struct eventpoll *ep = NULL;
            struct file *file;
    
            error = ep_alloc(&ep);  // 分配内存并初始化, 代码较直观, 不做分析
    
            fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
            if (fd < 0) {
                    error = fd;
                    goto out_free_ep;
            }
            file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                                     O_RDWR | (flags & O_CLOEXEC));
    
            ep->file = file;
            fd_install(fd, file);
    }
    

    epoll_ctl(2)

    epoll_ctl 为epoll的控制函数, 根据函数的 @op 入参分发需要进行的操作.

    函数的功能主体比较清晰, 也分为两部分:

    1. 对监视文件为epoll经行循环检测
    2. 根据操作类型分发具体执行的函数
    /*
     * The following function implements the controller interface for
     * the eventpoll file that enables the insertion/removal/change of
     * file descriptors inside the interest set.
     */
    SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                    struct epoll_event __user *, event)
    {
            // 加锁部分为对监视的文件是epoll时候进行的循环检测, 这部分后面分析, 这里只看非 epoll 文件的处理
            mutex_lock_nested(&ep->mtx, 0);
            if (op == EPOLL_CTL_ADD) {
                    if (!list_empty(&f.file->f_ep_links) ||
                                                    is_file_epoll(tf.file)) {
                            full_check = 1;
                            mutex_unlock(&ep->mtx);
                            mutex_lock(&epmutex);
                            if (is_file_epoll(tf.file)) {
                                    error = -ELOOP;
                                    if (ep_loop_check(ep, tf.file) != 0) {
                                            clear_tfile_check_list();
                                            goto error_tgt_fput;
                                    }
                            } else
                                    list_add(&tf.file->f_tfile_llink,
                                                            &tfile_check_list);
                            mutex_lock_nested(&ep->mtx, 0);
                            if (is_file_epoll(tf.file)) {
                                    tep = tf.file->private_data;
                                    mutex_lock_nested(&tep->mtx, 1);
                            }
                    }
            }
    
            /*
             * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
             * above, we can be sure to be able to use the item looked up by
             * ep_find() till we release the mutex.
             */
            epi = ep_find(ep, tf.file, fd);  // 从红黑树中寻找操作的文件
    
            error = -EINVAL;
            switch (op) {
            case EPOLL_CTL_ADD:
                    if (!epi) {  // 不存在就插入到eventpoll中
                            epds.events |= EPOLLERR | EPOLLHUP;
                            error = ep_insert(ep, &epds, tf.file, fd, full_check);
                    } else
                            error = -EEXIST;
                    if (full_check)
                            clear_tfile_check_list();
                    break;
            case EPOLL_CTL_DEL:
                    if (epi)
                            error = ep_remove(ep, epi);
                    else
                            error = -ENOENT;
                    break;
            case EPOLL_CTL_MOD:
                    if (epi) {
                            if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                                    epds.events |= EPOLLERR | EPOLLHUP;
                                    error = ep_modify(ep, epi, &epds);
                            }
                    } else
                            error = -ENOENT;
                    break;
            }
            if (tep != NULL)
                    mutex_unlock(&tep->mtx);
            mutex_unlock(&ep->mtx);
    
            return error;
    }
    

    ep_insert()

    分配一个 epitem 的内存并初始化, 再将该 epitem 添加到 eventpoll 中的红黑树上.

    初始化过程也包含了几个部分:

    1. 对 epitem 结构进行初始化, 设置各成员变量的值.
    2. 调用目标文件的file->f_op->poll() 函数设置等待队列项回调函数, 这个是实现 epoll_wait(2) 复杂度为 (O(1)) 最重要的一步, 关注的文件产生就绪事件就会调用该回调函数 ep_ptable_queue_proc
    3. 返回就绪事件掩码, 将当前 epitem 添加到 eventpoll->rdllist 中, 唤醒 epoll_wait(2) 线程
    /*
     * Must be called with "mtx" held.
     */
    static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                         struct file *tfile, int fd, int full_check)
    {
            int error, pwake = 0;
            __poll_t revents;
            struct epitem *epi;
    
            if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
                    return -ENOMEM;
    
            /* Item initialization follow here ... */
    
    
            /* Initialize the poll table using the queue callback */
            epq.epi = epi;
            init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
            /*
             * Attach the item to the poll hooks and get current event bits.
             * We can safely use the file* here because its usage count has
             * been increased by the caller of this function. Note that after
             * this operation completes, the poll callback can start hitting
             * the new item.
             */
            revents = ep_item_poll(epi, &epq.pt, 1);
    
            /* Add the current item to the list of active epoll hook for this file */
            spin_lock(&tfile->f_lock);
            list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);  // 将当前 epitem 添加到监视文件的 f_ep_links 链表上.
            spin_unlock(&tfile->f_lock);
    
            /*
             * Add the current item to the RB tree. All RB tree operations are
             * protected by "mtx", and ep_insert() is called with "mtx" held.
             */
            ep_rbtree_insert(ep, epi); // 将当前 epitem 添加到eventpoll的红黑树中
    
            /* If the file is already "ready" we drop it inside the ready list */
            if (revents && !ep_is_linked(epi)) { // 产生就绪事件并且当前 epitem 未添加进 eventpoll 中(这个有点儿明显)
                    list_add_tail(&epi->rdllink, &ep->rdllist); // 添加至 ep->rdllist, 留意这两个链表是一起出现的
    
                    /* Notify waiting tasks that events are available */
                    if (waitqueue_active(&ep->wq))  // wq 队列是 epoll_wait(2) 使用的, 这里唤醒调用 epoll_wait(2) 进入阻塞状态的线程.
                            wake_up_locked(&ep->wq);
                    if (waitqueue_active(&ep->poll_wait))  // 这里不直接唤醒是加锁的原因, poll_wait 队列属于被监视文件使用, 不应该在epoll实例中唤醒
                            pwake++;
            }
    
            spin_unlock_irq(&ep->wq.lock);
    
            /* We have to call this outside the lock */
            if (pwake)
                    ep_poll_safewake(&ep->poll_wait);
    
    }
    

    ep_remove()

    作用和 ep_insert() 相反, 释放内存, 删除与其它资源相关联的连接, 在互斥量 eventpoll->mtx 加锁下进行.

    /*
     * Removes a "struct epitem" from the eventpoll RB tree and deallocates
     * all the associated resources. Must be called with "mtx" held.
     */
    static int ep_remove(struct eventpoll *ep, struct epitem *epi)
    {
            struct file *file = epi->ffd.file;
    
            lockdep_assert_irqs_enabled();
    
            /*
             * Removes poll wait queue hooks.
             */
            ep_unregister_pollwait(ep, epi);  // 删除 epitem->pwqlist 关联的等待项链表
    
            /* Remove the current item from the list of epoll hooks */
            spin_lock(&file->f_lock);
            list_del_rcu(&epi->fllink);  // 从监视文件中的 file->f_ep_links 链表中删除当前 epitem
            spin_unlock(&file->f_lock);
    
            rb_erase_cached(&epi->rbn, &ep->rbr);  // 从 eventpoll 中的红黑树中删除当前 epitem 节点
    
            spin_lock_irq(&ep->wq.lock);
            if (ep_is_linked(epi))
                    list_del_init(&epi->rdllink);  // 从 eventpoll 中的就绪队列 rdllist 中删除当前 epitem 节点
            spin_unlock_irq(&ep->wq.lock);
    
            /*
             * At this point it is safe to free the eventpoll item. Use the union
             * field epi->rcu, since we are trying to minimize the size of
             * 'struct epitem'. The 'rbn' field is no longer in use. Protected by
             * ep->mtx. The rcu read side, reverse_path_check_proc(), does not make
             * use of the rbn field.
             */
            call_rcu(&epi->rcu, epi_rcu_free);  // 释放当前 epitem 的内存
    
            atomic_long_dec(&ep->user->epoll_watches);  // eventpoll 所属用户监视的 epitem数量减一
    
            return 0;
    }
    

    ep_modify()

    调整关注文件的事件.

    /*
     * Modify the interest event mask by dropping an event if the new mask
     * has a match in the current file status. Must be called with "mtx" held.
     */
    static int ep_modify(struct eventpoll *ep, struct epitem *epi,
                         const struct epoll_event *event)
    {
            int pwake = 0;
            poll_table pt;
    
            lockdep_assert_irqs_enabled();
    
            // 设置 file->f_op->poll 的回调函数为NULL, 因为在insert中已经设置了文件等待队列项的回调函数
            init_poll_funcptr(&pt, NULL);
    
            /*
             * Get current event bits. We can safely use the file* here because
             * its usage count has been increased by the caller of this function.
             * If the item is "hot" and it is not registered inside the ready
             * list, push it inside.
             */
            if (ep_item_poll(epi, &pt, 1)) {  // 调用f_op->poll() 获取文件的就绪事件
                    spin_lock_irq(&ep->wq.lock);
                    if (!ep_is_linked(epi)) {  // 未添加至 eventpoll 接口的就绪队列中
                            list_add_tail(&epi->rdllink, &ep->rdllist); // 添加
                            ep_pm_stay_awake(epi);  // 电源管理的函数, 不看
    
                            /* Notify waiting tasks that events are available */
                            if (waitqueue_active(&ep->wq))
                                    wake_up_locked(&ep->wq); // 唤醒调用 epoll_wait(2) 的线程
                            if (waitqueue_active(&ep->poll_wait))  // 分析同 ep_insert()
                                    pwake++;
                    }
                    spin_unlock_irq(&ep->wq.lock);
            }
    
            /* We have to call this outside the lock */
            if (pwake)
                    ep_poll_safewake(&ep->poll_wait);
    
            return 0;
    }
    

    epoll_wait(2)

    等待就绪的事件。

    ep_events_available 为检查是否存在就绪事件,其实就是检查 rdllist 和 ovflist 是否有被修改过,复杂度为 (O(1)).

    static inline int ep_events_available(struct eventpoll *ep)
    {
            return !list_empty_careful(&ep->rdllist) ||
                    READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
    }
    

    epoll_wait(2) 的主要逻辑由 ep_poll() 实现,核心逻辑分为两部分

    1. 检查就绪事件是否存在,存在执行 2,不存在根据超时时间进入阻塞状态和直接返回。
    2. 将就绪事件复制到用户空间,若是复制失败,在条件(见代码分析)满足的情况下执行 1,成功则返回。
    /*
     * Implement the event wait interface for the eventpoll file. It is the kernel
     * part of the user space epoll_wait(2).
     */
    static int do_epoll_wait(int epfd, struct epoll_event __user *events,
                             int maxevents, int timeout)
    {
            /*
             * At this point it is safe to assume that the "private_data" contains
             * our own data structure.
             */
            ep = f.file->private_data;
    
            /* Time to fish for events ... */
            error = ep_poll(ep, events, maxevents, timeout);
    }
    
    /*
     * ep_poll - Retrieves ready events, and delivers them to the caller supplied
     *           event buffer.
     */
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                       int maxevents, long timeout)
    {
            int res = 0, eavail, timed_out = 0;
            u64 slack = 0;
            bool waiter = false;
            wait_queue_entry_t wait;
            ktime_t expires, *to = NULL;
    
            lockdep_assert_irqs_enabled();
    
            // 时间处理,略过
    
    fetch_events:  // 这为一整个获取就绪事件逻辑的开端
    
            if (!ep_events_available(ep))  // 无就绪事件
                    ep_busy_loop(ep, timed_out);  // 中断缓解技术对 中断频繁的设置
    
            eavail = ep_events_available(ep);  // 有就绪事件
            if (eavail)
                    goto send_events;  // 直接goto发送数据
    
    
            /*
             * We don't have any available event to return to the caller.  We need
             * to sleep here, and we will be woken by ep_poll_callback() when events
             * become available.
             */
            if (!waiter) {  // 无数据,需要等待
                    waiter = true;  // 设置等待标识
                    init_waitqueue_entry(&wait, current); // 初始化等待队列项
    
                    spin_lock_irq(&ep->wq.lock);
                    __add_wait_queue_exclusive(&ep->wq, &wait); // 投入到 ep->wq 的等待队列中
                    spin_unlock_irq(&ep->wq.lock);
            }
    
            for (;;) {  // 进入无限循环
                    /*
                     * We don't want to sleep if the ep_poll_callback() sends us
                     * a wakeup in between. That's why we set the task state
                     * to TASK_INTERRUPTIBLE before doing the checks.
                     */
                    set_current_state(TASK_INTERRUPTIBLE);  // 设置可中断运行状态
                    /*
                     * Always short-circuit for fatal signals to allow
                     * threads to make a timely exit without the chance of
                     * finding more events available and fetching
                     * repeatedly.
                     */
                    if (fatal_signal_pending(current)) {  // 先判断致命错误信号
                            res = -EINTR;
                            break;
                    }
    
                    eavail = ep_events_available(ep);  // 再判断是否有就绪事件的产生,有的话推出循环
                    if (eavail)
                            break;
                    if (signal_pending(current)) {  // 非致命错误信号产生,中断去处理该中断
                            res = -EINTR;
                            break;
                    }
    
                    // 超时调度
                    if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
                            timed_out = 1;
                            break;
                    }
            }
    
            __set_current_state(TASK_RUNNING);
    
    send_events:  // 将就绪事件复制到用户空间逻辑开端
            // 1.没有错误产生 2.有就绪事件 3.事件复制到用户空间失败 4.未超时
            // 满足以上4个条件的情况下重新进行获取就绪事件逻辑
            if (!res && eavail &&
                !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
                    goto fetch_events;
    
            // 在等待标志设置的情况下,需要把已添加等待队列节点删除。
            if (waiter) {
                    spin_lock_irq(&ep->wq.lock);
                    __remove_wait_queue(&ep->wq, &wait);
                    spin_unlock_irq(&ep->wq.lock);
            }
    
            return res;
    }
    

    ep_send_events 将就绪的事件复制至用户空间,ep_send_events_proc 为实际的执行函数,ep_scan_ready_list 为辅助函数,这个函数放在后面具体说明,这里只看 ep_send_events_proc 的实现。

    static int ep_send_events(struct eventpoll *ep,
                              struct epoll_event __user *events, int maxevents)
    {
            struct ep_send_events_data esed;
    
            esed.maxevents = maxevents;
            esed.events = events;
    
            ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
            return esed.res;
    }
    
    
    static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                                   void *priv)
    {
            struct ep_send_events_data *esed = priv;
            __poll_t revents;
            struct epitem *epi, *tmp;
            struct epoll_event __user *uevent = esed->events;
            struct wakeup_source *ws;
            poll_table pt;
    
            init_poll_funcptr(&pt, NULL);  // 初始化poll_table, 但是并不设置 file->f_op->poll 的回调函数
            esed->res = 0;
    
            /*
             * We can loop without lock because we are passed a task private list.
             * Items cannot vanish during the loop because ep_scan_ready_list() is
             * holding "mtx" during this call.
             */
            lockdep_assert_held(&ep->mtx);
    
            // head 实际上为 rdllist,遍历就绪文件链表
            list_for_each_entry_safe(epi, tmp, head, rdllink) {
                    if (esed->res >= esed->maxevents) // 超过用户的提供的缓冲区大小,maxevents 为 epoll_wait(2) 的第3个参数
                            break;
    
                    // __pm_stay_awake(ep->ws);
                    // 为电源保持唤醒状态的处理,略过这部分逻辑
    
                    list_del_init(&epi->rdllink);  // 从就绪文件链表中删除当前事件
    
                    /*
                     * If the event mask intersect the caller-requested one,
                     * deliver the event to userspace. Again, ep_scan_ready_list()
                     * is holding ep->mtx, so no operations coming from userspace
                     * can change the item.
                     */
                    revents = ep_item_poll(epi, &pt, 1);  // 调用 file->f_op->poll() 获取就绪事件的掩码
                    if (!revents)  // 无关注的就绪事件,抬走下一个就绪文件
                            continue;
    
                    // 复制就绪事件至用户空间
                    if (__put_user(revents, &uevent->events) ||
                        __put_user(epi->event.data, &uevent->data)) {
                            list_add(&epi->rdllink, head);  // 复制失败,将当前就绪文件重新链接至就绪文件链表中
                            ep_pm_stay_awake(epi);
                            if (!esed->res)  // 如果一个事件都没有复制,就产生致命错误,毕竟连个毛都没有捞着有点气
                                    esed->res = -EFAULT;
                            return 0;
                    }
                    esed->res++;  // 成功复制的数量
                    uevent++;     // 用户空间的缓冲区增长一下
                    if (epi->event.events & EPOLLONESHOT)  // 用户设置了 EPOLLONESHOT的情况下
                            epi->event.events &= EP_PRIVATE_BITS; // 重新设置关注的事件,见 ep_poll_callback 分析
                    else if (!(epi->event.events & EPOLLET)) {
                            // 未设置边缘触发模式,则将当前就绪文件添加回就绪文件链表中
                            // 这里就区分了边缘触发和水平触发,水平触发必须每次epoll_wait(2)调用都检查就绪文件的事件
                            list_add_tail(&epi->rdllink, &ep->rdllist);
                            ep_pm_stay_awake(epi);
                    }
            }
    
            return 0;
    }
    

    相关就绪事件逻辑

    ep_scan_ready_list 用来扫描就绪文件链表上的就绪文件并处理;ep_poll_callback 用来向就绪文件链表中添加就绪文件。

    ep_scan_ready_list

    就绪文件是否存在判断了两个变量,rdllist 上面分析过了,还有一个备胎的就绪文件链表 ovflist,备胎 ovflist 是在 rdllist 被占用的情况下使用的。

    ep_scan_ready_list 对两个就绪文件列表进行扫描找出就绪的文件,rdllist 是单独放在回调函数中进行处理的,ovflist 是直接在 ep_scan_ready_list 中处理的。

    1. 在对 rdllist 链表处理的回调函数执行前,改变 ovflist 的状态,使得在回调函数执行期间ovflist可以串联起就绪的文件。
    2. 执行回调函数。
    3. 遍历ovflist,将链表上的就绪文件对应的 epitem 插入到 rdllist 上,并且唤醒调用 epoll_wait(2) 线程。
    /**
     * ep_scan_ready_list - Scans the ready list in a way that makes possible for
     *                      the scan code, to call f_op->poll(). Also allows for
     *                      O(NumReady) performance.
     * Returns: The same integer error code returned by the @sproc callback.
     */
    static __poll_t ep_scan_ready_list(struct eventpoll *ep,
                                  __poll_t (*sproc)(struct eventpoll *,
                                               struct list_head *, void *),
                                  void *priv, int depth, bool ep_locked)
    {
            __poll_t res;
            int pwake = 0;
            unsigned long flags;
            struct epitem *epi, *nepi;
            LIST_HEAD(txlist);  // 初始化一个链表,将 eventpoll->rdllist 转移到该链表上
    
            /*
             * Steal the ready list, and re-init the original one to the
             * empty list. Also, set ep->ovflist to NULL so that events
             * happening while looping w/out locks, are not lost. We cannot
             * have the poll callback to queue directly on ep->rdllist,
             * because we want the "sproc" callback to be able to do it
             * in a lockless way.
             */
            spin_lock_irqsave(&ep->lock, flags);
            list_splice_init(&ep->rdllist, &txlist);  // txlist = rdllist; init(rdllist)
            // 这里做一个状态的转换,在文件的等待列队项回调函数(ep_poll_callback)执行时会判断ovflist是否发生改变,
            // 而NULL则为本函数后面的遍历操作使用
            ep->ovflist = NULL;
            spin_unlock_irqrestore(&ep->lock, flags);
    
            /*
             * Now call the callback function.
             */
            res = (*sproc)(ep, &txlist, priv);
    
            spin_lock_irqsave(&ep->lock, flags);
            /*
             * During the time we spent inside the "sproc" callback, some
             * other events might have been queued by the poll callback.
             * We re-insert them inside the main ready-list here.
             */
            for (nepi = ep->ovflist; (epi = nepi) != NULL;
                 nepi = epi->next, epi->next = EP_UNACTIVE_PTR) { // 每次访问的 epi->next 回到最初状态
                    /*
                     * We need to check if the item is already in the list.
                     * During the "sproc" callback execution time, items are
                     * queued into ->ovflist but the "txlist" might already
                     * contain them, and the list_splice() below takes care of them.
                     */
                    // 保留这段代码的注释
                    if (!ep_is_linked(&epi->rdllink)) { // 未被包含进 txlist(rdllist)
                            list_add_tail(&epi->rdllink, &ep->rdllist);  // 添加至正统就绪文件链表 rdllist 中
                            ep_pm_stay_awake(epi);
                    }
            }
            /*
             * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
             * releasing the lock, events will be queued in the normal way inside
             * ep->rdllist.
             */
            ep->ovflist = EP_UNACTIVE_PTR;  // 恢复 ovflist的状态, 备胎的寿命到此结束,投下一次胎(下次调用 ep_scan_ready_list)
    
            /*
             * Quickly re-inject items left on "txlist".
             */
            list_splice(&txlist, &ep->rdllist);  // 把对txlist 处理的结果,加入到rdllist,
            __pm_relax(ep->ws);
    
            if (!list_empty(&ep->rdllist)) {  // 如果还有就绪事件,则唤醒epoll_wait(2)线程
                    /*
                     * Wake up (if active) both the eventpoll wait list and
                     * the ->poll() wait list (delayed after we release the lock).
                     */
                    if (waitqueue_active(&ep->wq))
                            wake_up_locked(&ep->wq);
                    if (waitqueue_active(&ep->poll_wait))
                            pwake++;
            }
            spin_unlock_irqrestore(&ep->lock, flags);
    
            if (!ep_locked)
                    mutex_unlock(&ep->mtx);
    
            /* We have to call this outside the lock */
            if (pwake)
                    ep_poll_safewake(&ep->poll_wait);
    
            return res;
    }
    

    ep_poll_callback

    以上为对就绪链表有数据的处理,ep_poll_callback 为向就绪链表添加数据的回调函数。

    ep_poll_callback() 在 ep_insert() 中被设置为目标文件的等待队列项回调函数,当文件的状态发生改变时,会调用 ep_poll_callback() 把目标文件对应的 epitem 添加至就绪链表中。

    ewake 为独占唤醒标志,ep_poll_callback 在 __wake_up_comm 中被调用

    /*
     * This is the callback that is passed to the wait queue wakeup
     * mechanism. It is called by the stored file descriptors when they
     * have events to report.
     */
    static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
    {
            int pwake = 0;
            unsigned long flags;
            struct epitem *epi = ep_item_from_wait(wait);
            struct eventpoll *ep = epi->ep;
            __poll_t pollflags = key_to_poll(key);
            int ewake = 0;
    
            spin_lock_irqsave(&ep->lock, flags);
    
            if (!(epi->event.events & ~EP_PRIVATE_BITS))  // 对EPOLLONESHOT标志的处理
                    goto out_unlock;
    
            // ep->ovflist 的状态修改后作为备胎使用
            if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
                    if (epi->next == EP_UNACTIVE_PTR) {  // 未连接到 ovflist 中
                            epi->next = ep->ovflist;     // 头插法插入链表中
                            ep->ovflist = epi;
                    }
                    goto out_unlock;  // 不能进入正统就绪链表 rdllist的处理逻辑中,其实这里可用不用goto的
            }
    
            /* If this file is already in the ready list we exit soon */
            if (!ep_is_linked(&epi->rdllink)) {  // 当前epitem未连接至就绪列表中时,添加至就绪文件链表 rdllist 中
                    list_add_tail(&epi->rdllink, &ep->rdllist);
                    ep_pm_stay_awake_rcu(epi);
            }
    
            // EPOLLEXCLUSIVE 标志的处理,决定返回的参数
            // pollflags 为目标文件的就绪的事件掩码,可阅读 eventfd_read 的源码
            // 1.关注的事件中出现了读写就绪的事件 2.就绪的事件不为读写事件,满足其中之一则独占唤醒的标志置 1
            if (waitqueue_active(&ep->wq)) {
                    if ((epi->event.events & EPOLLEXCLUSIVE) &&
                                            !(pollflags & POLLFREE)) {
                            switch (pollflags & EPOLLINOUT_BITS) {
                            case EPOLLIN:
                                    if (epi->event.events & EPOLLIN)
                                            ewake = 1;
                                    break;
                            case EPOLLOUT:
                                    if (epi->event.events & EPOLLOUT)
                                            ewake = 1;
                                    break;
                            case 0:   
                                    ewake = 1;
                                    break;
                            }
                    }
                    wake_up_locked(&ep->wq);
            }
            if (waitqueue_active(&ep->poll_wait))
                    pwake++;
    
    out_unlock:
            spin_unlock_irqrestore(&ep->lock, flags);
    
            /* We have to call this outside the lock */
            if (pwake)
                    ep_poll_safewake(&ep->poll_wait);  // 分析同上
    
            if (!(epi->event.events & EPOLLEXCLUSIVE))  // 未设置独占唤醒标志
                    ewake = 1;
    
            return ewake;
    }
    

    文件状态发生改变调用文件的等待队列项上的回调函数 func,在epoll中也就是 ep_poll_callback,ret 为回调函数的独占唤醒标志。
    当等待项设置了WQ_FLAG_EXCLUSIVE标志,只要回调函数返回了独占唤醒标志,在独占唤醒计数消耗完时,退出循环。

    如果当前文件被多个 epoll 监视,那么这个文件的等待队列上就有多个 ep_poll_callback 回调函数,现在设置了等待项的标志 wait_entry->flags |= WQ_FLAG_EXCLUSIVE 后,只会执行 nr_exclusive 个回调函数,而 nr_exclusive 在epoll中为 1,所以就只会把就绪的文件添加到一个 epoll 中,这样避免了惊群效应.

    代码位置 kernel/sched/wait.c, __wake_up_comm()
    
    list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
            unsigned flags = curr->flags;
            ret = curr->func(curr, mode, wake_flags, key);
            if (ret < 0)
                    break;
            if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
                    break;
    }
    

    epoll 间的相互影响及处理

    产生这个最根本的原因就是 epoll作为一个文件既可以监视其他文件,也可以被其他epoll监视。这样就产生了一个监视的有向图

    ep_eventpoll_poll

    ep_eventpoll_poll 文件的poll操作,也就是file->f_op->poll(). 调用该函数可以获取就绪文件的事件掩码,但是 epoll 文件只提供读就绪事件,并且读就绪事件是由非epoll文件的就绪事件决定的。也就是说当一个epoll文件被 select(2)/poll(2)/epoll(2) 监听时,必须该epoll已经监听了其他的非epoll文件(如eventfd), 在调用 该epoll file->f_op->poll() 时才可能返回可读的就绪事件。

    static __poll_t ep_eventpoll_poll(struct file *file, poll_table *wait)
    {
            struct eventpoll *ep = file->private_data;
            int depth = 0;
    
            /* Insert inside our poll wait queue */
            poll_wait(file, &ep->poll_wait, wait);
    
            /*
             * Proceed to find out if wanted events are really available inside
             * the ready list.
             */
            return ep_scan_ready_list(ep, ep_read_events_proc,
                                      &depth, depth, false);
    }
    

    对单独的 epitem 执行 poll 操作,获取就绪的文件事件掩码。

    1. 如果是非 epoll 文件,则执行 file->f_op->poll 操作。
    2. 如果是 epoll 文件,则扫描该 epoll 中就绪文件链表上的 epitem 是否就绪,这里产生了一个递归。

    1 是递归的基准情况,而 ep_scan_ready_list 负责为向前推进

    
    static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
                                     int depth)
    {
            struct eventpoll *ep;
            bool locked;
    
            pt->_key = epi->event.events;
            if (!is_file_epoll(epi->ffd.file))
                    return epi->ffd.file->f_op->poll(epi->ffd.file, pt) &
                           epi->event.events;
    
            ep = epi->ffd.file->private_data;
            poll_wait(epi->ffd.file, &ep->poll_wait, pt);
            locked = pt && (pt->_qproc == ep_ptable_queue_proc);
    
            return ep_scan_ready_list(epi->ffd.file->private_data,
                                      ep_read_events_proc, &depth, depth,
                                      locked) & epi->event.events;
    }
    
    
    // 实际执行读取事件的函数
    static __poll_t ep_read_events_proc(struct eventpoll *ep, struct list_head *head,
                                   void *priv)
    {
            struct epitem *epi, *tmp;
            poll_table pt;
            int depth = *(int *)priv;
    
            init_poll_funcptr(&pt, NULL);
            depth++;
    
            // 遍历该 epoll 中的 rdllist(参数为head)
            list_for_each_entry_safe(epi, tmp, head, rdllink) {
                    if (ep_item_poll(epi, &pt, depth)) {  // 获取该 epitem 的就绪事件,若就绪,则返回为可读的就绪事件掩码
                            return EPOLLIN | EPOLLRDNORM;
                    } else {  // 未就绪抬走下一个 epitem
                            /*
                             * Item has been dropped into the ready list by the poll
                             * callback, but it's not actually ready, as far as
                             * caller requested events goes. We can remove it here.
                             */
                            __pm_relax(ep_wakeup_source(epi));
                            list_del_init(&epi->rdllink);  // 无就绪事件,将当前epitem 从所有者 eventpoll 的就绪链表中删除
                    }
            }
    
            return 0;  // 当前这个 epitem 没有就绪的事件产生。
    }
    

    深度递归调用及死循环的检测

    现在有三个文件

    1. eventfd efd
    2. epoll epfd1
    3. epoll epfd2

    操作如下:

    • epoll_ctl(epfd1, EPOLL_CTL_ADD, efd, IN | OUT);
    • epoll_ctl(epfd2, EPOLL_CTL_ADD, epfd1, IN | OUT);

    现在这三者的关系如下:

    • epfd1 (in) struct_ctx(efd).wqh
    • epitem(efd) (in) eventpoll(epfd1) , epfd2 (in) eventpoll(epfd1).poll_wait

    现在efd就绪,产生了 IN | OUT 事件,这个时候调用 ep_poll_callback(epfd1) 将 epitem(efd) 添加到 eventpoll(epfd1).rdllist 上,唤醒 epoll_wait(2) 和 eventpoll(epfd1).poll_wait 上的等待项, 这里再调用 ep_poll_callback(epfd2) 将 epitem(epfd1) 添加至 eventpoll(epfd2).rdllist 上,唤醒 epoll_wait(2).

    关键点到了,如果现在epfd1 也监视 epfd2

    • 操作 epoll_ctl(epfd1, EPOLL_CTL_ADD, epfd2, IN | OUT).
    • 那么 ep_poll_callback(epfd1) (in) eventpoll(epfd2).poll_wait.

    在 ep_poll_callback(epfd2) 执行时,又会唤醒 eventpoll(epfd2).poll_wait 上的等待项,也就是 ep_poll_callback(epfd1). 所以就有可能出现死循环递归。

    ep_call_nested 函数用来检测嵌套调用,就是针对 epitem 为 epoll 文件的处理。
    我们可以将 epoll 比作一个树里面的一个节点,eventfd 这种文件只能作为叶节点使用,而 epoll 可以不是叶节点。现在我们对这棵树(如果出现相互监视,就变成了图)进行遍历,用 visited 作为已访问标志,检测到其结构中的epitem的文件类型只要是epoll 文件就继续向前推进(访问其子epitem),每次向前推进的时候进行检测,判断是否出现死循环或者递归深度超出范围。

    和上面的 ep_scan_ready_list 处理逻辑有一点相近,就是遍历这些 epoll 文件形成图

    static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
    {
            int error = 0;
            struct file *file = priv;
            struct eventpoll *ep = file->private_data;
            struct eventpoll *ep_tovisit;
            struct rb_node *rbp;
            struct epitem *epi;
    
            mutex_lock_nested(&ep->mtx, call_nests + 1);
            ep->visited = 1;  // 优化处理,已访问标志
            list_add(&ep->visited_list_link, &visited_list);
            for (rbp = rb_first_cached(&ep->rbr); rbp; rbp = rb_next(rbp)) {
                    epi = rb_entry(rbp, struct epitem, rbn);
                    if (unlikely(is_file_epoll(epi->ffd.file))) {  // epoll 文件
                            ep_tovisit = epi->ffd.file->private_data;
                            if (ep_tovisit->visited)
                                    continue;
                            // 继续向前推进,递归检测
                            error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                                            ep_loop_check_proc, epi->ffd.file,
                                            ep_tovisit, current);
                            if (error != 0)
                                    break;
                    } else {
                            // 该item未添加至文件检测链表中(唤醒风暴检测使用),是的 epoll 虽然叫文件,可是这里并不是一等公民。
                            if (list_empty(&epi->ffd.file->f_tfile_llink))
                                    list_add(&epi->ffd.file->f_tfile_llink,
                                             &tfile_check_list);
                    }
            }
            mutex_unlock(&ep->mtx);
    
            return error;
    }
    
    static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
                              int (*nproc)(void *, void *, int), void *priv,
                              void *cookie, void *ctx)
    {
            int error, call_nests = 0;
            unsigned long flags;
            struct list_head *lsthead = &ncalls->tasks_call_list;
            struct nested_call_node *tncur;
            struct nested_call_node tnode;
    
            spin_lock_irqsave(&ncalls->lock, flags);
    
            list_for_each_entry(tncur, lsthead, llink) {
                    // call_nests 为嵌套的调用深度,cookie 为 eventpoll 结构,ctx 为当前的任务 struct_task,不懂为何呀用当前任务做限定。
                    if (tncur->ctx == ctx &&
                        (tncur->cookie == cookie || ++call_nests > max_nests)) {
                            error = -1;
                            goto out_unlock;
                    }
            }
    
            /* Add the current task and cookie to the list */
            tnode.ctx = ctx;
            tnode.cookie = cookie;
            list_add(&tnode.llink, lsthead);  // 满足条件就添加到静态链表中
    
            spin_unlock_irqrestore(&ncalls->lock, flags);
    
            /* Call the nested function */
            error = (*nproc)(priv, cookie, call_nests);  // 继续调用向前推进
    
            /* Remove the current task from the list */
            spin_lock_irqsave(&ncalls->lock, flags);
            list_del(&tnode.llink);
    out_unlock:
            spin_unlock_irqrestore(&ncalls->lock, flags);
    
            return error;
    }
    

    唤醒风暴的处理

    在进行插入的时候调用该检测操作,作为预防使用;而 EPOLLEXCLUSIVE 产生就绪事件后的处理,两者作用不能混淆。

    唤醒风暴的检查为设定一个限制,epoll 允许唤醒的最大深度为 5,一个文件最多唤醒 path_limits[深度] 的epoll描述符。牵扯到递归的深度,自然是少不了 ep_call_nested 这个检测函数了。
    先看 reverse_path_check 的返回,只有两种情况:

    1. -1 超出最该深度允许唤醒的epoll描述符
    2. 0 在正常范围内
    #define PATH_ARR_SIZE 5
    static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
    static int path_count[PATH_ARR_SIZE];
    
    static int path_count_inc(int nests)
    {
            /* Allow an arbitrary number of depth 1 paths */
            if (nests == 0)
                    return 0;
    
            if (++path_count[nests] > path_limits[nests])
                    return -1;
            return 0;
    }
    
    static void path_count_init(void)
    {
            int i;
    
            for (i = 0; i < PATH_ARR_SIZE; i++)
                    path_count[i] = 0;
    }
    
    static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
    {
            int error = 0;
            struct file *file = priv;
            struct file *child_file;
            struct epitem *epi;
    
            /* CTL_DEL can remove links here, but that can't increase our count */
            rcu_read_lock();
            // 遍历该文件上的 epoll 节点 epitem
            list_for_each_entry_rcu(epi, &file->f_ep_links, fllink) {
                    child_file = epi->ep->file;  // 该 epitem 所属 epoll 实例
                    if (is_file_epoll(child_file)) {  // 文件应该必为 epoll 的
                            if (list_empty(&child_file->f_ep_links)) {  // epoll 未被监视
                                    if (path_count_inc(call_nests)) {  // 判断是否满足调用深度的条件
                                            error = -1;
                                            break;  // 不满足直接返回
                                    }
                            } else {  // 被监视,那就继续调用,往前推进
                                    error = ep_call_nested(&poll_loop_ncalls,
                                                            EP_MAX_NESTS,
                                                            reverse_path_check_proc,
                                                            child_file, child_file,
                                                            current);
                            }
                            if (error != 0)
                                    break;
                    } else {
                            printk(KERN_ERR "reverse_path_check_proc: "
                                    "file is not an ep!
    ");
                    }
            }
            rcu_read_unlock();
            return error;
    }
    
    static int reverse_path_check(void)
    {
            int error = 0;
            struct file *current_file;
    
            /* let's call this for all tfiles */
            // 遍历监视的文件
            list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
                    path_count_init();  // 初始化调用深度的次数为0
                    // 检验可能发生递归的调用
                    error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                                            reverse_path_check_proc, current_file,
                                            current_file, current);
                    if (error)
                            break;
            }
            return error;
    }
    

    引子的解答

    1. epoll(2) 得到就绪事件的复杂度为何是 (O(1))
      - epoll_wait(2) 只扫描就绪文件队列,不用对所有的文件进行判断,见 epoll_wait(2) 的分析。
    2. epoll(2) 和普通的文件相比的区别在哪里,比如和 eventfd(2) 比较
      - 少了 read(2)/write(2) 等文件操作
      - epoll 作为被监视文件只有可读就绪事件,eventfd 拥有读写就绪事件。
      - eventfd 的就绪事件来自文件自身的状态(计数)变化,而epoll的就绪来自监视文件的状态的变化
    3. epoll(2) 相对 poll(2)/select(2) 多提供了 EPOLLET 的触发模式,实现是如何做到的。
      - 区别在于每次调用 epoll_wait(2)在复制就绪事件至用户空间后,水平触发模式会将该文件添加回就绪链表。
    4. epoll(2) 相互关注时,有就绪事件到来会产生相互唤醒的问题,为何会出现这样的问题
      - 见 epoll 间的相互影响及处理
    5. 对于问题 4,内核是如何解决这种相互唤醒的问题。
      - 同 4 解答

    新的问题

    循环检测的时候为何需要限定单个线程(任务)间的 epoll 不同,这个猜测可能和唤醒的机制有关,作为一个问题留下。

    参考

    1. epoll: add EPOLLEXCLUSIVE flagEPOLLEXCLUSIVE 标志的提交代码。
    2. linux 内核poll/select/epoll实现剖析 ,对epoll很好的分析,代码稍微有点旧了,不过还是非常值得一看。
    3. eventfd 源码分析,上一篇对eventfd的分析。
  • 相关阅读:
    PAT Advanced 1067 Sort with Swap(0, i) (25分)
    PAT Advanced 1048 Find Coins (25分)
    PAT Advanced 1060 Are They Equal (25分)
    PAT Advanced 1088 Rational Arithmetic (20分)
    PAT Advanced 1032 Sharing (25分)
    Linux的at命令
    Sublime Text3使用指南
    IntelliJ IDEA创建第一个Groovy工程
    Sublime Text3 安装ftp插件
    Sublime Text3配置Groovy运行环境
  • 原文地址:https://www.cnblogs.com/shuqin/p/11772651.html
Copyright © 2011-2022 走看看