zoukankan      html  css  js  c++  java
  • epoll源码分析

    epoll源码分析

    最近在使用libev过程中遇到一个场景:一个fd从一个ev_loop迁移到另一个ev_loop,会出现这个fd同时存在两个epoll的瞬间。
    不禁要问了,一个fd同时被两个epoll监视的行为是怎样的,epoll嵌套使用是怎样实现的?为此,整理了以前读的epoll源码。

    概述

    epoll的扩展性和性能关键在于两个数据结构: 0) 一个rbtree; 1) 一个ready list.
    epoll是有状态的, 内核中维护了一个数据结构用来管理所要监视的fd,这个数据结构是eventpoll.
    在eventpoll中有一颗红黑树, 用来快速的查找和修改要监视的fd,每个节点被封装成epitem结构.
    在eventpoll中有一个列表, 用来收集已经发生事件的epitem, 这个list叫ready list.

    epoll系统的初始化

    eventpoll_init()
    {
        eventpoll_mnt = kern_mount(&eventpoll_fs_type);
    
        epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
            0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,
        NULL, NULL);
    
        pwq_cache = kmem_cache_create("eventpoll_pwq",
        sizeof(struct eppoll_entry), 0,
        EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);
    
        error = register_filesystem(&eventpoll_fs_type);
        eventpoll_mnt = kern_mount(&eventpoll_fs_type);
    }
    

    init初始化代码很简单:
    1. 申请epitem的缓冲;
    2. 申请eppoll_entry的缓冲;
    3. 把epoll和文件系统关联起来.

    下图是 epoll和VFS的关联:
    epoll_

    epoll创建 - epoll_create

    SYSCALL_DEFINE1(epoll_create1, int, flags)
    {
        int error, fd;
        struct eventpoll *ep = NULL;
        struct file *file;
    
        error = ep_alloc(&ep);
        fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
        file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, (flags & O_CLOEXEC));
        fd_install(fd, file);
        ep->file = file;
        return fd;
    }
    
    1. error = ep_alloc(&ep); 分配一个epollevent结构体;
    2. 把ep和文件系统的inode, file关联起来。

    epoll添加事件 - epoll_ctl

    asmlinkage long
    sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
    {
        struct file *file, *tfile;
        struct eventpoll *ep;
        struct epitem *epi;
        struct epoll_event epds;
    
        tfile = fget(fd);
        // 判断epfd是否是一个epoll
        if (file == tfile || !is_file_epoll(file))
            goto eexit_3;
    
        // 从private_data中取出eventpoll指针
        // 并且上锁, 因此一个epoll_ctl是线程安全的
        ep = file->private_data;
        down_write(&ep->sem);
    
        // 尝试着从红黑树ep->rbr上找到tfile对应的一个epitem
        epi = ep_find(ep, tfile, fd);
    
        error = -EINVAL;
        switch (op) {
            case EPOLL_CTL_ADD:
            if (!epi) { // 如果是ADD操作,并且这个fd不在eventpoll里,则执行插入操作,注意:内核会主动加上POLLERR和POLLHUP事件
                epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
            } else // 否则设置error
            error = -EEXIST;
            clear_tfile_check_list();
            break;
            case EPOLL_CTL_DEL:
            if (epi)
                error = ep_remove(ep, epi);
            else
                error = -ENOENT;
            break;
        case EPOLL_CTL_MOD:
           if (epi) {
               epds.events |= POLLERR | POLLHUP;
               error = ep_modify(ep, epi, &epds);
           } else
               error = -ENOENT;
               break;
           }
        }
    }
    

    ep_insert插入事件

    下图是epoll的数据结构。root指向红黑树的树根;rdlist指向待收割事件的列表ready list:
    epoll__tree_readylist

    static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
    struct file *tfile, int fd)
    {
        int error, revents, pwake = 0;
        unsigned long flags;
        struct epitem *epi;
        struct ep_pqueue epq;
    
        // 从slab中分配一个epitem
        if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
            return -ENOMEM;
    
        // 初始化epi
        INIT_LIST_HEAD(&epi->rdllink);
        INIT_LIST_HEAD(&epi->fllink);
        INIT_LIST_HEAD(&epi->pwqlist);
        epi->ep = ep;
        ep_set_ffd(&epi->ffd, tfile, fd);
        epi->event = *event;
        epi->nwait = 0;
        epi->next = EP_UNACTIVE_PTR;
    
        // 调用tcp_poll
        // 在tcp_sock->sk_sleep中插入一个等待者
        epq.epi = epi;
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
        // 0) 向fd添加一个回调让其有事件发生时通知epoll;
        // 1) 同时, 可能此时已经有事件存在了, revents返回这个事件
        revents = tfile->f_op->poll(tfile, &epq.pt);
    
    
        // 把这个epi添加到红黑树中
        ep_rbtree_insert(ep, epi);
    
        error = -EINVAL;
        if (reverse_path_check())
            goto error_remove_epi;
    
        spin_lock_irqsave(&ep->lock, flags);
    
    
        // 如果此时有事件到来,并且没有把epi添加到就绪队列,则添加到epoll的就绪队列
        if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
    
        // 并且唤醒一个正在等在这个epoll管理的fd的进程
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        // 并且唤醒一个正在等在这个epoll本身的进程
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
        }
    
        spin_unlock_irqrestore(&ep->lock, flags);
    
        atomic_inc(&ep->user->epoll_watches);
    
        // 在ep->lock锁的外面唤醒嵌套epoll
        if (pwake)
            ep_poll_safewake(&ep->poll_wait);
        return 0;
    }
    

    在插入一个fd到epoll中会显示调用一次poll, 对于tcp来说是tcp_poll.
    来看看poll是如何初始化和被调用的.

    tcp_poll

    下图是网卡硬件中断触发epoll_wait返回的调用路径:
    _

    // tcp 协议初始化
    static struct inet_protosw inetsw_array[] =
    {
      {
        .type =       SOCK_STREAM,
        .protocol =   IPPROTO_TCP,
        .prot =       &tcp_prot,
        .ops =        &inet_stream_ops,
        .capability = -1,
        .no_check =   0,
        .flags =      INET_PROTOSW_PERMANENT |
        INET_PROTOSW_ICSK,
      },
    ...
    ...
    ...
    }
    
    // tcp_poll才是最终的调用函数
    const struct proto_ops inet_stream_ops = {
        .family                       = PF_INET,
        .owner                = THIS_MODULE,
        .bind                 = inet_bind,
        .accept               = inet_accept,
        .poll                 = tcp_poll,
        .listen                           = inet_listen
    }
    

    tcp_poll的逻辑

    static unsigned int sock_poll(struct file *file, poll_table * wait)
    {
        struct socket *sock;
        sock = file->private_data;
        return sock->ops->poll(file, sock, wait);
    }
    
    // 0) 注册事件到tcp中;
    // 1) 返回此时已经发生的事件.
    unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
    {
        unsigned int mask;
        struct sock *sk = sock->sk;
        struct tcp_sock *tp = tcp_sk(sk);
    
        // 注册一个回调到sk->sk_sleep中
        // 注意, wait为空时忽略注册动作
        poll_wait(file, sk->sk_sleep, wait);
    
        // 如果是监听套接字,则inet_csk_listen_poll
        if (sk->sk_state == TCP_LISTEN)
            return inet_csk_listen_poll(sk);
    
        mask = 0;
        if (sk->sk_err)
            mask = POLLERR;
    
        // copied_seq 和 rcv_nxt 不相等,则说明有未读数据出现了
        if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {
            if ((tp->rcv_nxt != tp->copied_seq) && (tp->urg_seq != tp->copied_seq || tp->rcv_nxt != tp->copied_seq + 1 || sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data))
            mask |= POLLIN | POLLRDNORM;
    
            if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
            mask |= POLLOUT | POLLWRNORM;
            }
        }
    }
    

    看看如何注册回调到tcp socket中

    // 反向调用poll_table->qproc,注册一个poll_callback
    static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
    {
        if (p && wait_address)
            p->qproc(filp, wait_address, p);
    }
    
    
    // 注册poll_callback到sock->sk_sleep上
    // 0) file是sock对应的file句柄;
    // 1) whead是sock->sk_sleep
    static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
    {
        struct epitem *epi = ep_item_from_epqueue(pt);
        struct eppoll_entry *pwq;
    
        if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, SLAB_KERNEL))) {
            init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
        } else {
            /* We have to signal that an error occurred */
        epi->nwait = -1;
        }
    }
    
    

    // 只要socket上有事件发生就会回调上面注册的回调

    poll_callback的回调

    数据包到达:
    PKT Arrive INT
    --> Driver
    --> 0) alloc_skb; 1) netif_rx
    --> RX_SOFTIRQ
    --> net_rx_action软中断处理函数 (dev->poll)
    --> process_backlog
    --> netif_receive_skb
    --> tcp_v4_rcv()
    --> tcp_v4_do_rcv
    --> tcp_rcv_state_process
    --> sock_def_wakeup
    --> ep_poll_callback

    回调

    static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
    {
    
        int pwake = 0;
        unsigned long flags;
    
        // 通过wait找到epoll_entry
        // 通过epoll_entry->base找到epitem
        struct epitem *epi = ep_item_from_wait(wait);
        struct eventpoll *ep = epi->ep;
    
        spin_lock_irqsave(&ep->lock, flags);
    
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
            goto out_unlock;
    
        if (key && !((unsigned long) key & epi->event.events))
            goto out_unlock;
    
        // 把当前epitem添加到ready list中,等待收割
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
    
        // 在收到数据包的回调中唤醒等待在epll上的进程    
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        // 唤醒嵌套epoll的进程
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    
        pin_unlock_irqrestore(&ep->lock, flags);
    
        if (pwake)
            ep_poll_safewake(&ep->poll_wait);
    
    return 1;
    }
    

    下面看看用户态如何收割事件.

    epoll事件收割 - epoll_wait

    SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout)
    {
        error = ep_poll(ep, events, maxevents, timeout);                 
    }
    
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
    {
        int res = 0, eavail, timed_out = 0;
    
        if (timeout > 0) {
            struct timespec end_time = ep_set_mstimeout(timeout);
    
        slack = select_estimate_accuracy(&end_time);
        to = &expires;
        *to = timespec_to_ktime(end_time);
        } else if (timeout == 0) {
            timed_out = 1;
        spin_lock_irqsave(&ep->lock, flags);
        goto check_events;
        }
    
        fetch_events:
        spin_lock_irqsave(&ep->lock, flags);
    
         // 如果ready list为空
         if (!ep_events_available(ep)) {
             init_waitqueue_entry(&wait, current);
         wait.flags |= WQ_FLAG_EXCLUSIVE;
         // 把当前进程添加到等待队列中
         __add_wait_queue(&ep->wq, &wait);
    
         for (;;) {
             // 设置进程的状态为TASK_INTERRUPTIBLE,以便在ep_poll_callback将其唤醒
             set_current_state(TASK_INTERRUPTIBLE);
             // ready list非空
             if (ep_events_available(ep) || timed_out)
                 break;
    
                // 有信号返回EINTR 
            if (signal_pending(current)) {
                res = -EINTR;
            break;
                }
    
            // 解锁准备调度出去
            spin_unlock_irqrestore(&ep->lock, flags);
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;
                // 再次运行后,第一件事就是获取锁    
            spin_lock_irqsave(&ep->lock, flags);
            }
        __remove_wait_queue(&ep->wq, &wait);
    
        set_current_state(TASK_RUNNING);
        }
    
        eavail = ep_events_available(ep);
        // 开始收割事件
        ep_send_events(ep, events, maxevents);
    }
    
    
    static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents)
    {
        struct ep_send_events_data esed;
    
        esed.maxevents = maxevents;
        esed.events = events;
    
        return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
    }
    
    static int ep_scan_ready_list(struct eventpoll *ep, int (*sproc)(struct eventpoll *, struct list_head *, void *), void *priv)
    {
        int error, pwake = 0;
        unsigned long flags;
        struct epitem *epi, *nepi;
        LIST_HEAD(txlist);
    
        // 上锁,和epoll_ctl, epoll_wait互斥
        mutex_lock(&ep->mtx);
    
        // 原子的置换readlist 到 txlist中
        // 并且开启ovflist, 使得在sproc执行过程中产生的事件存入其中, 是一个事件的临时停靠点
        spin_lock_irqsave(&ep->lock, flags);
        list_splice_init(&ep->rdllist, &txlist);
        ep->ovflist = NULL;
        spin_unlock_irqrestore(&ep->lock, flags);
    
        // 开始调用sproc组织事件到用户空间的数组中
        error = (*sproc)(ep, &txlist, priv);
    
        spin_lock_irqsave(&ep->lock, flags);
        for (nepi = ep->ovflist; (epi = nepi) != NULL;
            nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
            // 把sproc执行期间产生的事件加入到ready list中, 但是有可能这些新诞生的事件到目前为止还在txlist中
        // 也就是, 有可能sproc并没有消耗完本次的ready list,那么剩下的事件要等到下次epoll_wait来收割
        // 所以,
        //     0) 需要去重, 这是通过ep_is_linked(&epi->rdllink)来做到的, 因为如果这个epi在txlist中, 它的rdllikn非空;
        //     1) 需要把还没有被收割到用户空间的事件再次的放入ready list中, 并且要保证这些事件在新诞生的事件的前面, 这是通过list_splice做到的.
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
        }
    
        // 关闭ovflist 
        ep->ovflist = EP_UNACTIVE_PTR;
    
        list_splice(&txlist, &ep->rdllist);
    
        // 唤醒
        if (!list_empty(&ep->rdllist)) {
            if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
        }
        spin_unlock_irqrestore(&ep->lock, flags);
    
        mutex_unlock(&ep->mtx);
    
        if (pwake)
            ep_poll_safewake(&ep->poll_wait);
    
        return error;
    }
    

    事件是怎么被收割到用户空间的

    static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
    {
        struct ep_send_events_data *esed = priv;
        int eventcnt;
        unsigned int revents;
        struct epitem *epi;
        struct epoll_event __user *uevent;
    
        // 这个函数不需要再上锁了
        // 收割事件的个数上限是esed->maxevents
        for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) {
            epi = list_first_entry(head, struct epitem, rdllink);
    
        // 已经被收割的事件要从txlist中移除掉, 很重要.
        // 因为,并不是txlist上的所有的事件都会被收割到用户空间
        // 剩下的未收割的事件要再次的放回到ready list
        list_del_init(&epi->rdllink);
    
        // 显示的tcp_poll一次事件, 看看这个fd上发生了什么事情, 并和自己关心的事件做交集
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;
    
        if (revents) {
            // 回传到用户空间            
            if (__put_user(revents, &uevent->events) ||__put_user(epi->event.data, &uevent->data)) {
                list_add(&epi->rdllink, head);
            return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {
                // 如果是LT模式要再次放入到ready list中
            // 难道这个事件就一直在ready list中了? 用户态的epoll_wait岂不是每次都会收割到事件?什么时候会被剔除掉?
            // 非也(以读事件为例):
            //     0) 如果用户态在epoll_wait中获取到了一个epi事件, 并没有处理, 那么这个事件是一直存在在fd上的(举个例子: 可读状态会一直处于可读, rcv_nxt>copied_seq)
            //     1) 用户态代码不读取数据或仅仅读取了部分数据, 为了保证LT语义, 下次epoll_wait时候能够再次获取到改epi, 这个epi必须要保存到ready list中;
            //     2) 用户态代码一直读取这个fd上的数据直到EGAIN, 下次epoll_wait的时候任然会从ready list中碰到这个事件, 但此时tcp_poll不会返回可读事件了, 所以此后会从ready list中剔除掉.
            //     3) 也就是, epoll事件的剔除是发生在下一次epoll_wait中
            list_add_tail(&epi->rdllink, &ep->rdllist);
            }
            }
        }
        return eventcnt;
    }
    

    自问自答

    2_epoll
    问: 一个fd加入到多个epoll行为如何?
    答: 统一个fd通过epoll_ctl添加到两个epoll中,在epoll_ctl流程中会通过tcp_poll调用在fd->sock->sk_sleep中插入一个回调。也就是说,两次epoll_ctl就会往同一个fd的sk_sleep中插入两个回调。在有事件到来时会遍历sk_sleep上所有的回调。所以,会触发两次epoll_wait返回。
    问:epoll的LT和ET如何实现的?和具体的poll()有关吗?
    答:具体的poll()函数是无感知LT和ET的。tcp_poll在state change时候会回调sk_sleep上的回调。epoll在收割事件的时候会判断是ET还是LT,如果是ET则把epi从ready list移除掉,并且加入到用户态的events数组中,所以下次epoll_wait就不会收割到这个事件了,除非state change又发生了变化触发了回调;如果是LT除了把epi加入到用户态的events数组中,还会再次加入到ready list之后,下次epoll_wait会再次返回,但是并不会始终返回。

  • 相关阅读:
    Redis 学习(二十)服务器
    Redis 学习(十八)连接
    Redis学习(十七) 脚本
    Redis学习(十六)事务
    Redis学习(十四) 发布订阅
    python中如何使用requests模块下载文件并获取进度提示?
    Python实例获取mp3文件的tag信息
    python 视频处理,提取视频相关帧,读取Excel
    爬虫数据采集技术趋势-智能化解析
    Python中文转拼音代码(支持全拼和首字母缩写)
  • 原文地址:https://www.cnblogs.com/diegodu/p/9377535.html
Copyright © 2011-2022 走看看