zoukankan      html  css  js  c++  java
  • Linux客户/服务器程序设计范式2——并发服务器(进程池)

    引言

    让服务器在启动阶段调用fork创建一个子进程池,通过子进程来处理客户端请求。子进程与父进程之间使用socketpair进行通信(为了方便使用sendmsg与recvmsg,如果使用匿名管道,则无法使用以上两个函数)。以下针对TCP进行分析。

    server端使用select轮询用于监听客户端请求的被动套接字fd_listen以及用于父子之间通信的socketpair。每当客户端有请求时,server端会将由accept返回的用于与客户端通信的socket描述符通过socketpair发送给一个空闲的子进程,由子进程与客户端进行通信(处理请求)。因此服务器端需要维护一个子进程队列,队列中的每个元素存放着与子进程通信的socketpair以及标记子进程是否空闲的标志位,如下:

    1 typedef struct tag_chd
    2 {
    3     int s_sfd ;     //与子进程通信的socketpair描述符
    4     int s_state ;   //标记子进程是否空闲
    5 }NODE, *pNODE;

    每当子进程处理完客户端请求时,会通过socketpair向server端发送消息,server端select到该socketpair后,会将对应子进程标志位设置为空闲。

    注意

    1. 由于父进程是先创建子进程,之后才accept用于与客户端通信的socket描述符fd_client,因此子进程的pcb中并没有fd_client的信息。server端需要将fd_client发送子进程。如果只是用send来发送fd_client信息的话,子进程只会将其当成一个整型数。我们需要用sendmsg将fd_client连同其辅助(控制)信息一并发送,这样子进程才会将其当成一个socket描述符。

    2. 父进程预先创建子进程池,该子进程如同server端一样是永远不会退出的。子进程中使用while死循环,如下:

    1 while(1)
    2     {
    3         readn = read(sfd, &flag, 4);                // 服务器分配的子进程在子进程队列中的下标
    4         printf("readn: %d 
    ", readn);             
    5         printf("read from father: %d 
    ", flag);
    6         recv_fd(sfd, &fd_client);                   // recv_fd中封装了recvmsg,接收与客户端通信的socket描述符
    7         handle_request(fd_client);                  // 处理客户端请求
    8         write(sfd, &pid, sizeof(pid));              // 处理完请求后通过socketpair通知服务器,服务器将该子进程状态设置为空闲
    9     }

    每当子进程处理完一个客户端请求后(也就是客户端退出了),子进程会阻塞在 read 处,等待接收下一个客户端请求。

    由于是while死循环,且死循环中没有break语句,因此子进程不可能跳出这个while循环,也就不会执行while循环以下的内容了,这样可以保证子进程结尾没有exit也不会执行之后的内容。

    函数原型

     1 #include <sys/types.h>
     2  #include <sys/socket.h>
     3 
     4  ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
     5  ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
     6 
     7  struct msghdr {
     8                void         *msg_name;       /* optional address */
     9                socklen_t     msg_namelen;    /* size of address */
    10                struct iovec *msg_iov;        /* scatter/gather array */
    11                size_t        msg_iovlen;     /* # elements in msg_iov */
    12                void         *msg_control;    /* ancillary data, see below */
    13                socklen_t     msg_controllen; /* ancillary data buffer len */
    14                int           msg_flags;      /* flags on received message */
    15            };
    16 
    17  
    18  
    19  fields:
    20  
    21  struct iovec {                    /* Scatter/gather array items */
    22                void  *iov_base;              /* Starting address */
    23                size_t iov_len;               /* Number of bytes to transfer */
    24            };
    25            
    26  struct cmsghdr {
    27            socklen_t cmsg_len;    /* data byte count, including header */
    28            int       cmsg_level;  /* originating protocol */ /* 如果是文件描述符,填SOL_SOCKET */
    29            int       cmsg_type;   /* protocol-specific type */ /* 如果是文件描述符,填SCM_RIGHTS */
    30            /* followed by unsigned char cmsg_data[]; */
    31        };
    32  
    33  /* 返回cmsghdr结构的cmsg_len成员的值,考虑到对齐,使用数据部分的长度作为参数。*/      
    34  size_t CMSG_LEN(size_t length);  
    35  /* 返回cmsghdr的数据部分指针。*/
    36  unsigned char *CMSG_DATA(struct cmsghdr *cmsg); 
    37 
    38  CMSG_DATA() returns a pointer to the data portion of a cmsghdr.
    39  CMSG_LEN()  returns  the  value  to store in the cmsg_len member of the cmsghdr structure, taking into
    40              account any necessary alignment.  It takes the data length as an argument. 
    41              This is a constant expression.
     1 NAME
     2        socketpair - create a pair of connected sockets
     3 
     4 SYNOPSIS
     5        #include <sys/types.h>          /* See NOTES */
     6        #include <sys/socket.h>
     7 
     8        int socketpair(int domain, int type, int protocol, int sv[2]);
     9 RETURN VALUE
    10        On  success,  zero is returned.  On error, -1 is returned, and errno is
    11        set appropriately.

    代码

    server.h

     1 #ifndef __SERVER_H__
     2 #define __SERVER_H__
     3 #include "my_socket.h"
     4 #include <sys/stat.h>
     5 #include <sys/types.h>
     6 #include <fcntl.h>
     7 #include <sys/time.h>
     8 #include <sys/select.h>
     9 #include <sys/uio.h>
    10 #include <sys/wait.h>
    11 #include <errno.h>
    12 #define SER_IP "127.0.0.1"
    13 #define SER_PORT 8888
    14 #define ST_BUSY 1
    15 #define ST_IDLE 2
    16 #define SIZE 8192
    17 #define MSG_SIZE (SIZE - 4)
    18 
    19 typedef struct tag_mag
    20 {
    21     int msg_len ;
    22     char msg_buf[MSG_SIZE];//8188
    23 }MSG, *pMSG;
    24 
    25 typedef struct tag_chd
    26 {
    27     int s_sfd ;
    28     int s_state ;
    29 }NODE, *pNODE;
    30 
    31 extern int errno ;
    32 void make_child(pNODE arr, int cnt);
    33 void child_main(int sfd) ;
    34 void handle_request(int sfd);
    35 void send_fd(int sfd, int fd_file) ;
    36 void recv_fd(int sfd, int* fd_file) ;
    37 void dispatch(pNODE arr, int cnt, int fd_client);
    38 #endif

    main.c

     1 #include "server.h"
     2 int main(int argc, char* argv[])//exe chld_cnt
     3 {
     4     if(argc != 2)
     5     {
     6         printf("Usage: exe , child_cnt! 
    ");
     7         exit(1);
     8     }
     9     int child_cnt = atoi(argv[1]);
    10     pNODE arr_child = (pNODE)calloc(child_cnt, sizeof(NODE)) ; /* 动态数组维护子进程池 */
    11     make_child(arr_child, child_cnt);
    12     
    13     int fd_listen, fd_client ;
    14     my_socket(&fd_listen, MY_TCP, SER_IP, SER_PORT);
    15     my_listen(fd_listen, 10);
    16     
    17     fd_set readset, readyset ;
    18     FD_ZERO(&readset);
    19     FD_ZERO(&readyset);
    20     FD_SET(fd_listen, &readset);
    21     int index ;
    22     for(index = 0; index < child_cnt; index ++)
    23     {
    24         FD_SET(arr_child[index].s_sfd, &readset);
    25     }
    26     
    27     int select_ret ;
    28     struct timeval tm ;
    29     while(1)
    30     {
    31         tm.tv_sec = 0 ;
    32         tm.tv_usec = 1000 ;
    33         readyset = readset ;
    34         select_ret = select(1024, &readyset, NULL, NULL, &tm);
    35         if(select_ret == 0)        /* 轮询时间内,所有描述符均没有活动,返回0,继续轮询 */
    36         {
    37             continue ;
    38         }else if(select_ret == -1) /* 信号 */
    39         {
    40             if(errno == EINTR)
    41             {
    42                 continue ;
    43             }else 
    44             {
    45                 exit(1);
    46             }
    47         }else 
    48         {
    49             if(FD_ISSET(fd_listen, &readyset))
    50             {
    51             fd_client = accept(fd_listen, NULL, NULL) ;    
    52             dispatch(arr_child, child_cnt ,fd_client);
    53             close(fd_client);
    54             }
    55             for(index = 0; index < child_cnt; index ++)
    56             {
    57                 if(FD_ISSET(arr_child[index].s_sfd, &readyset))
    58                 {
    59                     int val ;
    60                     read(arr_child[index].s_sfd, &val, 4);
    61                     arr_child[index].s_state = ST_IDLE ;
    62                 }
    63             }
    64             
    65         }
    66         
    67     }   
    68 }

    server.c

      1 #include "server.h"
      2 void make_child(pNODE arr, int cnt)
      3 {
      4     int index ; 
      5     for(index = 0; index < cnt; index ++)
      6     {
      7         pid_t pid ;
      8         int fds[2] ;//fds[0] - c  fds[1] - p
      9         socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
     10         pid = fork() ;
     11         if(pid == 0)// child
     12         {
     13             close(fds[1]);         /* 子进程用fds[0],关闭fds[1] */
     14             child_main(fds[0]) ;   /* 每创建一个子进程,子进程就进入该函数中(死循环),接收请求,处理请求,如此循环。*/
     15 
     16         }else 
     17         {
     18             /* 初始化进程池队列中的每一个子进程 */
     19             arr[index].s_sfd = fds[1] ;
     20             arr[index].s_state = ST_IDLE ;
     21             close(fds[0]);         /* 父进程用fds[1], 关闭fds[0] */
     22         }
     23 
     24     }
     25 
     26 }
     27 void child_main(int sfd)
     28 {
     29     int fd_client ;
     30     int flag ;
     31     int readn ;
     32     pid_t pid = getpid();
     33     while(1)
     34     {
     35         readn = read(sfd, &flag, 4);
     36         printf("readn: %d 
    ", readn);
     37         printf("read from father: %d 
    ", flag);
     38         recv_fd(sfd, &fd_client);
     39         handle_request(fd_client);
     40         write(sfd, &pid, sizeof(pid));
     41     }
     42 }
     43 void handle_request(int sfd)
     44 {    
     45 
     46     MSG my_msg ;
     47     int recvn ;
     48     while(1)
     49     {
     50         memset(&my_msg, 0, sizeof(MSG));
     51         my_recv(&recvn, sfd, &my_msg, 4);
     52         if(my_msg.msg_len  == 0)
     53         {
     54             break ;
     55         }
     56         my_recv(NULL, sfd, my_msg.msg_buf, my_msg.msg_len);
     57         my_send(NULL, sfd, &my_msg, my_msg.msg_len + 4);
     58 
     59     }
     60 
     61 }
     62 void send_fd(int sfd, int fd_file) 
     63 {
     64     struct msghdr my_msg ;
     65     memset(&my_msg, 0, sizeof(my_msg));
     66     
     67     struct iovec bufs[1] ;
     68     char buf[32] = "hello world ! 
    ";
     69     bufs[0].iov_base = buf ;
     70     bufs[0].iov_len = strlen(buf) ;
     71     
     72     my_msg.msg_name = NULL ;
     73     my_msg.msg_namelen = 0 ;
     74     my_msg.msg_iov = bufs ;
     75     my_msg.msg_iovlen = 1 ;
     76     my_msg.msg_flags = 0 ;
     77 
     78     struct cmsghdr *p  ;
     79     int cmsg_len = CMSG_LEN(sizeof(int)) ;     /* 所传为文件描述符,因此sizeof(int) */
     80     p = (struct cmsghdr*)calloc(1, cmsg_len) ;
     81     p -> cmsg_len = cmsg_len ;
     82     p -> cmsg_level = SOL_SOCKET ;
     83     p -> cmsg_type = SCM_RIGHTS ;
     84     *(int*)CMSG_DATA(p) = fd_file ;
     85     
     86     my_msg.msg_control = p ;
     87     my_msg.msg_controllen = cmsg_len ;
     88     
     89     int sendn ;
     90     sendn = sendmsg(sfd, &my_msg, 0);
     91     printf("send masg len : %d 
    ", sendn);
     92 }
     93 void recv_fd(int sfd, int* fd_file) 
     94 {
     95     struct msghdr my_msg ;
     96     
     97     struct iovec bufs[1] ;
     98     char buf1[32]="" ;
     99     bufs[0].iov_base = buf1 ;
    100     bufs[0].iov_len = 31 ;
    101 
    102     my_msg.msg_name = NULL ;
    103     my_msg.msg_namelen = 0 ;
    104     my_msg.msg_iov = bufs ;
    105     my_msg.msg_iovlen = 2 ;
    106     my_msg.msg_flags = 0 ;
    107     
    108     struct cmsghdr *p  ;
    109     int cmsg_len = CMSG_LEN(sizeof(int)) ;
    110     p = (struct cmsghdr*)calloc(1, cmsg_len) ;
    111     my_msg.msg_control = p ;
    112     my_msg.msg_controllen = cmsg_len ;
    113     
    114     int recvn ;
    115     recvn = recvmsg(sfd, &my_msg, 0);
    116     
    117     *fd_file = *(int*)CMSG_DATA((struct cmsghdr*)my_msg.msg_control); //写成*(int*)CMSG_DATA(P)也可
    118     
    119     printf("buf1: %s, recv msg len : %d   
    ", buf1, recvn);
    120 
    121 }
    122 void dispatch(pNODE arr, int cnt, int fd_client)
    123 {
    124     int index ;
    125     for(index = 0 ; index < cnt; index ++)
    126     {
    127         if(arr[index].s_state == ST_IDLE)
    128         {
    129             write(arr[index].s_sfd, &index, 4);
    130             send_fd(arr[index].s_sfd, fd_client); /* 向空闲的子进程分配任务,将服务器accept返回的socket描述符发送给子进程*/
    131             arr[index].s_state = ST_BUSY ;
    132             break ;
    133         }
    134     }
    135 }

    client.c

     1 #include "my_socket.h"
     2 #define MY_IP "127.0.0.1"
     3 #define MY_PORT 6666
     4 #define SER_IP "127.0.0.1"
     5 #define SER_PORT 8888
     6 #define SIZE 8192
     7 #define MSG_SIZE (SIZE - 4)
     8 typedef struct tag_mag// 
     9 {
    10     int msg_len ;
    11     char msg_buf[MSG_SIZE];//8188
    12 }MSG, *pMSG;
    13 int main(int argc, char* argv[])
    14 {
    15     int sfd ;
    16     my_socket(&sfd, MY_TCP, MY_IP, atoi(argv[1]));
    17     my_connect(sfd, SER_IP, SER_PORT);
    18     MSG my_msg ;
    19     while(memset(&my_msg, 0, sizeof(MSG)), fgets(my_msg.msg_buf, MSG_SIZE, stdin)!= NULL)
    20     {
    21         my_msg.msg_len = strlen(my_msg.msg_buf);
    22         my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len );
    23         memset(&my_msg, 0, sizeof(MSG));
    24         my_recv(NULL, sfd, &my_msg, 4);
    25         my_recv(NULL, sfd, &my_msg.msg_buf, my_msg.msg_len);
    26         printf("recv from server : %s 
    ", my_msg.msg_buf);
    27     
    28     }
    29     /* 客户端退出时,向服务器发送一个长度为0的消息 ,用于通知服务器退出 */
    30     memset(&my_msg, 0, sizeof(MSG));
    31     my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len);
    32     close(sfd);
    33 
    34 }
  • 相关阅读:
    django之--模型层(ORM语法)
    mysql问题记录
    CentOS系统内存使用问题(内存是拿来用的,而不是拿来看的)
    CentOS6&CentOS7安装FFmpeg
    django之--模板层
    Django之--视图层
    Django之ORM学习2--路由层
    Django之ORM学习
    Django入门
    第二版mq 数据结构选型
  • 原文地址:https://www.cnblogs.com/DLzhang/p/4020910.html
Copyright © 2011-2022 走看看