zoukankan      html  css  js  c++  java
  • 基于管道通知的百万并发长连接server模型

    0、前言

    最近突然想了解怎样设计一个支持百万连接的后台server架构。

    要设计一个支持百万连接的后台server,我们首先要知道会有哪些因素限制后台server的高并发连接,这里想到的因素有以下几点:

    1、操作系统的参数设置能否支持百万并发连接;

    2、操作系统维持百万并发长连接需要多少内存;

    3、应用层面上维持百万并发长连接需要多少内存;

    4、百万并发长连接的吞吐量是否超过了硬件网卡的限制。

    在学习的过程中,主要针对的是1、2、4,第3点一般跟业务相关,这里暂时没有考虑。

    本篇文章估计需要多次才能完成,现在初步的想法是先写一个demo程序,然后后面再慢慢测试优化。

    1、后台设计

    1.1 后台设计图

    如下为后台的设计结构:

    1、首先主进程根据机器CPU个数,创建对应数量的管道;

    2、创建完对应的管道之后,再创建一样数量的线程,每个线程绑定一个CPU;

    3、主进程开始初始化socket,然后accept,当接收到一个客户端连接时,就把conn_fd写到某个pipe中;

    3、每个线程创建epoll,然后监听对应pipe的写端fd,当监听到pipe中有数据时,就读取该数据,格式化为fd,将该fd加入epoll进行监听。

    1.2 编码实现

    根据1.1的设计,我们编写代码,包括server模块和worker模块。server模块负责创建pipe、线程、和监听客户端连接;worker模块负责处理每个客户端的连接。代码如下所示:

    1.2.0 common

     1 #ifndef _SERV_COMMON_H
     2 #define _SERV_COMMON_H
     3 
     4 typedef struct {
     5     int id;
     6     int fd;
     7 } thread_arg;
     8 
     9 #define SERV_PORT 9876
    10 #define MAX_LINE  1024
    11 
    12 #endif
    View Code

    1.2.1 worker

    worker.h

    1 #ifndef _SERV_WORKER_H
    2 #define _SERV_WORKER_H
    3 
    4 void *worker(void *arg);
    5 
    6 #endif
    View Code

    worker.cc

      1 #include <errno.h>
      2 #include <fcntl.h>
      3 #include <stdio.h>
      4 #include <stdlib.h>
      5 #include <string.h>
      6 #include <unistd.h>
      7 #include <sched.h>
      8 #include <pthread.h>
      9 #include <sys/epoll.h>
     10 #include <sys/types.h>
     11 #include <sys/socket.h>
     12 
     13 #include "common.h"
     14 
     15 #define MAXFDS 1000000
     16 #define EVENTSIZE 1000
     17 
     18 int taskset_thread_core(int core_id)
     19 {
     20     cpu_set_t cpuset;
     21     CPU_ZERO(&cpuset);
     22     CPU_SET(core_id, &cpuset);
     23 
     24     pthread_t curr_tid = pthread_self();
     25     return pthread_setaffinity_np(curr_tid, sizeof(cpu_set_t), &cpuset);
     26 }
     27 
     28 int setnonblocking(int fd)
     29 {
     30     if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK) == -1) {
     31         printf("fd %d set non blocking failed
    ", fd);
     32         return -1;
     33     }
     34 
     35     return 0;
     36 }
     37 
     38 void handle_req(int cli_fd)
     39 {
     40     char in_buff[MAX_LINE];
     41     int ret, rs = 1;
     42 
     43     while (rs) {
     44         ret = recv(cli_fd, in_buff, 1024, 0);
     45 
     46         if (ret < 0) {
     47             if (errno == EAGAIN) {
     48                 printf("EAGAIN
    ");
     49                 break;
     50             } else {
     51                 printf("recv error: %d
    ", errno);
     52                 close(cli_fd);
     53                 break;
     54             }
     55         } else if (ret == 0) {
     56             rs = 0;
     57         }
     58 
     59         if (ret == sizeof(in_buff))
     60             rs = 1;
     61         else
     62             rs = 0;
     63     }
     64 
     65     if (ret > 0) {
     66         send(cli_fd, in_buff, strlen(in_buff), 0);
     67     }
     68 }
     69 
     70 void run_epoll(int epfd, int pipe_fd)
     71 {
     72     int i, cli_fd, nfds;
     73     struct epoll_event ev, events[EVENTSIZE];
     74     char buff[16];
     75 
     76     ev.events = EPOLLIN | EPOLLET;
     77 
     78     while (1) {
     79         nfds = epoll_wait(epfd, events, EVENTSIZE , -1);
     80         for (i = 0; i < nfds; i++) {
     81             // pipe msg, add connected fd to epoll
     82             if (events[i].data.fd == pipe_fd) {
     83                 read(pipe_fd, buff, 16);
     84                 cli_fd = atoi(buff);
     85                 setnonblocking(cli_fd);
     86                 ev.data.fd = cli_fd;
     87 
     88                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, cli_fd, &ev) < 0) {
     89                     printf("epoll add fd %d failed
    ", cli_fd);
     90                 }
     91             } else {  // socket msg
     92                 cli_fd = events[i].data.fd;
     93                 handle_req(cli_fd);
     94             }
     95         }
     96     }
     97 }
     98 
     99 void *worker(void *arg)
    100 {
    101     int epfd, pipe_fd;
    102     struct epoll_event ev;
    103 
    104     taskset_thread_core(((thread_arg*) arg)->id);
    105     
    106     pipe_fd = ((thread_arg*) arg)->fd;
    107     epfd = epoll_create(MAXFDS);
    108     setnonblocking(pipe_fd);
    109     ev.data.fd = pipe_fd;
    110     ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    111     if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd, &ev) < 0) {
    112         printf("epoll add mq fail
    ");
    113     }
    114 
    115     run_epoll(epfd, pipe_fd);
    116 
    117     return 0;
    118 }
    View Code

    1.2.2 server

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <string.h>
     4 #include <unistd.h>
     5 #include <sched.h>
     6 #include <pthread.h>
     7 #include <arpa/inet.h>
     8 #include <sys/socket.h>
     9 
    10 #include "common.h"
    11 #include "worker.h"
    12 
    13 int start_listen()
    14 {
    15     int fd, opt = 1;
    16     struct sockaddr_in servaddr;
    17 
    18     bzero(&servaddr, sizeof(servaddr));
    19     servaddr.sin_family = AF_INET;
    20     servaddr.sin_port = htons(SERV_PORT);
    21     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    22 
    23     if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    24         printf("open socket failed!
    ");
    25         exit(1);
    26     }
    27 
    28     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    29 
    30     if ((bind(fd, (struct sockaddr*) &servaddr, sizeof(servaddr))) < 0) {
    31         printf("bind failed!
    ");
    32         exit(1);
    33     }
    34 
    35     if (listen(fd, SOMAXCONN) < 0) {
    36         printf("listen failed!
    ");
    37         exit(1);
    38     }
    39 
    40     return fd;
    41 }
    42 
    43 int main(int argc, char **argv)
    44 {
    45     int i, num_cores, listen_fd, cli_fd;
    46     char name[32];
    47 
    48     listen_fd = start_listen();
    49 
    50     num_cores = sysconf(_SC_NPROCESSORS_ONLN);
    51     printf("core num: %d
    ", num_cores);
    52 
    53     int pipe_fd[num_cores][2];
    54     thread_arg targ[num_cores];
    55     pthread_t tid[num_cores];
    56     pthread_attr_t attr;
    57 
    58     if (pthread_attr_init(&attr) != 0) {
    59         perror("pthrad attr init error: ");
    60         exit(1);
    61     }
    62 
    63     if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) {
    64         perror("pthread set attr detached error: ");
    65         exit(1);
    66     }
    67 
    68     for (i = 0; i < num_cores; i++) {
    69         pipe(pipe_fd[i]);
    70         targ[i] = (thread_arg) {i, pipe_fd[i][0]};
    71 
    72         if (pthread_create(&tid[i], &attr, worker, &targ[i]) != 0) {
    73             perror("pthread create error: ");
    74             exit(1);
    75         }
    76     }
    77 
    78     pthread_attr_destroy(&attr);
    79     sleep(2);
    80     printf("server started
    
    ");
    81 
    82     while ((cli_fd = accept(listen_fd, NULL, NULL)) > 0) {
    83         sprintf(name, "%d", cli_fd);
    84         i = cli_fd % num_cores;
    85         write(pipe_fd[i][1], name, strlen(name));
    86     }
    87 
    88     close(listen_fd);
    89 
    90     for (i = 0; i < num_cores; i++) {
    91         close(pipe_fd[i][1]);
    92     }
    93 
    94     return 0;
    95 }
    View Code

     写完后台代码之后,开始测试能支持多少连接,但测试过程中一直有问题,会报如下的错误:error: Cannot assign requested address。

    google了一下,说是因为短时间内大量短连接造成TIME_WAIT耗尽端口问题,不明白我的测试代码怎么是短连接,而不是长连接。

    我的客户端代码如下,不知道是哪里出问题了。

    #include <unistd.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    
    void process_conn_svr(const char *svr_ip, int svr_port);
    
    int connections = 0;
    
    #define MAX_CONN 1005000
    int fd[MAX_CONN];
     
    int main(int argc, char **argv) 
    {
        if (argc <= 2) { 
            printf("usage: %s ip port
    ", argv[0]);
            exit(0);
        }
        const char *ip = argv[1];
        int port = atoi(argv[2]);
    
       
        pid_t pid = fork(); 
        if (pid == 0) { 
            process_conn_svr(ip, port);
        }
    
        const char buf[] = "keepalive!";
        for (;;) {
            usleep(1*1000);
            for (int i = 0; i < MAX_CONN; ++i) {
                if (fd[i] != 0) { 
                    send(fd[i], buf, sizeof(buf), 0);
                }       
            }       
        }
        return 0;   
        
    }
    
    void process_conn_svr(const char *svr_ip, int svr_port)
    {
        int conn_idx = 0;
         for (;;) {
            struct sockaddr_in serv_addr;
            bzero(&serv_addr, sizeof(serv_addr));
            serv_addr.sin_family = AF_INET;
            inet_pton(AF_INET, svr_ip, &serv_addr.sin_addr);
    
            serv_addr.sin_port = htons(svr_port);
            int cli_fd = socket(AF_INET, SOCK_STREAM, 0);
            if (cli_fd == -1) {
                goto sock_err;
            }
    
            if (connect(cli_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
                goto sock_err;
            }
    
            fd[conn_idx] = cli_fd;
            conn_idx++;
    
            connections++;
            printf("connections: %d, fd: %d
    ", connections, cli_fd);
    
            if (connections % 10000 == 9999) {
                printf("press Enter to continue: ");
                getchar();
            }
            usleep(1*1000);
        }
    
    sock_err:
        printf("error: %s
    ", strerror(errno));
    }
    View Code
  • 相关阅读:
    5804: 最大子序和(单调队列)
    5801: 七夕祭(贪心)
    5920: 喷水装置(贪心)
    5924: 加工生产调度(贪心)
    5929: 家庭作业(贪心+并查集)
    H1N1's Problem(费马小定理+快速幂)
    欧拉筛法求素数
    Cube Stacking(并查集加递归)
    写2个线程,一个打印1-52,一个打印A-Z,打印顺序是12A34B。。。(采用同步代码块和同步方法两种同步方法)
    java创建多线程的三种方式
  • 原文地址:https://www.cnblogs.com/i4oolish/p/3970402.html
Copyright © 2011-2022 走看看