zoukankan      html  css  js  c++  java
  • Libevent学习之SocketPair实现

      Libevent设计的精化之一在于把Timer事件、Signal事件和IO事件统一集成在一个Reactor中,以统一的方式去处理这三种不同的事件,更确切的说是把Timer事件和Signal事件融合到了IO多路复用机制中。

      Timer事件的融合相对清晰简单,其套用了Reactor和Proactor模式(如Windows上的IOCP)中处理Timer事件的经典方法,其实Libevent就是一个Reactor嘛。由于IO复用机制(如Linux下的select、epoll)允许使用一个最大等待时间(即最大超时时间)timeout,当超过了这段最大等待时间,即使没有发生IO事件,也会返回并执行用户设置好的函数。那么在Libevent中将Timer时间融合到正常的IO事件中的方法就是,把系统IO复用的最大超时时间设置为一系列Timer时间中最小的超时时间。这样就能如我们所愿在IO事件能顺利执行的情况下我们去执行IO事件而忽略Timer事件,如果到达timeout时间没有IO事件执行,系统复用也会因为超时而返回,这时候刚好就能执行Timer事件。

      而Signal事件统一到系统的IO复用机制中就没那么自然了,由于Signal事件的出现的随机的,进程不能只是测试一个变量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。这似乎是一件不可以完成的任务,但有时候我们只要换个角度思考,就会发现到达终点的路不只有一条,虽然不是笔直的那条。我们发现,当Signal事件发生时,只要触发系统的IO复用机制,使其返回,再去统一处理所有的待处理事件就可以了。那么如何触发呢?最简单的就是当一个套接字可读时,IO复用就能被触发,而我们的任务就是在Signal事件发生时,向这个套接字(假设为A)的另一端(另一个套接字,我们称之为B)写入数据(不用多,一个字节即可)即可使套接字A有数据读,使得IO复用返回继而处理事件。而用户只需要向Reactor在套接字A处注册一个persist的可读事件,就如同注册其他事件一样,把Signal事件融合到IO复用机制中。


      上述的套接字A和B由于都在本地,目的是为了实现两个进程之间的通信,可以将其看作是一个"数据结构",成为socketpair,在任何一个套接字上写数据都能发送到另一个套接字,是一个全双工的实现。进程间的通信我们最直接的办法就是使用pipe,Linux 提供了 popen 和 pclose 函数,用于创建和关闭管道与另外一个进程进行通信。

    FILE *popen(const char *command, const char *mode);  
    int pclose(FILE *stream);  

      遗憾的是,popen 创建的管道只能是单向的 -- mode 只能是 "r" 或 "w" 而不能是某种组合--用户只能选择要么往里写,要么从中读,而不能同时在一个管道中进行读写。如果非得用pipe来实现“全双工”,就要popen两次,打开两个管道。有没有更简单的办法呢?答案就是上述我们所讲到的socketpair,而BSD的内核已经实现了一个socketpair函数,该系统调用能创建一对已连接的UNIX族socket。在Linux中,完全可以把这一对socket当成pipe返回的文件描述符一样使用,唯一的区别就是这一对文件描述符中的任何一个都可读和可写,函数原型如下:

    int socketpair(int d, int type, int protocol, int sv[2]); 

      socketpair()函数建立一对匿名的已经连接的套接字,其特性由协议族d、类型type、协议protocol决定,建立的两个套接字描述符会放在sv[0]和sv[1]中。
      第1个参数d,表示协议族,只能为AF_LOCAL或者AF_UNIX;
      第2个参数type,表示类型,只能为0。
      第3个参数protocol,表示协议,可以是SOCK_STREAM或者SOCK_DGRAM。用SOCK_STREAM建立的套接字对是管道流,与一般的管道相区别的是,套接字对建立的通道是双向的,即每一端都可以进行读写。参数sv,用于保存建立的套接字对。

      关于Unix域协议和源自BSD的socketpair函数在《Unix网络编程 卷1 <第三版>》中第15章Stevens先生已经给我们详细的讲解了,在中文版第330页也有使用socketpair来实现描述符传递的例子,可仔细研读。

      Libevent中也有对socketpair的实现,由于在原来的函数中有一些特定的宏和变量名,直接阅读和使用会不方便,所以我将他抽出来进行通用化(^_^),作为一个可复用的函数,供学习和使用。下面是源码:

    #include <stdio.h>
    #include <stdlib.h>
    
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    
    /**
     *  创建一个SocketPair,通过返回的两个fd可以进行进程间通信
     *  @param family : 套接字对间使用的协议族,可以是AF_INET或AF_LOCAL
     *  @param type : 套接字类型
     *  @param protocol : 协议类型
     *  @param fd[2] : 将要创建的Socketpair两端的文件描述符
     */
    int
    Socketpair(int family, int type, int protocol, int fd[2])
    {
        int32_t listener = -1;
        int32_t connector = -1;
        int32_t acceptor = -1;
        struct sockaddr_in listen_addr;
        struct sockaddr_in connect_addr;
        unsigned int size;
    
        if (protocol || 
                (family != AF_INET && family != AF_LOCAL)) {
            fprintf(stderr, "EAFNOSUPPORT
    ");
            return -1;
        }
        if (!fd) {
            fprintf(stderr, "EINVAL
    ");
            return -1;
        }
    
        /*创建listener,监听本地的换回地址,端口由内核分配*/
        listener = socket(AF_INET, type, 0);
        if (listener < 0)
            return -1;
        memset(&listen_addr, 0, sizeof(listen_addr));
        listen_addr.sin_family = AF_INET;
        listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        listen_addr.sin_port = 0;    /* kernel chooses port.    */
        if (bind(listener, (struct sockaddr *) &listen_addr, sizeof(listen_addr))
                == -1)
            goto fail;
        if (listen(listener, 1) == -1)
            goto fail;
    
        /*创建connector, 连接到listener, 作为Socketpair的一端*/
        connector = socket(AF_INET, type, 0);
        if (connector < 0)
            goto fail;
        /* We want to find out the port number to connect to.  */
        size = sizeof(connect_addr);
        if (getsockname(listener, (struct sockaddr *) &connect_addr, &size) == -1)
            goto fail;
        if (size != sizeof(connect_addr))
            goto fail;
        if (connect(connector, (struct sockaddr *) &connect_addr,
                    sizeof(connect_addr)) == -1)
            goto fail;
    
        size = sizeof(listen_addr);
        /*调用accept函数接受connector的连接,将返回的文件描述符作为Socketpair的另一端*/
        acceptor = accept(listener, (struct sockaddr *) &listen_addr, &size);
        if (acceptor < 0)
            goto fail;
        if (size != sizeof(listen_addr))
            goto fail;
        close(listener);
        
        /**
         * 至此,我们已经创建了两个连接在一起的文件描述符,
         * 通过向其中任意一个发送数据,都会“转发”到另一个,即可以实现进程间的通信
         */
         
        /* Now check we are talking to ourself by matching port and host on the
           two sockets.     */
        if (getsockname(connector, (struct sockaddr *) &connect_addr, &size) == -1)
            goto fail;
        if (size != sizeof(connect_addr)
                || listen_addr.sin_family != connect_addr.sin_family
                || listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr
                || listen_addr.sin_port != connect_addr.sin_port)
            goto fail;
        fd[0] = connector;
        fd[1] = acceptor;
    
        return 0;
    
    fail:
        if (listener != -1)
            close(listener);
        if (connector != -1)
            close(connector);
        if (acceptor != -1)
            close(acceptor);
    
        return -1;
    }

    简单测试一下:

    #include <sys/types.h>
    #include <sys/socket.h>
    
    #include <stdlib.h>
    #include <stdio.h>
    
    int Socketpair(int, int, int, int[]);
    
    int main ()
    {
        int fds[2];
    
        int r = Socketpair(AF_INET, SOCK_STREAM, 0, fds);
        if (r < 0) {
            perror( "socketpair()" );
            exit( 1 );
        }
    
        if(fork()) {
            /*  Parent process: echo client */
            int val = 0;
            close( fds[1] );
            while ( 1 ) {
                sleep(1);
                ++val;
                printf( "Sending data: %d
    ", val );
                write( fds[0], &val, sizeof(val) );
                read( fds[0], &val, sizeof(val) );
                printf( "Data received: %d
    ", val );
            }
        }
        else {
            /*  Child process: echo server */
            int val;
            close( fds[0] );
            while ( 1 ) {
                read( fds[1], &val, sizeof(val) );
                ++val;
                write( fds[1], &val, sizeof(val) );
            }
        }
    }

    测试结果:

    =============================神奇的分割线============================

                                                                   源码请猛戳{ 这里

    ================================================================

    参考资料:

    Linux上实现双向进程间通信管道(socketpair)

    libevent源码深度剖析

    Libevent源码

    《Unix网络编程 卷一 <第三版>》

  • 相关阅读:
    多线程伪共享FalseSharing
    C语言restrict限定符
    Linux线程基础函数
    Linux信号函数
    C函数前向声明省略参数
    12.2 关闭DLM 自动收集统计信息 (SCM0)ORA-00600之[ksliwat: bad wait time]
    pdb的数量限制
    关闭或开启memory_target
    OSWATCH安装
    参数SID写错,ERROR OGG-00664 ORA-01034: ORACLE not available ORA-27101: shared memory realm does not exist
  • 原文地址:https://www.cnblogs.com/big-xuyue/p/4098578.html
Copyright © 2011-2022 走看看