zoukankan      html  css  js  c++  java
  • 五十九、linux 编程—— I/O 多路复用 fcntl

    59.1 介绍

      前面介绍的函数如,recv、send、read 和 write 等函数都是阻塞性函数,若资源没有准备好,则调用该函数的进程将进入阻塞状态。我们可以使用 I/O 多路复用来解决此问题(即解决并发)。

    • I/O 多路复用的方式主要有两种实现方法
      • fcntl 函数实现(非阻塞方式)
      • select 函数实现

    59.1.1 fcntl 非阻塞方式——I/O多路复用/转换

      

    59.2 例子

     59.2.1 动态数组模块

     vector_fd.c

     1 #include <malloc.h>
     2 #include <stdio.h>
     3 #include <assert.h>
     4 #include <stdlib.h>
     5 #include <memory.h>
     6 #include "vector_fd.h"
     7 
     8 static void encapacity(vector_fd *vfd)
     9 {
    10     if(vfd->counter >= vfd->max_counter){
    11         int *fds = (int *)calloc(vfd->counter + 5, sizeof(int));
    12         assert(fds != NULL);
    13         memcpy(fds, vfd->fd, sizeof(int) * vfd->counter);
    14         free(vfd->fd);
    15         vfd->fd = fds;
    16         vfd->max_counter += 5;
    17     }
    18 }
    19 
    20 static int indexof(vector_fd *vfd, int fd)
    21 {
    22     int i = 0;
    23     for(; i < vfd->counter; i++){
    24         if(vfd->fd[i] == fd) return i;
    25     }
    26 
    27     return -1;
    28 }
    29 
    30 
    31 vector_fd *create_vector_fd(void)
    32 {
    33     vector_fd *vfd = (vector_fd *)calloc(1, sizeof(vector_fd));
    34     assert(vfd != NULL);
    35 
    36     vfd->fd = (int *)calloc(5, sizeof(int));
    37     assert(vfd->fd != NULL);
    38     vfd->counter = 0;
    39     vfd->max_counter = 0;
    40 
    41     return vfd;
    42 }
    43 
    44 
    45 void destroy_vector_fd(vector_fd *vfd)
    46 {
    47     assert(vfd != NULL);
    48     free(vfd->fd);
    49     free(vfd);
    50 }
    51 
    52 int get_fd(vector_fd *vfd, int index)
    53 {
    54     assert(vfd != NULL);
    55     if(index < 0 || index > vfd->counter - 1) return 0;
    56     
    57     return vfd->fd[index];
    58 }
    59 
    60 void remove_fd(vector_fd *vfd, int fd)
    61 {
    62     assert(vfd != NULL);
    63     int index = indexof(vfd, fd);
    64     if(index == -1) return;
    65     int i = index;
    66     for(; i < vfd->counter - 1; i++){
    67         vfd->fd[i] = vfd->fd[i + 1];
    68     }
    69     vfd->counter--;
    70 }
    71 
    72 void add_fd(vector_fd *vfd, int fd)
    73 {
    74     assert(vfd != NULL);
    75     encapacity(vfd);
    76     vfd->fd[vfd->counter++] = fd;
    77 }

    vector_fd.h

     1 #ifndef __VECTOR_FD_H__
     2 #define __VECTOR_FD_H__
     3 
     4 typedef struct {
     5     int     *fd;
     6     int     counter;
     7     int     max_counter;
     8 }vector_fd;
     9 
    10 extern vector_fd *create_vector_fd(void);
    11 extern void destroy_vector_fd(vector_fd *);
    12 extern int get_fd(vector_fd *, int index);
    13 extern void remove_fd(vector_fd *, int fd);
    14 extern void add_fd(vector_fd *, int fd);
    15 
    16 #endif

      编译成模块:gcc -o obj/vector_fd.o -Iinclude -c src/vector_fd.c

    59.2.2 服务器端

      echo_tcp_server_fcntl.c

      1 #include <netdb.h>
      2 #include <netinet/in.h>
      3 #include <sys/socket.h>
      4 #include <sys/wait.h>
      5 #include <unistd.h>
      6 #include <string.h>
      7 #include <stdio.h>
      8 #include <stdlib.h>
      9 #include <memory.h>
     10 #include <signal.h>
     11 #include <fcntl.h>
     12 #include <time.h>
     13 #include <arpa/inet.h>
     14 #include <errno.h>
     15 #include <pthread.h>
     16 #include "vector_fd.h"
     17 
     18 vector_fd *vfd;
     19 int sockfd;
     20 
     21 void sig_handler(int signo)
     22 {
     23     if(signo == SIGINT){
     24         printf("server close
    ");
     25         /** 步骤6: 关闭 socket */
     26         close(sockfd);
     27         /** 销毁动态数组 */
     28         destroy_vector_fd(vfd);
     29         exit(1);
     30     }
     31 }
     32 
     33 /**
     34  * fd 对应于某个连接的客户端,和某一个连接的客户端进行双向通信(非阻塞方式)
     35  */
     36 void do_service(int fd)
     37 {
     38     char buff[512];
     39     memset(buff, 0, sizeof(buff));
     40 
     41     /** 
     42      *  因为采用非阻塞方式,若读不到数据直接返回,
     43      *  直接服务于下一个客户端,
     44      *  因此不需要判断 size < 0 的情况 */
     45     ssize_t size = read(fd, buff, sizeof(buff));
     46     
     47     if(size == 0){
     48         /** 客户端已经关闭连接 */
     49         char info[] = "client closed";
     50         write(STDOUT_FILENO, info, sizeof(info));
     51         /** 从动态数组中删除对应的 fd */
     52         remove_fd(vfd, fd);
     53         /** 关闭对应客户端的 socket */
     54         close(fd);
     55     }
     56     else if(size > 0){
     57         write(STDOUT_FILENO, buff, sizeof(buff));
     58         if(write(fd, buff, size) < 0){
     59             if(errno == EPIPE){
     60                 /** 客户端关闭连接 */
     61                 perror("write error");
     62                 remove_fd(vfd, fd);
     63                 close(fd);
     64             }
     65             perror("protocal error");
     66         }
     67     }
     68 }
     69 
     70 void out_addr(struct sockaddr_in *clientaddr)
     71 {
     72     char ip[16];
     73     memset(ip, 0, sizeof(ip));
     74     int port = ntohs(clientaddr->sin_port);
     75     inet_ntop(AF_INET, &clientaddr->sin_addr.s_addr, ip, sizeof(ip));
     76     printf("%s(%d) connected!
    ", ip, port);
     77 }
     78 
     79 void *th_fn(void *arg)
     80 {
     81     int i;
     82     while(1){
     83         i = 0;
     84         /** 遍历动态数组中的 socket 描述符 */
     85         for(; i < vfd->counter; i++){
     86             do_service(get_fd(vfd, i));
     87         }
     88     }
     89 
     90     return (void *)0;
     91 }
     92 
     93 int main(int argc, char *argv[])
     94 {
     95     if(argc < 2){
     96         printf("usage: %s #port
    ", argv[0]);
     97         exit(1);
     98     }
     99 
    100     if(signal(SIGINT, sig_handler) == SIG_ERR){
    101         perror("signal sigint error");
    102         exit(1);
    103     }
    104 
    105 
    106     /** 步骤1: 创建 socket(套接字) 
    107      *  注: socket 创建在内核中,是一个结构体.
    108      *  AF_INET: IPV4
    109      *  SOCK_STREAM: tcp 协议
    110      *  AF_INET6: IPV6
    111      */
    112     sockfd = socket(AF_INET, SOCK_STREAM, 0);
    113     if(sockfd < 0){
    114         perror("socket error");
    115         exit(1);
    116     }
    117 
    118     /** 
    119      * 步骤2: 调用 bind 函数将 socket 和地址(包括 ip、port)进行绑定
    120      */
    121     struct sockaddr_in  serveraddr;
    122     memset(&serveraddr, 0, sizeof(struct sockaddr_in));
    123     /** 往地址中填入 ip、port、internet 地址族类型 */
    124     serveraddr.sin_family = AF_INET;    ///< IPV4
    125     serveraddr.sin_port = htons(atoi(argv[1])); ///< 填入端口
    126     serveraddr.sin_addr.s_addr = INADDR_ANY; ///< 填入 IP 地址
    127     if(bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr_in))){
    128         perror("bind error");
    129         exit(1);
    130     }
    131 
    132     /**
    133      *  步骤3: 调用 listen 函数启动监听(指定 port 监听)
    134      *         通知系统去接受来自客户端的连接请求
    135      *         (将接受到的客户端连接请求放置到对应的队列中)
    136      *  第二个参数: 指定队列的长度
    137      */
    138     if(listen(sockfd, 10) < 0){
    139         perror("listen error");
    140         exit(1);
    141     }
    142 
    143     /** 创建放置套接字描述符 fd 的动态数组 */
    144     vfd = create_vector_fd();
    145 
    146     /** 设置线程的分离属性 */
    147     pthread_t th;
    148     pthread_attr_t attr;
    149     pthread_attr_init(&attr);
    150     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    151     int err;
    152     if((err = pthread_create(&th, &attr, th_fn, (void *)0)) != 0){
    153         perror("pthread create error");
    154         exit(1);
    155     }
    156     pthread_attr_destroy(&attr);
    157 
    158     /** 
    159      * 1)主控线程获得客户端的链接,将新的 socket 描述符放置到动态数组中
    160      * 2)启动的子线程负责遍历动态数组中 socket
    161      *   描述符,并和对应的客户端进行双向通信(采用非阻塞方式读写)
    162      */
    163     struct sockaddr_in clientaddr;
    164     socklen_t len = sizeof(clientaddr);
    165 
    166     while(1){
    167         /**
    168          *  步骤4: 调用 accept 函数从队列中获得一个客户端的请求连接, 并返回新的
    169          *         socket 描述符
    170          *  注意:  若没有客户端连接,调用此函数后会阻塞, 直到获得一个客户端的连接
    171          */
    172         /** 主控线程负责调用 accept 去获得客户端的连接 */
    173         int fd = accept(sockfd, (struct sockaddr *)&clientaddr, &len);
    174         if(fd < 0){
    175             perror("accept error");
    176             continue;
    177         }
    178 
    179         out_addr(&clientaddr);
    180 
    181         /** 将读写修改为非阻塞方式 */
    182         int val;
    183         fcntl(fd, F_GETFL, &val);   ///< 获取原来的状态标志
    184         val |= O_NONBLOCK;
    185         fcntl(fd, F_SETFL, val);    ///< 添加新的状态标志
    186 
    187         /** 将返回的新的 socket 描述符加入到动态数组中 */
    188         add_fd(vfd, fd);
    189 
    190     }
    191 
    192     return 0;
    193 }

      编译:

      gcc -o bin/echo_tcp_server_fcntl -Iinclude obj/vector_fd.o src/echo_tcp_server_fcntl.c -lpthread

    59.2.3 客户端

      echo_tcp_client_fcntl.c

     1 #include <sys/types.h>
     2 #include <stdlib.h>
     3 #include <stdio.h>
     4 #include <memory.h>
     5 #include <unistd.h>
     6 #include <sys/socket.h>
     7 #include <netdb.h>
     8 #include <signal.h>
     9 #include <string.h>
    10 #include <time.h>
    11 #include <arpa/inet.h>
    12 
    13 
    14 int main(int argc, char *argv[])
    15 {
    16     if(argc < 3){
    17         printf("usage: %s ip port
    ", argv[0]);
    18         exit(1);
    19     }
    20 
    21     /** 步骤1: 创建 socket */
    22     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    23     if(sockfd < 0){
    24         perror("socket error");
    25         exit(1);
    26     }
    27 
    28     /** 往 serveraddr 中填入 ip、port 和地址族类型(ipv4) */
    29     struct sockaddr_in serveraddr;
    30     memset(&serveraddr, 0, sizeof(struct sockaddr_in));
    31     serveraddr.sin_family = AF_INET;
    32     serveraddr.sin_port = htons(atoi(argv[2]));
    33     /** 将 ip 地址转换成网络字节序后填入 serveraddr 中  */
    34     inet_pton(AF_INET, argv[1], &serveraddr.sin_addr.s_addr);
    35 
    36     /**
    37      *  步骤2: 客户端调用 connect 函数连接到服务器端
    38      */
    39     if(connect(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr_in)) < 0){
    40         perror("connect error");
    41         exit(1);
    42     }
    43 
    44     /** 步骤3: 调用 IO 函数(read/write)和服务器端进行双向通信 */
    45     char buff[512];
    46     ssize_t size;
    47     char *prompt = "==>";
    48     while(1){
    49         memset(buff, 0, sizeof(buff));
    50         write(STDOUT_FILENO, prompt, 3);
    51         size = read(STDIN_FILENO, buff, sizeof(buff));
    52         if(size < 0) continue;
    53         buff[size - 1] = '';
    54 
    55         if(write(sockfd, buff, sizeof(buff)) < 0){
    56             perror("write msg error");
    57             continue;
    58         }
    59         else {
    60             if(read(sockfd, buff, sizeof(buff)) < 0){
    61                 perror("read msg error");
    62                 continue;
    63             }
    64             else {
    65                 printf("%s
    ", buff);
    66             }
    67         }
    68     }
    69 
    70     /** 步骤4: 关闭 socket */
    71     close(sockfd);
    72 
    73     return 0;
    74 }

      编译:gcc -o bin/echo_tcp_client_fcntl src/echo_tcp_client_fcntl.c

    59.2.4 测试运行

      

  • 相关阅读:
    Scrapy中间件
    Scrapy简介
    Scrapy解析器xpath
    postman
    yarn
    brew 安装 yarn 时候失败
    immutability-helper 用途+使用方法
    js 正则
    react redux 应用链接
    react 事件传参数
  • 原文地址:https://www.cnblogs.com/kele-dad/p/10513656.html
Copyright © 2011-2022 走看看