zoukankan      html  css  js  c++  java
  • 【深入浅出Linux网络编程】 “基础 -- 事件触发机制”

    回顾一下“"开篇 -- 知其然,知其所以然"”中的两段代码,第一段虽然只使用1个线程但却也只能处理一个socket,第二段虽然能处理成百上千个socket但却需要创建同等数量的线程,分开来看都不完美,如果1个线程能够处理成百上千个socket就太好了!

    问题在于,当前的实现中1个线程只能阻塞的recv等待网络数据的到来,recv在数据到来之前会挂起并让出cpu直到数据到来后才能继续执行,在此之前cpu是空闲的,并且你也无法获得cpu使用权。

    如果可以趁着这个socket数据没到来之前先处理其他socket而不是苦苦等待一个socket,那一个线程是不是就可以处理多个socket的呢?答案是肯定的。

    通过设置socket为非阻塞模式(O_NONBLOCK),我们在调用recv的时候就不会因为没有数据而挂起了,recv会立即返回并在没有数据的情况下设置errno=EWOULDBLOCK,通过检查返回值和errno,我们便可以获知recv发生了什么。

    【初窥门径】在这个前提下,我们如何在1个线程中同时管理多个socket呢?没那么复杂,我们只需要写一个while(1)死循环,不停的遍历所有的socket,对每个socket调用非阻塞的recv尝试读取一段数据进行处理,并通过send返回应答即可,大致代码如下:

    int main()
    {
        ...
        fcntl(listen_fd, O_NONBLOCK...); /* 设置非阻塞 */
        listen(listen_fd); /* 监听套接字*/
    
        int fd_array[10000] = {0}; /*fd下标的数组*/
        while (1) {
    sleep(1); // 睡眠1秒, 避免cpu负载过高 new_fd = accept(listen_fd); /* 尝试accept一个新socket*/ if (new_fd >= 0) { ... ... fcntl(new_fd, O_NONBLOCK...); fd_array[new_fd]= new_fd; ... } foreach(fd in fd_array) { if (fd == 0) continue; int n = recv(fd, request); /* 尝试从socket recv一段数据*/ if (n > 0) { ... /* 处理request */ send(fd, response); } if (fd has error) { close(fd); fd_array[fd] = 0; } } } }

    首先启动了监听套接字,并设置了非阻塞,然后进入while(1)死循环。在每次循环头部首先调用accept尝试获取一个socket,由于非阻塞的原因,如果没有新连接会立即返回-1,否则设置新socket为非阻塞并放入fd_array数组中记录。接着,由于你不知道哪些socket有数据,于是只能遍历所有曾经accept获得到socket,调用非阻塞recv尝试读取数据,如果的确读到了数据则处理并send返回应答,如果socket发生了错误则关闭socket。

    这段代码成功的实现了1个线程处理多个socket的目标,是完全可行的,但并不完美。你可以回顾一下代码,其中的while(1)死循环将导致这个线程毫不停歇的对socket一遍又一遍的轮询,无论socket是否真的有数据到来,这样简单粗暴的实现会让程序总是100%cpu满负载运转,造成不必要的资源浪费(假设机器只有1颗cpu,还有一堆进程等待cpu调度,势必会对其他进程造成极大的影响)。

    我们还是思考怎么解决这个现状,切忌天马行空。既然while(1)忙轮询造成cpu负载高,那是否可以在while(1)头部sleep一会呢,当然可以通过sleep让出cpu给其他进程使用,但如果sleep太久导致socket数据不被及时处理也会是一个大问题,所以还必须保证sleep挂起的时间足够短,索性就sleep 1毫秒,问题差不多就解决了。

    讲到这里,总算抛足了砖头该看看玉了。回顾一下我们一步一步改进的过程,总算到了这个节骨眼上,貌似基于手头上的工具很难再有所改进了。其实,linux内核开发者也注意到了这一点,为了解决这个切实的问题在内核中实现了一系列的api,目的就是避免我们忙轮询所有socket,转而由内核主动通知哪些socket有数据可读,我们在编码时就不必为遍历socket和sleep多少秒纠结了,新的api会sleep直到某些socket有数据可读才返回,并且直接告诉我们具体是哪些socket可读从而避免了遍历所有socket。

    为了避免误导,这里提示一下:上述只提到了非阻塞模式下的recv操作,没有提到send。实际上,阻塞模式下的socket调用send同样会阻塞,这是由于TCP协议栈滑动窗口已满造成的,可以简单理解为数据拥塞的情况下导致send同样阻塞。在非阻塞模式下,调用socket的send会因为数据拥塞而返回失败,errno同样为EWOULDBLOCK,数据没发送出去只能不停的重试去send,和轮询recv的道理是类似的。为了避免引入太多阅读理解负担,所以在这里理解到这个程度已经完全足够了。


    了解了背景,接下来直奔主题,看看新的api怎么用,怎么结合到之前的代码中。这里有个背景需要介绍一下,linux内核在实现这个功能的时候也并不是一步就做到了今天的样子,它至少经历了select,poll 两个版本的API实现后,才有了今天广泛使用的API:kqueue(freebsd), epoll(linux)。由于我们主要接触的都是linux系统,并且两者从原理上大同小异,所以对freebsd上的kqueue不做介绍,而对于select和poll两个版本的实现由于已经基本没有实用价值,所以暂时不做介绍,有兴趣可以在看完epoll之后搜索引擎简单了解一下。

    【春暖花开】我们马上看一段epoll的使用片段(通过man epoll你可以在manpage里看到epoll这段代码),并与我们上面的非阻塞版本代码进行比较,看清两份代码实现之间的差异,然后逐个介绍其中涉及的API:

           struct epoll_event ev, *events;
    
           for(;;) { // 相当于我们的while(1)
               nfds = epoll_wait(kdpfd, events, maxevents, -1); // 相当于我们的sleep(1)
    
               for(n = 0; n < nfds; ++n) { // 相当于我们的for遍历所有socket
                   if(events[n].data.fd == listener) {   // 相当于我们尝试accept新连接
                       client = accept(listener, (struct sockaddr *) &local,
                                       &addrlen);
                       if(client < 0){
                           perror("accept");
                           continue;
                       }
                       setnonblocking(client);
                       ev.events = EPOLLIN | EPOLLET;
                       ev.data.fd = client;
                       if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                           fprintf(stderr, "epoll set insertion error: fd=%d0,
                                   client);
                           return -1;
                       }
                   }
                   else
                       do_use_fd(events[n].data.fd);  // 相当于我们recv,处理,send一个socket
               }
           }

    可以看一下代码中的注释, 比对我们实现的非阻塞忙轮询版本代码, 会发现代码逻辑基本能够一一对应,一方面要accept新的socket,一方面要处理已有socket的读与写。为了学习epoll,我们需要关注差异在哪里,以及差异带来了什么好处,解决了什么问题。

    首先笨拙的sleep被换成了epoll_wait,它的第1参数kdpfd是epoll的句柄(epoll_create创建),这个句柄中此前被注册了希望被epoll管理的socket(epoll_ctl注册)。当epoll_wait被调用后,会检查注册其上的socket是否有数据到来或者是否有剩余空间发送数据,如果都没有则会挂起,就像sleep一样睡眠,但与sleep的最大区别在于sleep多久是我们拍脑袋指定一个很小的数值,而epoll_wait会在任意socket可读或者可写的时候返回,这是由内核检测注册其上的socket并在满足条件时唤醒epoll_wait返回的,这就解决了sleep少则cpu繁忙sleep多则增加socket处理延迟的麻烦问题。

    epoll_wait的第2,3个参数分别指定了一个struct epoll_event数组events和数组的大小maxevents,这是一个in/out参数,也就是epoll_wait在返回前会对数组内容进行赋值,其中记录的是发生了可读或者可写或者错误事件的socket以及具体发生的事件类型。这里的新名词”可读事件"表示有数据到来,"可写事件"表示内核缓冲区有剩余发送空间,“错误事件“表示socket发生了一些网络错误。既然epoll_wait在返回时把发生读写事件的socket写到了数组里,我们还需要遍历所有socket吗?当然不必了!借助epoll_wait,我们不必在那些没有任何动静的socket上做无用的recv和send尝试,只要是epoll_wait记录在数组里的socket一定是发生了特定的事件,这又帮我们解决了一个大麻烦。

    for(n = 0; n < nfds; ++n) 遍历struct epoll_event数组,对于listener这个监听socket,调用accept得到新连接,并通过调用epoll_ctl注册到epoll句柄上以便之后的epoll_wait可以检测该socket的读写事件,对于非监听socket则调用do_use_fd函数去读写与请求处理,这里manpage并没有给出什么实际的代码,因为那些与epoll已经没有必然联系了。

    现在你对epoll应该有了一个差不多的认识,但涉及到的结构体和API还没有详细的去看参数与返回值,使用上要注意什么也没有涉及。 在详细学习API之前,首先记住一点概念,epoll监听的是fd(文件描述符)的读,写,错误事件,与socket或者说tcp socket还是udp socket没有必然联系,epoll负责的仅仅是”事件触发“,正合本篇博客标题。

    1,创建一个epoll句柄:

    int epoll_create(int size)

    这里的size参数意义为epoll管理的fd个数的一个建议值,简单说就是预分配多少个fd的管理空间,如果不足会扩容,所以称为建议值,一般填个1000,10000的都无所谓。

    2,向epoll句柄注册,删除,修改socket:

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
                
    typedef union epoll_data {
                     void *ptr;
                     int fd;
                     __uint32_t u32;
                     __uint64_t u64;
    } epoll_data_t;
    
    struct epoll_event {
                     __uint32_t events;  /* Epoll events */
                     epoll_data_t data;  /* User data variable */
    };
    
    EPOLL_CTL_ADD // 注册fd
    EPOLL_CTL_MOD // 修改fd
    EPOLL_CTL_DEL // 删除fd

    这个函数有3种功能,一个是注册(EPOLL_CTL_ADD)fd到epoll,一个是从epoll删除fd(EPOLL_CTL_DEL),一个是向epoll修改一个已注册的fd(EPOLL_CTL_MOD)。

    第1个参数epfd是epoll句柄,第二个参数op是指上述3个操作类型之一,第三个参数是一个结构体,epoll_event的第一个成员events表示希望epoll监测fd的什么事件,常用包含:EPOLLIN(可读),EPOLLOUT(可写),EPOLLERR(错误),EPOLLHUP(也是错误),你可以通过位或的方式同时包含多个事件。data是一个union,你可以使用其中的一个字段记录一些信息,也就是一个用户参数,在epoll_wait返回的epoll_event数组中会返回给调用者使用。

    3,检测fd事件并返回相关信息:

    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

     epfd是epoll句柄(epoll_create),events是用户分配的数组,maxevents是数组的大小,timeout表示多少毫秒没有任何socket发生事件则超时返回的时间,-1表示不超时,函数返回在events数组里填充了多少个fd,于是我们就可以访问events数组里特定数量的fd进行处理了。

    4,关闭epoll句柄:

    int close(int fd);

    也就是关闭epoll_create的返回值,注册其上的fd不会被关闭,仅仅是从这个句柄上取消了注册。

    我们使用epoll完成事件触发所需要做的所有操作都是依靠上述4个接口而已,在上面的代码示例里也对其使用方法和时机有大概的了解了。参考manpage,你应该有能力实现一个用epoll监听fd=0(终端标准输入)的程序,并将读到的文本回显到终端上的小demo,如果你感兴趣可以在这里停下自己去探索一下再回来。

    再次回顾一下上述manpage里的示例代码,在for循环遍历epoll_wait返回的fd数组的时候有一处if else的判定,对于fd=listener则调用了accept相关的逻辑,对于其他的则调用了do_use_fd的逻辑,也就是用户使用epoll的时候必须对epoll_wait返回的fd属于什么应用逻辑进行区分对待,从代码来看会令代码比较冗长,缺乏共性的提取和问题的抽象,用起来并不方便。代码里为了像epoll注册一个fd,需要对struct epoll_event结构各字段赋值,然后调用epoll_ctl,显得过于繁琐。

    为了方便自己使用,我们考虑对epoll进行一定程度的封装和抽象,对接口进行简化,对过程进行抽象,对细节进行隐藏,让epoll用起来轻松愉快~ 

    在开始前,首先使用git把代码拉取下来,我的代码上传在code.csdn.net,你可以通过如下命令获取代码:

    git clone git://code.csdn.net/qq120848369/simple_io.git

    这是我在这系列博客前不久开发的一个小项目,名字叫做simple_io,顾名思义就是代码实现简单,使用简单,并且阅读简单。

    在“基础 -- 事件触发机制”章节里,我们只研究epoll自身,学习如何使用它的各个API,以及真实项目实践中是如何抽象与封装epoll的,即只需要关注sio.h和sio.c两个文件即可,它们对epoll进行了封装与抽象,通过阅读sio.c你可以完全掌握epoll用法与机制,我只会讲解sio实现了什么(接口),为什么实现,但至于如何实现则需要读者自己对照阅读代码,代码并不长,只有200行。

    1,创建与释放epoll句柄:

    struct sio *sio_new();
    void sio_free(struct sio *sio);

    2,向epoll注册一个fd,并提供一个事件回调函数以及用户参数,返回一个fd句柄:

    struct sio_fd *sio_add(struct sio *sio, int fd, sio_callback_t callback, void *arg);

    3,修改一个fd句柄的事件回调函数与用户参数:

    void sio_set(struct sio *sio, struct sio_fd *sfd, sio_callback_t callback, void *arg);

    4,向epoll取消一个fd的注册:

    void sio_del(struct sio *sio, struct sio_fd *sfd);

    5,向epoll注册fd的EPOLLOUT事件:

    void sio_watch_write(struct sio *sio, struct sio_fd *sfd);

    6,向epoll取消注册fd的EPOLLOUT事件:

    void sio_unwatch_write(struct sio *sio, struct sio_fd *sfd);

    7,向epoll注册fd的EPOLLIN事件:

    void sio_watch_read(struct sio *sio, struct sio_fd *sfd);

    8,向epoll取消注册fd的EPOLLIN事件:

    void sio_unwatch_read(struct sio *sio, struct sio_fd *sfd);

    9,调用epoll_wait并处理fd事件,通过回调函数通知用户:

    void sio_run(struct sio *sio, int timeout_ms);

    10,立即唤醒epoll_wait,令其返回(暂时不需要理解这个接口):

    void sio_wakeup(struct sio *sio);

    11,启动定时器,提供超时回调和用户参数:

    void sio_start_timer(struct sio *sio, struct sio_timer *timer, uint64_t timeout_ms, sio_timer_callback_t callback, void *arg);

    12,停止定时器:

    void sio_start_timer(struct sio *sio, struct sio_timer *timer, uint64_t timeout_ms, sio_timer_callback_t callback, void *arg);

    为了学习epoll自身,阅读时暂时忽略定时器的2个接口,忽略唤醒wakeup接口,剩余接口均是对epoll的4个API的抽象与封装,逐个函数阅读理解,有任何疑惑可以留言(我会受到邮件通知),我会第一时间回复。

    为了便于理解,这里需要为sio.c设计上的东西做一些基本的解释,先去读代码,如果你阅读的过程中遇到了障碍,再回来查看,切忌不要直接读下面的内容。

    1,struct sio里的is_in_loop和deferred_to_close是什么用途?

    答:这是为了sio_del接口设计的,当你在sio_run函数之外调用sio_del时,epoll_ctl可以通过EPOLL_CTL_DEL立即从epoll句柄上移除fd,下一次sio_run(epoll_wait调用)就不会检测到这个fd的任何事件了。 首先注意我是将struct sio_fd注册到了struct epoll_event的data字段,考虑在epoll_wait返回之后,sio_run结束之前对各个fd处理的过程中调用sio_del是否会有特殊问题?

    这里的问题就是,假如你在fd=1的回调函数中sio_del了fd=2,并且本次epoll_wait也检测到了fd=2的事件并已经填充到了struct epoll_event  poll_events[64]中,那么接下来处理fd=2的时候就会非法内存操作,因为在fd=1中已经sio_del释放了fd=2的struct sio_fd内存。

    这是一个非常常见的网络库bug,有的网络库为了避免这种问题选择将fd注册到struct epoll_event的data字段,并创建一个fd索引数组fd_array,通过设置fd_array[fd]=NULL标示已关闭从而避免非法操作,但仍然是有bug的,因为极有可能在sio_del后又创建了新fd并且fd=2然后sio_add注册到epoll并令fd_array[fd]!=NULL,这将导致接下来处理fd=2的事件时发生”串门“,也就是说现在的fd=2早已不是epoll_wait时的fd=2,又一个悲剧的bug。

    2,sio_new里忽略SIGPIPE信号是为什么?

    答:摘自man 3p write:EPIPE A write was attempted on a socket that is shut down for writing, or is no longer connected. In the latter case, if the socket is of type,SOCK_STREAM, the SIGPIPE signal is generated to the calling process.

    也就是说,sio.c极有可能被用于开发socket程序,那么就有可能会触发SIGPIPE信号,而该信号默认通常是结束程序,所以需要为用户干掉这个信号。 那么这个信号到底发生于什么情况下呢?首先是TCP连接,其次是当本端收到RST信号的时候,当对端向本端发送了FIN握手之后,如果本端继续向对端发送数据,对端就可能返回RST包,造成SIGPIPE信号的触发。

    3,为什么不检测malloc和realloc的返回值,而要检测epoll_ctl(EPOLL_CTL_ADD)的返回值,返回值检查的原则是什么?

    答:纯属个人习惯,如果内存都不足了,程序崩溃也没什么不可接受的,况且有swap分区的情况下,malloc失败的可能性几乎为0,所以我总是假设内存分配不会失败,这样就不必写大量的返回值检测了,而且真的没必要检测。  而对于向epoll ADD一个fd的调用,是真真切切会失败的,因为一个epoll能够容纳的fd个数是可以通过系统参数配置的,所以我不会假设它成功。

    我在返回值检测方面的原则:来自不可信用户的外部输入需要严格检查,比如网络请求。其他输入不做检查,比如sio.c里的接口绝对不会校验你传入的参数是不是NULL,因为使用者应该有能力明确的保证使用是正确的,这个职责不在于sio自身。 而对于系统API来说,我只检查的确会失败的,不会检查永远不会失败的,这一点还是需要靠manpage和经验。

  • 相关阅读:
    高精度入门(减法、加法、乘法)之 CODE[VS] 3115、3116、3117
    DP经典 之 CODE[VS] 1576 最长严格上升子序列 (O(n^2) 和 O(nlogn))
    CODE[VS] 1098 均分纸牌 ( 2002年NOIP全国联赛提高组)
    C++ string 与 int 等类型 的相互转换
    组合数学 + 大数乘法 + 大数除法 之 hdu 1261 字串数
    自然语言理解 之 统计词频
    Leetcode_10【正则表达式匹配】
    01——字符串反转
    Leetcode_09【回文数】
    Leetcode_07【整数反转】
  • 原文地址:https://www.cnblogs.com/qq120848369/p/3656644.html
Copyright © 2011-2022 走看看