zoukankan      html  css  js  c++  java
  • Linux非阻塞IO(六)使用poll实现非阻塞的服务器端

    关于poll模型监听的事件以及返回事件,我们定义宏如下:

    #define kReadEvent (POLLIN | POLLPRI)
    #define kWriteEvent (POLLOUT | POLLWRBAND)
    #define kReadREvent (POLLIN | POLLPRI | POLLRDHUP)
    #define kWriteREvent (POLLOUT)

    前面我们说明了,为什么非阻塞IO必须具备缓冲区。事实上,对于server而言,每条TCP连接应该具有两个缓冲区,一个用于输入,一个用于输出。

    sockfd –>  InputBuffer –> 用户空间 –> 处理数据 –> 得到结果 –> OutputBuffer –> sockfd

    但是本例仅仅是简单的回射服务,收到数据立刻发出,所以我们只使用一个缓冲区。

    对于每个TCP连接,我们对应这样的一个数据结构:

    typedef struct{
        buffer_t buffer_;
    } tcp_connection_t; //表示一条TCP连接

    那么我们需要建立一个从fd到对应的tcp_connection_t的对应关系。我使用以下的数组:

    tcp_connection_t *connsets[EVENTS_SIZE]; //提供从fd到TCP连接的映射

    这个数组初始化为NULL,然后我使用fd作为下标,也就是说,当我需要处理某fd的时候,以该fd为下标就可以找到它相应的tcp_connection_t指针。这本质上是一种哈希表的思想

    使用tcp_connection_t的逻辑是:

    每当accept一个新的连接,就动态创建一个新的tcp_connection_t,并且将其指针保存至以peerfd为下标的位置中

    每当连接关闭,需要释放内存,同时重置指针为NULL。

    我将所有的代码全部写入main函数中,因为前面封装了buffer,代码的可读性已经提高,再加上我们的业务逻辑简单,进一步封装,反而降低可读性。

    这里跟client有几处相同点:

    仅当缓冲区有空闲空间时才监听read事件

    仅当缓冲区有数据时,才监听write事件

    所以我们每次进行poll调用前都需要将fd的监听事件重新装填。

    server的代码整体框架如下:

    //初始化poll
    
    while(1)
    {
        //重新装填fd数组
    
        //poll系统调用
    
        //依次处理每个fd的读写事件
    }

    所以完整的代码如下:

    #define _GNU_SOURCE             /* See feature_test_macros(7) */
    #include <sys/socket.h>
    #include "sysutil.h"
    #include "buffer.h"
    #include <assert.h>
    
    #define EVENTS_SIZE 1024
    
    typedef struct{
        buffer_t buffer_;
    } tcp_connection_t; //表示一条TCP连接
    
    tcp_connection_t *connsets[EVENTS_SIZE]; //提供从fd到TCP连接的映射
    
    int main(int argc, char const *argv[])
    {
        //获取监听fd
        int listenfd = tcp_server("localhost", 9981);
        //将监听fd设置为非阻塞
        activate_nonblock(listenfd);
    
        //初始化connsets
        int i;
        for(i = 0; i < EVENTS_SIZE; ++i)
        {
            connsets[i] = NULL;
        }
    
        //初始化poll
        struct pollfd events[EVENTS_SIZE];
        for(i = 0; i < EVENTS_SIZE; ++i)
            events[i].fd = -1;
        events[0].fd = listenfd;
        events[0].events = kReadEvent;
        int maxi = 0;
    
        while(1)
        {
            //重新装填events数组
            int i;
            for(i = 1; i < EVENTS_SIZE; ++i)
            {
                int fd = events[i].fd;
                events[i].events = 0; //重置events
                if(fd == -1)
                    continue;
                assert(connsets[fd] != NULL);
                
                //当Buffer中有数据可读时,才监听write事件
                if(buffer_is_readable(&connsets[fd]->buffer_))
                {
                    events[i].events |= kWriteEvent;
                }
    
                //当Buffer中有空闲空间时,才监听read事件
                if(buffer_is_writeable(&connsets[fd]->buffer_))
                {
                    events[i].events |= kReadEvent;
                }
    
            }
    
            //poll调用
            int nready = poll(events, maxi + 1, 5000);
            if(nready == -1)
                ERR_EXIT("poll");
            else if(nready == 0)
            {
                printf("poll timeout.
    ");
                continue;
            }
    
            
            //处理listenfd
            if(events[0].revents & kReadEvent)
            {
                //接受一个新的客户fd
                int peerfd = accept4(listenfd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);
                if(peerfd == -1)
                    ERR_EXIT("accept4");
                //新建tcp连接
                tcp_connection_t *pt = (tcp_connection_t*)malloc(sizeof(tcp_connection_t));
                buffer_init(&pt->buffer_);
                //将该tcp连接放入connsets
                connsets[peerfd] = pt;
                //放入events数组
                int i;
                for(i = 0; i < EVENTS_SIZE; ++i)
                {
                    if(events[i].fd == -1)
                    {
                        events[i].fd = peerfd; //这里不必监听fd
                        if(i > maxi)
                            maxi = i; //更新maxi
                        break;
                    }
                }
                if(i == EVENTS_SIZE)
                {
                    fprintf(stderr, "too many clients
    ");
                    exit(EXIT_FAILURE);
                }
            }
    
            //处理客户fd
            //int i;
            for(i = 1; i <= maxi; ++i)
            {
                int sockfd = events[i].fd;
                if(sockfd == -1)
                    continue;
                //取出指针
                tcp_connection_t *pt = connsets[sockfd];
                assert(pt != NULL);
                if(events[i].revents & kReadREvent) //读取数据
                {
                    if(buffer_read(&pt->buffer_, sockfd) == 0)
                    {
                        //close
                        events[i].fd = -1;
                        close(sockfd);
                        free(pt);
                        connsets[sockfd] = NULL;
                        continue; //继续下一次循环
                    } 
                }
    
                if(events[i].revents & kWriteREvent) //可以发送数据
                {
                    buffer_write(&pt->buffer_, sockfd);
                }
            }
        }
    
        close(listenfd);
    
        return 0;
    }

    需要注意的是,关于accept,我使用的是accpet4,这是Linux新增的系统调用

    int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);

    与accept的区别就是accept4可以指定非阻塞标志位。

    下文使用epoll。

  • 相关阅读:
    linux异常处理体系结构
    网站、架构、集群相关资源
    (转)分布式Web服务器架构的演变与技术需求
    B树、B树、B+树、B*树详解(转)
    (转)事件和路由事件概述
    LCID及Culture Name列表
    触摸键盘概述
    MySQL远端连接设置
    C#实现平衡多路查找树(B树) (转)
    CentOS6.3 LAMP运营环境安装
  • 原文地址:https://www.cnblogs.com/inevermore/p/4055033.html
Copyright © 2011-2022 走看看