zoukankan      html  css  js  c++  java
  • 进程池

    让服务器在启动阶段调用fork创建一个子进程池,通过子进程来处理客户端请求。子进程与父进程之间使用socketpair进行通信(为了方便使用sendmsg与recvmsg,如果使用匿名管道,则无法使用以上两个函数)。以下针对TCP进行分析。

    server端使用select轮询用于监听客户端请求的被动套接字fd_listen以及用于父子之间通信的socketpair。每当客户端有请求时,server端会将由accept返回的用于与客户端通信的socket描述符通过socketpair发送给一个空闲的子进程,由子进程与客户端进行通信(处理请求)。因此服务器端需要维护一个子进程队列,队列中的每个元素存放着与子进程通信的socketpair以及标记子进程是否空闲的标志位,如下:

    typedef struct tag_chd
    {
        int s_sfd ;     //与子进程通信的socketpair描述符
        int s_state ;   //标记子进程是否空闲
    }NODE, *pNODE;

    每当子进程处理完客户端请求时,会通过socketpair向server端发送消息,server端select到该socketpair后,会将对应子进程标志位设置为空闲。

    注意

    1. 由于父进程是先创建子进程,之后才accept用于与客户端通信的socket描述符fd_client,因此子进程的pcb中并没有fd_client的信息。server端需要将fd_client发送子进程。如果只是用send来发送fd_client信息的话,子进程只会将其当成一个整型数。我们需要用sendmsg将fd_client连同其辅助(控制)信息一并发送,这样子进程才会将其当成一个socket描述符。

    2. 父进程预先创建子进程池,该子进程如同server端一样是永远不会退出的。子进程中使用while死循环,如下:

    复制代码
    while(1)
        {
            readn = read(sfd, &flag, 4);                // 服务器分配的子进程在子进程队列中的下标
            printf("readn: %d 
    ", readn);             
            printf("read from father: %d 
    ", flag);
            recv_fd(sfd, &fd_client);                   // recv_fd中封装了recvmsg,接收与客户端通信的socket描述符
            handle_request(fd_client);                  // 处理客户端请求
            write(sfd, &pid, sizeof(pid));              // 处理完请求后通过socketpair通知服务器,服务器将该子进程状态设置为空闲
        }
    复制代码

    每当子进程处理完一个客户端请求后(也就是客户端退出了),子进程会阻塞在 read 处,等待接收下一个客户端请求。

    由于是while死循环,且死循环中没有break语句,因此子进程不可能跳出这个while循环,也就不会执行while循环以下的内容了,这样可以保证子进程结尾没有exit也不会执行之后的内容。

    3. 编译用到的动态库见

    函数原型

    复制代码
     #include <sys/types.h>
     #include <sys/socket.h>
    
     ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
     ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
    
     struct msghdr {
                   void         *msg_name;       /* optional address */
                   socklen_t     msg_namelen;    /* size of address */
                   struct iovec *msg_iov;        /* scatter/gather array */
                   size_t        msg_iovlen;     /* # elements in msg_iov */
                   void         *msg_control;    /* ancillary data, see below */
                   socklen_t     msg_controllen; /* ancillary data buffer len */
                   int           msg_flags;      /* flags on received message */
               };
    
     
     
     fields:
     
     struct iovec {                    /* Scatter/gather array items */
                   void  *iov_base;              /* Starting address */
                   size_t iov_len;               /* Number of bytes to transfer */
               };
               
     struct cmsghdr {
               socklen_t cmsg_len;    /* data byte count, including header */
               int       cmsg_level;  /* originating protocol */ /* 如果是文件描述符,填SOL_SOCKET */
               int       cmsg_type;   /* protocol-specific type */ /* 如果是文件描述符,填SCM_RIGHTS */
               /* followed by unsigned char cmsg_data[]; */
           };
     
     /* 返回cmsghdr结构的cmsg_len成员的值,考虑到对齐,使用数据部分的长度作为参数。*/      
     size_t CMSG_LEN(size_t length);  
     /* 返回cmsghdr的数据部分指针。*/
     unsigned char *CMSG_DATA(struct cmsghdr *cmsg); 
    
     CMSG_DATA() returns a pointer to the data portion of a cmsghdr.
     CMSG_LEN()  returns  the  value  to store in the cmsg_len member of the cmsghdr structure, taking into
                 account any necessary alignment.  It takes the data length as an argument. 
                 This is a constant expression.
    复制代码
    复制代码
    NAME
           socketpair - create a pair of connected sockets
    
    SYNOPSIS
           #include <sys/types.h>          /* See NOTES */
           #include <sys/socket.h>
    
           int socketpair(int domain, int type, int protocol, int sv[2]);

    RETURN VALUE
           On  success,  zero is returned.  On error, -1 is returned, and errno is
           set appropriately.

    复制代码

    代码

    server.h

    复制代码
    #ifndef __SERVER_H__
    #define __SERVER_H__
    #include "my_socket.h"
    #include <sys/stat.h>
    #include <sys/types.h>
    #include <fcntl.h>
    #include <sys/time.h>
    #include <sys/select.h>
    #include <sys/uio.h>
    #include <sys/wait.h>
    #include <errno.h>
    #define SER_IP "127.0.0.1"
    #define SER_PORT 8888
    #define ST_BUSY 1
    #define ST_IDLE 2
    #define SIZE 8192
    #define MSG_SIZE (SIZE - 4)
    
    typedef struct tag_mag
    {
        int msg_len ;
        char msg_buf[MSG_SIZE];//8188
    }MSG, *pMSG;
    
    typedef struct tag_chd
    {
        int s_sfd ;
        int s_state ;
    }NODE, *pNODE;
    
    extern int errno ;
    void make_child(pNODE arr, int cnt);
    void child_main(int sfd) ;
    void handle_request(int sfd);
    void send_fd(int sfd, int fd_file) ;
    void recv_fd(int sfd, int* fd_file) ;
    void dispatch(pNODE arr, int cnt, int fd_client);
    #endif
    复制代码

    main.c

    复制代码
    #include "server.h"
    int main(int argc, char* argv[])//exe chld_cnt
    {
        if(argc != 2)
        {
            printf("Usage: exe , child_cnt! 
    ");
            exit(1);
        }
        int child_cnt = atoi(argv[1]);
        pNODE arr_child = (pNODE)calloc(child_cnt, sizeof(NODE)) ; /* 动态数组维护子进程池 */
        make_child(arr_child, child_cnt);
        
        int fd_listen, fd_client ;
        my_socket(&fd_listen, MY_TCP, SER_IP, SER_PORT);
        my_listen(fd_listen, 10);
        
        fd_set readset, readyset ;
        FD_ZERO(&readset);
        FD_ZERO(&readyset);
        FD_SET(fd_listen, &readset);//先将监听客户端的描述符设置到监听集合
        int index ;
        for(index = 0; index < child_cnt; index ++)
        {
            FD_SET(arr_child[index].s_sfd, &readset);//再将和子进程的通信的socket设置到监听集合
        }
        
        int select_ret ;
        struct timeval tm ;
        while(1)
        {
            tm.tv_sec = 0 ;
            tm.tv_usec = 1000 ;
            readyset = readset ;
            select_ret = select(1024, &readyset, NULL, NULL, &tm);
            if(select_ret == 0)        /* 轮询时间内,所有描述符均没有活动,返回0,继续轮询 */
            {
                continue ;
            }else if(select_ret == -1) /* 信号 */
            {
                if(errno == EINTR)
                {
                    continue ;
                }else 
                {
                    exit(1);
                }
            }else 
            {
                if(FD_ISSET(fd_listen, &readyset))//如果是客户端发来的信号
                {
                fd_client = accept(fd_listen, NULL, NULL) ;    
                dispatch(arr_child, child_cnt ,fd_client);
                close(fd_client);
                }
                for(index = 0; index < child_cnt; index ++)
                {
                    if(FD_ISSET(arr_child[index].s_sfd, &readyset))//是子进程发来的信号
                    {
                        int val ;
                        read(arr_child[index].s_sfd, &val, 4);
                        arr_child[index].s_state = ST_IDLE ;
                    }
                }
                
            }
            
        }   
    }
    复制代码

    server.c

    复制代码
    #include "server.h"
    void make_child(pNODE arr, int cnt)
    {
        int index ; 
        for(index = 0; index < cnt; index ++)
        {
            pid_t pid ;
            int fds[2] ;//fds[0] - c  fds[1] - p
            socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
            pid = fork() ;
            if(pid == 0)// child
            {
                close(fds[1]);         /* 子进程用fds[0],关闭fds[1] */
                child_main(fds[0]) ;   /* 每创建一个子进程,子进程就进入该函数中(死循环),接收请求,处理请求,如此循环。*/
    
            }else 
            {
                /* 初始化进程池队列中的每一个子进程 */
                arr[index].s_sfd = fds[1] ;
                arr[index].s_state = ST_IDLE ;
                close(fds[0]);         /* 父进程用fds[1], 关闭fds[0] */
            }
    
        }
    
    }
    void child_main(int sfd)
    {
        int fd_client ;
        int flag ;
        int readn ;
        pid_t pid = getpid();
        while(1)
        {
            readn = read(sfd, &flag, 4);
            printf("readn: %d 
    ", readn);
            printf("read from father: %d 
    ", flag);
            recv_fd(sfd, &fd_client);//从父进程中读出客户端的描述符
            handle_request(fd_client);
            write(sfd, &pid, sizeof(pid));
        }
    }
    void handle_request(int sfd)
    {    
    
        MSG my_msg ;
        int recvn ;
        while(1)
        {
            memset(&my_msg, 0, sizeof(MSG));
            my_recv(&recvn, sfd, &my_msg, 4);
            if(my_msg.msg_len  == 0)//如果客户端退出
            {
                break ;
            }
            my_recv(NULL, sfd, my_msg.msg_buf, my_msg.msg_len);
            my_send(NULL, sfd, &my_msg, my_msg.msg_len + 4);
    
        }
    
    }
    //父进程传文件描述符给子进程 void send_fd(int sfd, int fd_file) { struct msghdr my_msg ; memset(&my_msg, 0, sizeof(my_msg)); struct iovec bufs[1] ; char buf[32] = "hello world ! "; bufs[0].iov_base = buf ; bufs[0].iov_len = strlen(buf) ; my_msg.msg_name = NULL ; my_msg.msg_namelen = 0 ; my_msg.msg_iov = bufs ; my_msg.msg_iovlen = 1 ; my_msg.msg_flags = 0 ; struct cmsghdr *p ; int cmsg_len = CMSG_LEN(sizeof(int)) ; /* 所传为文件描述符,因此sizeof(int) */ p = (struct cmsghdr*)calloc(1, cmsg_len) ; p -> cmsg_len = cmsg_len ; p -> cmsg_level = SOL_SOCKET ; p -> cmsg_type = SCM_RIGHTS ; *(int*)CMSG_DATA(p) = fd_file ; my_msg.msg_control = p ; my_msg.msg_controllen = cmsg_len ; int sendn ; sendn = sendmsg(sfd, &my_msg, 0); printf("send masg len : %d ", sendn); } void recv_fd(int sfd, int* fd_file) { struct msghdr my_msg ; struct iovec bufs[1] ; char buf1[32]="" ; bufs[0].iov_base = buf1 ; bufs[0].iov_len = 31 ; my_msg.msg_name = NULL ; my_msg.msg_namelen = 0 ; my_msg.msg_iov = bufs ; my_msg.msg_iovlen = 2 ; my_msg.msg_flags = 0 ; struct cmsghdr *p ; int cmsg_len = CMSG_LEN(sizeof(int)) ; p = (struct cmsghdr*)calloc(1, cmsg_len) ; my_msg.msg_control = p ; my_msg.msg_controllen = cmsg_len ; int recvn ; recvn = recvmsg(sfd, &my_msg, 0); *fd_file = *(int*)CMSG_DATA((struct cmsghdr*)my_msg.msg_control); //写成*(int*)CMSG_DATA(P)也可 printf("buf1: %s, recv msg len : %d ", buf1, recvn); } void dispatch(pNODE arr, int cnt, int fd_client) { int index ; for(index = 0 ; index < cnt; index ++) { if(arr[index].s_state == ST_IDLE) { write(arr[index].s_sfd, &index, 4); send_fd(arr[index].s_sfd, fd_client); /* 向空闲的子进程分配任务,将服务器accept返回的socket描述符发送给子进程*/ arr[index].s_state = ST_BUSY ; break ; } } }
    复制代码

    client.c

    复制代码
    #include "my_socket.h"
    #define MY_IP "127.0.0.1"
    #define MY_PORT 6666
    #define SER_IP "127.0.0.1"
    #define SER_PORT 8888
    #define SIZE 8192
    #define MSG_SIZE (SIZE - 4)
    typedef struct tag_mag// 
    {
        int msg_len ;
        char msg_buf[MSG_SIZE];//8188
    }MSG, *pMSG;
    int main(int argc, char* argv[])
    {
        int sfd ;
        my_socket(&sfd, MY_TCP, MY_IP, atoi(argv[1]));
        my_connect(sfd, SER_IP, SER_PORT);
        MSG my_msg ;
        while(memset(&my_msg, 0, sizeof(MSG)), fgets(my_msg.msg_buf, MSG_SIZE, stdin)!= NULL)
        {
            my_msg.msg_len = strlen(my_msg.msg_buf);
            my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len );
            memset(&my_msg, 0, sizeof(MSG));
            my_recv(NULL, sfd, &my_msg, 4);
            my_recv(NULL, sfd, &my_msg.msg_buf, my_msg.msg_len);
            printf("recv from server : %s 
    ", my_msg.msg_buf);
        
        }
        /* 客户端退出时,向服务器发送一个长度为0的消息 ,用于通知服务器退出 */
        memset(&my_msg, 0, sizeof(MSG));
        my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len);
        close(sfd);
    
    }
    复制代码

    编译如下:

    gcc -o s server.c main.c -lmy_socket -I/home/purple/include
    gcc -o c client.c -lmy_socket -I/home/purple/include
  • 相关阅读:
    几个ssh和sftp的命令
    发现一个github的奇葩设定
    插耳机对orientation sensor的影响
    android中MediaPlayer类的用法
    Oracle 高性能SQL引擎剖析----执行计划
    【转】对列式数据库的一点总结和展望
    【转】大数据分析(Big Data OLAP)引擎Dremel, Tenzing 以及Impala
    TCP/IP协议详解---概述
    读取HttpWebResponse流的两种方法及注意的问题
    This project references NuGet package(s) that are missing on this computer.
  • 原文地址:https://www.cnblogs.com/hxjbc/p/3966531.html
Copyright © 2011-2022 走看看