zoukankan      html  css  js  c++  java
  • Version 0.01:使用 linux API (epoll) 实现一个 echo toy.

    Version 0.01:使用 linux API (epoll) 实现一个 echo toy.

    UNIX 网络编程基础介绍:

      个人认为,网络编程的本质还是进程间通信,只是通信区域跨越了一个网络罢了,通信的方式是使用套接字。

    重要的数据结构:

    struct sockaddr_in{
        short int sin_family;           // 1. AF_INET    2. PF_INET
        unsigned short int sin_port;    // 端口号
        struct in_addr sin_addr;        // ip 地址
        unsigned char sin_zero[8];      // 无意义
    };
    
    struct in_addr{
        in_addr_t s_addr;
    };
    

    服务器端的 API 调用:

    socket API 用于创建一个套接字,这个套接字常常是监听套接字

    #include <sys/socket.h>
    
    int socket(int domain, int type, int protocol);
            Return fd on success, or -1 on error
    

    bind API 用于将 socket 产生的套接字与服务器 ip:port 进行绑定

    #include <sys/socket.h>
    
    int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
            Return 0 on success, or -1 on error
    

    listen API 用于指定监听队列的长度,因为考虑到大量连接请求的情况,会有大量请求到达 listenfd,此时要求确定一个队列长度

    #include <sys/socket.h>
    
    int listen(int sockfd, int backlog);
            Return 0 on success, or -1 on error 
    

    accept API 用于产生一个服务套接字

    #include <sys/socket.h>
    int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
            Return fd on success, or -1 on error 
    

    其中 addr 指向的是一个需要由 accept API 填写的结构,表明了客户端的 ip:port

    linux API: epoll

    epoll - I/O event notification facility

    epoll (event poll) 可以检查多个文件描述符上的 I/O 就绪状态。epoll API 的主要优点如下:

    • 当检查大量的文件描述符时,epoll 性能的延展性比 select 和 poll 高很多
    • epoll API 既支持水平触发又支持边缘触发。select 和 poll 仅支持水平触发而 信号驱动I/O 仅支持边缘触发

    性能表现上 信号驱动I/O 与 epoll 相近,但是 epoll 具有以下 信号驱动I/O 不具备的优势:

    • 可以避免复杂的信号处理流程(比如信号队列溢出时的处理流程,因为实时信号可以排队,但队长有限)
    • 灵活性高,可以指定我们希望检查的事件(例如检查文件描述符的读就绪,或写就绪,或两者都检查)

    epoll 一共就 3 个接口:

      1. int epoll_create(int size);
      1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
      1. int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
    #include <sys/epoll.h>
    
    int epoll_create(int size);
            Return fd on success, or -1 on error 
    
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
            Return 0 on success, or -1 on error 
    
    int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
            Return number of ready fd, 0 on timeout, or -1 on error 
    
    1. int epoll_create(int size);

    创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

    注意:size参数只是告诉内核这个 epoll对象会处理的事件大致数目,而不是能够处理的事件的最大个数。在 Linux最新的一些内核版本的实现中,这个 size参数没有任何意义。

    1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    epoll的事件注册函数,epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,epoll_wait方法返回的事件必然是通过 epoll_ctl添加到 epoll中的。

    第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:

    EPOLL_CTL_ADD:注册新的fd到epfd中;

    EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

    EPOLL_CTL_DEL:从epfd中删除一个fd;

    第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

    typedef union epoll_data {
        void *ptr;
        int fd;
        __uint32_t u32;
        __uint64_t u64;
    } epoll_data_t;
    
    struct epoll_event {
        __uint32_t events; /* Epoll events */
        epoll_data_t data; /* User data variable */
    

    events可以是以下几个宏的集合:

    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

    EPOLLOUT:表示对应的文件描述符可以写;

    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

    EPOLLERR:表示对应的文件描述符发生错误;

    EPOLLHUP:表示对应的文件描述符被挂断;

    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

    1. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

    等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。

    第1个参数 epfd是 epoll的描述符。

    第2个参数 events则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。

    第3个参数 maxevents表示本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的。

    第4个参数 timeout表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待。

    另外一些在网络编程时用到的函数

    • char* inet_ntoa(struct in_addr);
    • in_addr_t inet_addr(const char *cp);
    • uint32_t htonl(uint32_t hostlong);
    • uint16_t htons(uint16_t hostshort);
    • int fcntl(int fd, int cmd, ... /* arg */ );

      The htonl function converts the unsigned int hostlong from host byte order to network byte order.
      The htons function converts the unsigned int hostshort from host byte order to network byte order.
      The inet_ntoa function 将返回一个点分十进制的 ip 地址字符串,该字符串保存在静态内存中,即初始化数据段中。
      The inet_addr function 将会将点分十进制的 ip 地址字符串转换成无符号长整形 (in_addr_t)。
      The fcntl function performs one of the operations on the open file descriptor fd. The operation is determined by cmd. 具体请看 man 手册。

    程序总体框架设计

    1. 将 socket、bind、listen 放在一个函数里面,用于创建一个监听套接字。

    2. 通过 epoll 来不断查询是否有可以进行 I/O 的套接字。

    3. 当监听套接字读就绪时,说明有连接接入,则通过 accept 产生一个服务套接字来对连接进行服务。

    4. 当服务套接字读写就绪时,进行相应的 I/O 操作

    以下是具体实现:

    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <unistd.h>
    #include <iostream>
    #include <vector>
    #include <map>
    #include <string>
    #include <fcntl.h>
    #include <arpa/inet.h>
    #include <error.h>
    
    using namespace std;
    
    #define MAXSIZE 4096
    #define MAXQUEUE 5
    #define MAXEVENTS 200
    const short int serv_port = 22222;
    
    void perror(const string msg){
        cout << msg << errno << endl;
        exit(0);
    }
    int createAndListen(){
        int listenfd;
        int on = 1;
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        fcntl(listenfd, F_SETFL, O_NONBLOCK); // non-block IO
    //    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
        
        struct sockaddr_in serv_addr;
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(serv_port);
        serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    
        if(-1 == bind(listenfd, (sockaddr*) &serv_addr, (socklen_t) sizeof(serv_addr)))
            perror("bind error:");
        if(-1 == listen(listenfd, MAXQUEUE))
            perror("listen:");
        return listenfd;
    }
    int main(){
        char buf[MAXSIZE];
        int sockfd, listenfd, connfd;
        struct epoll_event ev, events[MAXEVENTS];
        struct sockaddr_in clieaddr;
        socklen_t client = sizeof(sockaddr_in);
    
    //    int num = 0;
        map<int, pair<in_addr, short int> > connfd2addr;
    //    vector<pair<in_addr, short int> > clients;
        listenfd = createAndListen();
    
        ev.data.fd = listenfd;
        ev.events = EPOLLIN;
    
        int epfd = epoll_create(1);
        if(epfd < 0){
            cout << "epfd < 0 error!
    ";
            exit(0);
        }
        // 往 epoll fd 中添加监听套接字 listenfd,ev 对象中存放的是要 epoll 监听的具体事件,以及要监听的 socket
        if(-1 == epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev)) 
            perror("epoll_ctl:");
    
        // 不停通过 epoll_wait 轮询是否有监听的事件发生
        for(;;){
            int n = epoll_wait(epfd, events, MAXEVENTS, -1);
            if(n < 0)
                perror("epoll_wait:");
            for(int i = 0;i < n;i++){
                // listenfd 中有事件发生(代表 listenfd 可读)
                if(events[i].data.fd == listenfd){
                    connfd = accept(listenfd, (sockaddr*) &clieaddr, (socklen_t*) &client);
                    if(-1 == connfd)
                        perror("accept:");
                    else{
                        cout << "new connection from host:"
                             << "[" << inet_ntoa(clieaddr.sin_addr)
                             << ntohs(clieaddr.sin_port) << "]"
                             << " accepted fd: " << connfd << endl;
                    }
    
                    short int clieport = clieaddr.sin_port;
                    in_addr clieip = clieaddr.sin_addr;
    
                    if(connfd2addr.end() == connfd2addr.find(connfd)){
                        connfd2addr.insert(pair<int, pair<in_addr, short int> >(connfd, make_pair(clieip, clieport)));
                    }
    
                    ev.data.fd = connfd;
                    ev.events = EPOLLIN;
                    fcntl(connfd, F_SETFL, O_NONBLOCK);
                    if(-1 == epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev))
                        perror("epoll_ctl:");
    
                }
                else if(events[i].events & EPOLLIN){
                    int readnum;
                    if((sockfd = events[i].data.fd) < 0){
                        cout << "EPOLLIN sockfd < 0 error!
    ";
                        continue;
                    }
                    else if((readnum = read(sockfd, buf, MAXSIZE)) < 0)
                        perror("read:");
                    else if(readnum == 0){
                        cout << "fd: " << sockfd <<" has read the end of file.	";
                        cout << "host:[" << inet_ntoa(connfd2addr[sockfd].first) 
                             << ":" << ntohs(connfd2addr[sockfd].second) << "] disconnected!
    ";
                        connfd2addr.erase(connfd2addr.find(connfd));
                        close(sockfd);
                    }
                    else{
                        if(write(sockfd, buf, readnum) != readnum)
                            cout << "not finished one time!
    ";
                        else{
                            cout << "----> fd = " << sockfd << endl;
                        }
                    }
                }
            }
        }
        return 0;
    }
    

    代码中的一些比较模糊的点:

    1. 在 createAndListen() 函数中,为什么要将 listenfd(监听 socket)设置为 O_NONBLOCK,也就是非阻塞?
    2. 在 createAndListen() 函数中,填写 IP 地址时为什么用到了 INADDR_ANY,它是什么意思?
    3. 客户端与服务器通过 TCP 三次握手建立连接后,与服务器的通信都是通过 服务套接字,这个 服务套接字监听套接字 是共用一个服务器端口(port)吗?如果是共用,那怎么区分不同的客户端(新的连接或是通信)?
    4. 在 main() 函数中,为什么要将 connfd(服务 socket)设置为 O_NONBLOCK,也就是非阻塞?

    回答:

    1. 首先来看下 TCP 三次握手示意图:

      从图中可知,connect 会先于 accept 返回。当一个连接到来的时候,监听套接字可读,此时,我们稍微等一段时间之后再调用 accept()。就在这段时间内,客户端设置linger选项(l_onoff = 1, l_linger = 0),然后调用了close(),那么客户端将不经过四次挥手过程,通过发送RST报文断开连接。(也就是使用 RST 客户端,下面有 RST 客户端的代码实现)。服务端接收到RST报文,系统会将排队的这个未完成连接直接删除,此时就相当于没有任何的连接请求到来, 而接着调用的 accept() 将会被阻塞,直到另外的新连接到来时才会返回。这是与IO多路复用的思想相违背的(系统不阻塞在某个具体的IO操作上,而是阻塞在 select、poll、epoll 这些 IO 复用上)。

      在 RST 客户端的这种情况下,如果将监听套接字设置为非阻塞, accept() 不会阻塞住,它返回 -1,同时 errno = EWOULDBLOCK。具体请参考

      UNP 16.6节 非阻塞 accept

    2. INADDR_ANY 就是指定地址为 0.0.0.0 的地址,这个地址事实上表示不确定地址,或 “所有地址”、“任意地址”。其实就是指的本机的任意 IP 地址(如果有多个的话)。

    3. 注意:服务套接字监听套接字 是共用一个服务器端口(port)的!!!那么怎么区分不同的客户端呢(毕竟服务器是要连接一大堆客户端的)?这就涉及到传输层的知识了。服务器通过识别传输层五元组(源IP,源端口,目的IP,目的端口,传输层协议)来具体区分一个客户端和服务套接字。

    4. 这个问题其实在 man 文档里就已经有提到了。我们来看下 man 文档是怎么说的:

      Level-triggered and edge-triggered

      The epoll event distribution interface is able to behave both as edge-triggered (ET) and as level-triggered (LT).

      The difference between the two mechanisms can be described as follows. Suppose that this scenario happens:

      1. The file descriptor that represents the read side of a pipe (rfd) is registered on the epoll instance.
      
      2. A pipe writer writes 2 kB of data on the write side of the pipe.
      
      3. A call to epoll_wait(2) is done that will return rfd as a ready file descriptor.
      
      4. The pipe reader reads 1 kB of data from rfd.
      
      5. A call to epoll_wait(2) is done.
      

      If the rfd file descriptor has been added to the epoll interface using the EPOLLET (edge-triggered) flag, the
      call to epoll_wait(2) done in step 5 will probably hang despite the available data still present in the file
      input buffer; meanwhile the remote peer might be expecting a response based on the data it already sent. The
      reason for this is that edge-triggered mode delivers events only when changes occur on the monitored file
      descriptor. So, in step 5 the caller might end up waiting for some data that is already present inside the input
      buffer. In the above example, an event on rfd will be generated because of the write done in 2 and the event is
      consumed in 3. Since the read operation done in 4 does not consume the whole buffer data, the call to
      epoll_wait(2) done in step 5 might block indefinitely.

      An application that employs the EPOLLET flag should use nonblocking file descriptors to avoid having a blocking
      read or write starve a task that is handling multiple file descriptors. The suggested way to use epoll as an
      edge-triggered (EPOLLET) interface is as follows:

       i   with nonblocking file descriptors; and
      
       ii  by waiting for an event only after read(2) or write(2) return EAGAIN.
      

      翻译:

      首先假设一个场景

       1. 一个管道读端的文件描述符 rfd 被注册进入了一个 epoll 实例
       2. 管道的写端写了 2KB 的数据
       3. epoll_wait 会监测到一个 EPOLLIN 事件并返回 rfd 这个读就绪的文件描述符
       4. 管道的读端就从 rfd 读了 1KB 的数据
       5. epoll_wait 调用结束
      

      如果 rfd 在注册进 epoll 实例时指定了是边缘触发 (EPOLLET),那么第 5 步的 epoll_wait 调用可能被阻塞,即使 rfd 中还有 1KB 的数据可读;与此同时剩下的 1KB 描述的正好是 “你读完和给我发个 response” 这样的含义,由于读端无法检测到 rfd 的 EPOLLIN 事件就无法得知剩余 1KB 的消息,就不会给写端发 response,而写端还在苦苦地等着读端的 response,最后双方都在等,导致了死锁,这个文件描述符就相当于废了。

      所以 man 手册建议我们在使用 epoll 的 ET 模式时要做到以下 2 点:

      1. 文件描述符都设置成非阻塞
      2. 在读或写返回一个 EAGAIN 后再调用 epoll_wait
  • 相关阅读:
    写了一个Windows服务,通过C#模拟网站用户登录并爬取BUG列表查询有没有新的BUG,并提醒我
    WCF快速上手(二)
    Oracle递归查询
    100多行代码实现6秒完成50万条多线程并发日志文件写入
    C#写日志工具类
    WPF定时刷新UI界面
    HttpUtil工具类
    WPF GridView的列宽度设置为按比例分配
    Visifire图表
    C# BackgroundWorker 的使用、封装
  • 原文地址:https://www.cnblogs.com/Codroc/p/14042314.html
Copyright © 2011-2022 走看看