zoukankan      html  css  js  c++  java
  • I/O多路复用 SELECT POLL 内核实现

    等待队列

    先补充个基础知识――等待队列

    认识

    定义

    wait_queue_head_t wait_queue;    

    初始化        

    init_waitqueue_head(&wait_queue);           

    等待

    wait_event(queue, condition)   等待某个条件而进入睡眠

    wait_event_interruptible(queue, condition)  等待某个条件而进入睡眠并允许信号中断睡眠

    wait_event_timeout(queue, condition,timeout) 等待某个条件而进入睡眠 最多等待timeout时间

    wait_event_interruptible_timeout(queue, condition,timeout)

    唤醒

    void wake_up(wait_queue_head_t *queue);   唤醒阻塞在该等待队列上的进程

    void wake_up_interruptible(wait_queue_head_t *queue);

    使用

    假设你的设备驱动程序在中断中接收数据,为用户空间提供读取的操作。

    你可以这样处理:

    1、为简单说明,不考虑同步。

    read()

    {

                If(len > 0)

                        Read...

                        Return len;

        }else {

                    Return 0;

        }

    }

    Irq_handler()

    {

           Recv...

    Add Len

        }

    这是一种非阻塞的实现

    2、

    Read()

    {

              If(wait_event_interruptible(wait_queue, len > 0)) {

                  Return error;

    }

       

    Read...

    Return len;

    }

    Irq_handler()

    {

            recv

            Add len

            wake_up_interruptible(&wait_queue);

    }

    利用等待队列实现的阻塞方式,无数据会把自己放到等待队列中进入睡眠,当数据到来发生中断时,在中断中唤醒睡眠中等待队列上的进程进行处理。当然阻塞其实是和睡眠无关的,这里你无数据可以忙等,但睡眠是更优雅的方式。

    进一步分析

    wait_event

    跟进wait_event(queue, condition)会发现他定义了一个wait_queue_t __wait {.private = current, .func = autoremove_wake_function, },然后将__wait放到了等待队列queue中,即放到了queue的task_list链表中。

    接下来设置当前进程的状态为TASK_UNINTERRUPTIBLE,并调用schedule(),调度并切换到一个新的进程开始运行。

    设置为TASK_UNINTERRUPTIBLE的进程,不会再被系统调度执行,会一直死在这里。到此,该进程让出了CPU不再执行,可以认为他进入了睡眠。

    wake_up

    跟进wake_up(queue),他其实遍历queue的task_list链表,对每个结点(wait_queue_t类型),调用其func函数。

    而此时queue里面应该放着wait_event时放入的__wait,于是wake_up调用了__wait->func函数,__wait->func即autoremove_wake_function函数。

    跟进autoremove_wake_function,发现函数里面调用了try_to_wake_up,其参数就是__wait中赋予的current值,这样就实现了在其他进程或中断中,唤醒之前睡眠的进程。

    try_to_wake_up中的处理比较复杂,不再继续跟了,我们可以确定try_to_wake_up将之前睡眠的进程状态设为TASK_RUNING,这样之前的进程就可以继续被调度执行了,即被唤醒了。

    执行完try_to_wake_up后,将__wait从queue中删除,wake_up的工作就完成了。

    再次回到wait_event

    之前我们知道,进程在调用schedule后就睡了,然后被其他进程或者中断wake_up唤醒了,那么进程唤醒后应该继续在schedule后继续执行。

    继续跟进,schedule返回后,会首先判断条件condition是否成立,如果不成立,再次定义__wait,然后添加到等待队列,schedule睡眠。如果成立,那么wait_event执行完成,进程等待的条件满足,可以继续处理了。

    wait_event_timeout

    wait_event_timeout与wait_event的不同是wait_event调用的是schedule,而wait_event_timeout调用的是schedule_timeout。

    schedule_timeout里面又调用了schedule,但在调用之前,他定义了一个定时器,定时器在指定的timeout超时时,调用wake_up_process,进而调用try_to_wake_up唤醒进程。也就是说wait_event_timetou除了依赖于其他进程或中断唤醒自己,本身还有个定时器可以唤醒自己。

    select

    我们知道select同时可以监视多个描述符,只要任一个有事件,就可以直接返回处理。如果都没有事件则select睡眠等待,并且任一一个描述符有事件就可以唤醒select。其实现是基于等待队列的。原理简单的讲就是每个描述符都对应一个等待队列,每个描述符对应的驱动都提供一个poll方法。Select调用描述符的poll方法,检查是否有事件,当没有事件时,定义一个wait_queut_t的对象,放到描述符的等待队列中。当select检查到没有事件进入睡眠后,任一个描述符有事件,执行唤醒等待队列的操作就可以唤醒select。

    Select的系统调用sys_select,在fs/select.c中(linux 2.6.27内核),其调用路径为sys_select -> core_sys_select -> do_select。接下来我们看下slect系统调用的具体实现,代码比较多,只捡重点的部分看,其他细节有时间再研究。

    用户空间在使用select时,会定义fd_set类型的变量,对应于不同的事件有readset、writeset、exceptset,其实他们都是unsigned long类型的数组,数组中的每一位标识一个fd,我们常用的FD_SET(fd, set),是将set中的数组的第fd位设为1。我们关心fd的那几个事件,就将相应的set的第fd位置一,传给内核,通知内核帮我监视,有情况告诉我。通过看内核对fd_set的定义,可以看出fd_set是一个1024位的数组,也就是最多支持1024个fd,如果需要支持更多的fd,需要修改代码重新编译内核了。

    内核空间中,core_sys_select函数首先定义了一个long类型的数组,如果fd个数多,数组不够,他会调用kmalloc,动态申请一个数组。数组的使用分为六块,如下图所示,每块其实都是一个小的fd_set,只是fd_set是固定长度(1024位,注意是位不是字节)的数组,但这里每块的长度是和真实的fd的个数有关的。

    接下来core_sys_select调用get_fd_set将用户空间传递的readset、writeset、exceptset拷贝到in、out、ex中,然后调用do_select,将这个大数组传给他。do_select通过in、out、ex里面的位标识,得到要监视哪些fd,监视哪些事件(read、write、except),将监视的结果记录到res_in、res_out、res_ex中。返回到core_sys_select,程序调用set_fd_set将res_in、res_out、res_ex中的结果,拷贝到用户空间。select系统调用返回,就获得事件处理了。

    上一步提到了do_select,我们进一步研究研究他。

    首先设置当前进程状态 set_current_state(TASK_INTERRUPTIBLE);(这块我还不是很了解,内核没有抢占吗,如果设置状态后,切换出去了,岂不永远都切不回来了,一是此时还没添加唤醒的处理,不会有其他进程唤醒他,二是CPU不会调度TASK_INTERRUPTIBLE状态的进程执行。那么这里是没有内核抢占还是设置了TASK_INTERRUPTILBE的进程不会没抢占?

    然后循环扫描的in、out、ex中的信息(哪些fd关心read事件、哪些fd关心write事件、哪些fd关心except事件),调用具体的fd的驱动相关的poll函数获取fd的事件的状态,根据返回的状态,将结果设置到res_in、res_out、res_ex。其实很简单,如果in中的第n位为一,标识fd=n的描述符关心read事件,在调用fd=n对应的驱动的poll之后,如果有read事件,则将res_in中的n位置一。

    (cond_resched这个函数是做什么的?)

    在处理完一轮后(处理完了in、out、ex中的请求),如果fd请求的事件发生了,则返回,如果都没有发生则调用schedule_timeout,进入睡眠,等待事件到来时被唤醒。

    好,我们看看,do_select是怎样在有事件时被唤醒的。在这之前,我们先想想如果我们自己来做,如何利用等待队列实现。大体思路,我们应该定义一个等待队列wait_queue_head_t queue,select在没有事件时,定义一个wait_queue_t的对象wait放到queue中,然后调度schedule进入睡眠。在驱动中,当事件到来时,遍历等待在queue的wait并唤醒。其实内核实现就是这个思路,支持阻塞IO的驱动实现中,通常会定义三个等待队列,对应于read、write、except,select调用到poll中时,如果没有事件,会定义一个wait_queue_t的wait放到等待队列中,当驱动检查到事件发生时,会唤醒睡在等待队列上的进程。

    接下来看看select在睡之前做了哪些准备工作,怎样将wait加入到等待队列中的。

    先了解一下do_select中使用的一个数据结构

    struct poll_wqueues {

      poll_table pt;

      struct poll_table_page *table;

      struct task_struct *polling_task;

      int triggered;

      int error;

      int inline_index;

      struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];

    }

    do_select声明了这样类型的一个对象table,然后初始化其成员polling_task = current, pt->qproc = __pollwait。

    接下来在调用各fd对应的驱动的poll时,将table.pt(poll_table类型)作为参数传入。

    我们知道各个驱动模块实现的各自的poll函数中,如果自己没有read、write、except事件,会调用poll_wait函数,参数wait_address是驱动中声明的等待队列,p是调用poll时传入的table.pt。以下是poll_wait的实现:

    static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)

    {

      p->qproc(file, wait_address, p);

    }

    可以看到poll_wait中调用的p->qproc就是之前初始化poll_wqueue时,指定的__pollwait函数。

    static void __pollwait(struct file *filp,wait_queue_head_t *wait_address, poll_table *p);

    struct poll_table_entry {

      struct file *filp;

      unsigned long key;

      wait_queue_t wait;

      wait_queue_head_t *wait_address;

    }

    __pollwait中首先获取一个poll_table_entry类型的变量entry,获取其实是在poll_wqueue的inline_entries中拿的。然后初始化entry,entry->file = file;entry->key = p->key;entry->wait.func = pollwake,最后将entry->wait添加到等待队列wait_address中。

    所有的准备工作做好了,如果没有事件产生,do_select调度schedule进入了睡眠。

       

    唤醒一般在中断或者软中断中处理的,一般在检查到事件到来时,驱动中会调用wake_up函数,参数为驱动中定义的等待队列。

    追踪wake_up函数,最终调用了__wake_up_common,在这个函数中,遍历wait_queue_head_t中的结点,每个结点是wait_queue_t类型,调用每个结点的func指针指向的函数。前面我们知道func指针指向了pollwake,pollwake最终通过调用try_wake_up唤醒了进程。

    pollwake->__pollwake->default_wake_function->try_to_wake_up

    wait_queue_t中记录了要被唤醒的进程的task_struct结构,因此通过以上系列调用,最终实现了睡眠进程的唤醒。

    POLL

    poll与select的流程基本一致,其调用路径为sys_poll->do_sys_poll->do_poll->do_pollfd

    do_sys_poll将用户空间的pollfd拷贝到内核空间,初始化poll_wqueues table对象,其使用与select相同。调用do_poll,取得需监视的fd的状态,然后将状态拷贝到用户空间,返回。

    do_poll与do_select类似,查询事件,没事件睡眠。只是do_poll中使用pollfd,do_select使用long类型中的每一位记录状态。

    do_pollfd实现对poll的调用,然后将状态记录到pollfd中。

    我们看看select与poll的不同

    select使用fd_set记录要检查的描述符,该结构本身是1024位,也就限制了最多只能检测1024个描述符。

    poll使用pollfd结构的数组,检测多少个描述符,就传递多大的数组就可以了。

    struct pollfd {

        int fd;

        short events;

        short revents;

    };

    select使用的fd_set记录输入输出,每次返回后,返回的结果就把系统调用时传入的信息给覆盖掉了,因此每次调用select都需要给fd_set赋值。

    poll使用pollfd结构,events记录要检测的事件,revents记录结果,pollfd初始化一次就可以了,以后每次poll调用不需要重新初始化pollfd。

    不知不觉写这么多了,epoll的探究再开一片吧。

    由于也是边查资料边看代码边整理,是一个学习的过程,思路有点跳跃不连贯,欢迎拍砖,接下来我会再次整理,屡屡思路。

  • 相关阅读:
    Delphi ADOQuery连接数据库的查询、插入、删除、修改
    Delphi开发的一些技巧
    获取的数据载入listview控件中
    第一个Directx程序
    edit编辑框只能输入数字和一个小数点
    (原创) 一个通用的C++ 消息总线框架
    springboot 集成logback 及配置,日志格式,重复打印配置
    算法09未排序数组中累加和为给定值的最长子数组长度
    算法12猫狗队列
    算法06由两个栈组成的队列
  • 原文地址:https://www.cnblogs.com/jintianfree/p/3901741.html
Copyright © 2011-2022 走看看