zoukankan      html  css  js  c++  java
  • 应用层协议实现系列(一)——HTTPserver之仿nginx多进程和多路IO的实现

    近期在尝试自己写一个Httpserver,在粗略研究了nginx的代码之后,决定仿照nginx中的部分设计自己实现一个高并发的HTTPserver,在这里分享给大家。

    眼下使用的较多的Httpserver就是apache和nginx,apache的主要特点就是稳定,而nginx的主要特点是承载的并发量高。在这里从实现原理上做一个分析:

    apache採用的是多进程server模型,即server每监听到一个连接时,会创建一个新的进程去处理连接,进程与进程之间是独立的,因此就算进程在处理连接的过程中崩溃了,也不会影响其它进程的执行。但因为server能创建的进程数目与内存有关,因此server的最大并发数会受到机器内存的影响,同一时候假设有人发起大量恶意长链接攻击,就会导致server超载。

    nginx採用的是多路IO复用server模型,即server每监听到一个连接时,会将连接增加到连接数组中,使用epoll多路IO复用函数对每一个连接进行监听,当连接中有数据到来时,epoll会返回对应的连接,依此对各个连接进行处理就可以。epoll的最大连接数量尽管也会受到内存的影响,但因为每一个未激活的连接占用的内存非常小,所以相比于apache能够承载更大的并发。

    但因为多路IO复用是在一个进程中进行的,所以假设在处理连接的过程中崩溃了,其它连接也会受到影响。为了解决问题,nginx中也引入了多进程,即nginxserver由一个master进程和多个worker进程组成。master进程作为父进程在启动的时候会创建socket套接字以及若干个worker进程,每一个worker进程会使用epoll对master创建的套接字进行监听。当有新连接到来时,若干个worker进程的epoll函数都会返回,但仅仅有一个worker进程能够accept成功。该进程accept成功之后将该连接增加到epoll的监听数组中,该连接之后的数据都将由该worker进程处理。假设当中一个worker进程在处理连接的过程中崩溃了,父进程会收到信号并重新启动该进程以保证server的稳定性。

    另外,每次新连接到来都会唤醒若干个worker进程同一时候进行accept,但仅仅有一个worker能accept成功,为了避免这个问题,nginx引入了相互排斥信号量,即每一个worker进程在accept之前都须要先获取锁,假设获取不到则放弃accept。

    在明白了上述原理之后,我们就能够仿照nginx实现一个httpserver了。首先是创建套接字的函数:

    //创建socket
    int startup(int port) {
        struct sockaddr_in servAddr;
        memset(&servAddr, 0, sizeof(servAddr));
        //协议域(ip地址和端口)
        servAddr.sin_family = AF_INET;
        //绑定默认网卡
        servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        //端口
        servAddr.sin_port = htons(port);
        int listenFd;
        //创建套接字
        if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
            printf("create socket error: %s(errno: %d)
    ",strerror(errno),errno);
            return 0;
        }
        unsigned value = 1;
        setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
        //绑定套接字
        if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) {
            printf("bind socket error: %s(errno: %d)
    ",strerror(errno),errno);
            return 0;
        }
        //開始监听,设置最大连接请求
        if (listen(listenFd, 10) == -1) {
            printf("listen socket error: %s(errno: %d)
    ",strerror(errno),errno);
            return 0;
        }
        return listenFd;
    }

    该函数创建了一个套接字并将其绑定到了一个port上開始监听。因为我们接下来要创建若干个worker进程,能够通过fork函数实现:

    //管理子进程的数组,数组多大就有几个子进程
    static int processArr[PROCESS_NUM];
    //创建若干个子进程,返回当前进程是否父进程
    bool createSubProcess() {
        for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
            int pid = fork();
            //假设是子进程,返回0
            if (pid == 0) {
                return false;
            }
            //假设是父进程,继续fork
            else if (pid >0){
                processArr[i] = pid;
                continue;
            }
            //假设出错
            else {
                fprintf(stderr,"can't fork ,error %d
    ",errno);
                return true;
            }
        }
        return true;
    }

    在以上代码中,创建的进程数目由数组大小决定,建议将该进程数目设置为CPU的核数,以充分利用多核CPU。为了避免在父进程退出后,子进程仍然存在产生僵尸进程,我们还须要实现一个信号处理函数:

    //信号处理
    void handleTerminate(int signal) {
        for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
            kill(processArr[i], SIGTERM);
        }
        exit(0);
    }

    该函数实现了当父进程收到退出信号时,向每一个子进程也发送退出信号。以下来看看main函数的实现,因为本人是在mac os下进行开发,mac下不支持epoll函数,于是改为类似的select函数:

    int main(int argc, const char * argv[])
    {
        int listenFd;
        
        initMutex();
        //设置port号
        listenFd = startup(8080);
        
        //创建若干个子进程
        bool isParent = createSubProcess();
        //假设是父进程
        if (isParent) {
            while (1) {
                //注冊信号处理
                signal(SIGTERM, handleTerminate);
                //挂起等待信号
                pause();
            }
        }
        //假设是子进程
        else {
            //套接字集合
            fd_set rset;
            //最大套接字
            int maxFd = listenFd;
            std::set<int> fdArray;
            //循环处理事件
            while (1) {
                FD_ZERO(&rset);
                FD_SET(listenFd, &rset);
                //又一次设置每一个须要监听的套接字
                for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();iterator++) {
                    FD_SET(*iterator, &rset);
                }
                //開始监听
                if (select(maxFd+1, &rset, NULL, NULL, NULL)<0) {
                    fprintf(stderr, "select error: %s(errno: %d)
    ",strerror(errno),errno);
                    continue;
                }
                
                //遍历每一个连接套接字
                for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();) {
                    int currentFd = *iterator;
                    if (FD_ISSET(currentFd, &rset)) {
                        if (!handleRequest(currentFd)) {
                            close(currentFd);
                            fdArray.erase(iterator++);
                            continue;
                        }
                    }
                    ++iterator;
                }
                //检查连接监听套接字
                if (FD_ISSET(listenFd, &rset)) {
                    if (pthread_mutex_trylock(mutex)==0) {
                        int newFd = accept(listenFd, (struct sockaddr *)NULL, NULL);
                        if (newFd<=0) {
                            fprintf(stderr, "accept socket error: %s(errno: %d)
    ",strerror(errno),errno);
                            continue;
                        }
                        //更新最大的套接字
                        if (newFd>maxFd) {
                            maxFd = newFd;
                        }
                        fdArray.insert(newFd);
                        pthread_mutex_unlock(mutex);
                    }
                }
            }
        }
    
        close(listenFd);
        return 0;
    }

    在以上代码中,还涉及了进程间相互排斥信号量的定义,代码例如以下:

    //相互排斥量
    pthread_mutex_t *mutex;
    //创建共享的mutex
    void initMutex()
    {
        //设置相互排斥量为进程间共享
        mutex=(pthread_mutex_t*)mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0);
        if( MAP_FAILED==mutex) {
            perror("mutex mmap failed");
            exit(1);
        }
        //设置attr的属性
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        int ret = pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);
        if(ret != 0) {
            fprintf(stderr, "mutex set shared failed");
            exit(1);
        }
        pthread_mutex_init(mutex, &attr);
    }

    对每一个连接的处理例如以下:

    //处理http请求
    bool handleRequest(int connFd) {
        if (connFd<=0) return false;
        //读取缓存
        char buff[4096];
        //读取http header
        int len = (int)recv(connFd, buff, sizeof(buff), 0);
        if (len<=0) {
            return false;
        }
        buff[len] = '';
        std::cout<<buff<<std::endl;
        
        return true;
    }

    这样就实现了一个仿nginx的高并发server。完整的代码例如以下:

    #include <iostream>
    #include <set>
    #include <signal.h>
    #include <sys/select.h>
    #include <unistd.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <fcntl.h>
    
    #include <sys/mman.h>
    #include <pthread.h>
    
    #define GET_ARRAY_LEN(array) (sizeof(array) / sizeof(array[0]))
    #define PROCESS_NUM 4
    
    //创建socket
    int startup(int port) {
        struct sockaddr_in servAddr;
        memset(&servAddr, 0, sizeof(servAddr));
        //协议域(ip地址和端口)
        servAddr.sin_family = AF_INET;
        //绑定默认网卡
        servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        //端口
        servAddr.sin_port = htons(port);
        int listenFd;
        //创建套接字
        if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
            printf("create socket error: %s(errno: %d)
    ",strerror(errno),errno);
            return 0;
        }
        unsigned value = 1;
        setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
        //绑定套接字
        if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) {
            printf("bind socket error: %s(errno: %d)
    ",strerror(errno),errno);
            return 0;
        }
        //開始监听,设置最大连接请求
        if (listen(listenFd, 10) == -1) {
            printf("listen socket error: %s(errno: %d)
    ",strerror(errno),errno);
            return 0;
        }
        return listenFd;
    }
    
    //管理子进程的数组,数组多大就有几个子进程
    static int processArr[PROCESS_NUM];
    //创建若干个子进程,返回当前进程是否父进程
    bool createSubProcess() {
        for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
            int pid = fork();
            //假设是子进程,返回0
            if (pid == 0) {
                return false;
            }
            //假设是父进程,继续fork
            else if (pid >0){
                processArr[i] = pid;
                continue;
            }
            //假设出错
            else {
                fprintf(stderr,"can't fork ,error %d
    ",errno);
                return true;
            }
        }
        return true;
    }
    
    //信号处理
    void handleTerminate(int signal) {
        for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
            kill(processArr[i], SIGTERM);
        }
        exit(0);
    }
    
    //处理http请求
    bool handleRequest(int connFd) {
        if (connFd<=0) return false;
        //读取缓存
        char buff[4096];
        //读取http header
        int len = (int)recv(connFd, buff, sizeof(buff), 0);
        if (len<=0) {
            return false;
        }
        buff[len] = '';
        std::cout<<buff<<std::endl;
        
        return true;
    }
    
    //相互排斥量
    pthread_mutex_t *mutex;
    //创建共享的mutex
    void initMutex()
    {
        //设置相互排斥量为进程间共享
        mutex=(pthread_mutex_t*)mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0);
        if( MAP_FAILED==mutex) {
            perror("mutex mmap failed");
            exit(1);
        }
        //设置attr的属性
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        int ret = pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);
        if(ret != 0) {
            fprintf(stderr, "mutex set shared failed");
            exit(1);
        }
        pthread_mutex_init(mutex, &attr);
    }
    
    int main(int argc, const char * argv[])
    {
        int listenFd;
        
        initMutex();
        //设置端口号
        listenFd = startup(8080);
        
        //创建若干个子进程
        bool isParent = createSubProcess();
        //假设是父进程
        if (isParent) {
            while (1) {
                //注冊信号处理
                signal(SIGTERM, handleTerminate);
                //挂起等待信号
                pause();
            }
        }
        //假设是子进程
        else {
            //套接字集合
            fd_set rset;
            //最大套接字
            int maxFd = listenFd;
            std::set<int> fdArray;
            //循环处理事件
            while (1) {
                FD_ZERO(&rset);
                FD_SET(listenFd, &rset);
                //又一次设置每一个须要监听的套接字
                for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();iterator++) {
                    FD_SET(*iterator, &rset);
                }
                //開始监听
                if (select(maxFd+1, &rset, NULL, NULL, NULL)<0) {
                    fprintf(stderr, "select error: %s(errno: %d)
    ",strerror(errno),errno);
                    continue;
                }
                
                //遍历每一个连接套接字
                for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();) {
                    int currentFd = *iterator;
                    if (FD_ISSET(currentFd, &rset)) {
                        if (!handleRequest(currentFd)) {
                            close(currentFd);
                            fdArray.erase(iterator++);
                            continue;
                        }
                    }
                    ++iterator;
                }
                //检查连接监听套接字
                if (FD_ISSET(listenFd, &rset)) {
                    if (pthread_mutex_trylock(mutex)==0) {
                        int newFd = accept(listenFd, (struct sockaddr *)NULL, NULL);
                        if (newFd<=0) {
                            fprintf(stderr, "accept socket error: %s(errno: %d)
    ",strerror(errno),errno);
                            continue;
                        }
                        //更新最大的套接字
                        if (newFd>maxFd) {
                            maxFd = newFd;
                        }
                        fdArray.insert(newFd);
                        pthread_mutex_unlock(mutex);
                    }
                }
            }
        }
    
        close(listenFd);
        return 0;
    }
    

    下一篇文章仿nginx Httpserver的设计与实现(二)——http协议解析中,将向大家说明怎样对http协议进行解析。


    假设大家认为对自己有帮助的话,还希望能帮顶一下,谢谢:)
    转载请注明出处,谢谢!

  • 相关阅读:
    fstab是什么?被谁用?怎么写?
    一个驱动导致的内存泄漏问题的分析过程(meminfo->pmap->slabtop->alloc_calls)
    Ubuntu下doxygen+graphviz使用概录
    记录Ubuntu下使用docker使用
    hidraw设备简要分析
    一个版本烧录过程中记录:fdisk、mkfs.ext4、make_ext4fs、img2simg、simg2img
    bootrom/spl/uboot/linux逐级加载是如何实现的?
    Linux uevent分析、用户接收uevent以及mdev分析
    sched_yield()和nanosleep()对进程调度的影响
    Linux Thermal Framework分析及实施
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3790328.html
Copyright © 2011-2022 走看看