zoukankan      html  css  js  c++  java
  • Linux中epoll+线程池实现高并发

    服务器并发模型通常可分为单线程和多线程模型,这里的线程通常是指“I/O线程”,即负责I/O操作,协调分配任务的“管理线程”,而实际的请求和任务通常交由所谓“工作者线程”处理。通常多线程模型下,每个线程既是I/O线程又是工作者线程。所以这里讨论的是,单I/O线程+多工作者线程的模型,这也是最常用的一种服务器并发模型。我所在的项目中的server代码中,这种模型随处可见。它还有个名字,叫“半同步/半异步“模型,同时,这种模型也是生产者/消费者(尤其是多消费者)模型的一种表现。

    这种架构主要是基于I/O多路复用的思想(主要是epoll,select/poll已过时),通过单线程I/O多路复用,可以达到高效并发,同时避免了多线程I/O来回切换的各种开销,思路清晰,易于管理,而基于线程池的多工作者线程,又可以充分发挥和利用多线程的优势,利用线程池,进一步提高资源复用性和避免产生过多线程。

    瓶颈在于IO密集度。
    线程池你开10个线程当然可以一上来全部accept阻塞住,这样客户端一连上来便会自动激活一个线程去处理,但是设想一下,如果10个线程全部用掉了,第11个客户端就会发生丢弃。这样为了实现”高并发“你得不断加大线程池的数量。这样会带来严重的内存占用和线程切换的时延问题。
    于是前置事件轮询设施的方案就应运而生了,
    主线程轮询负责IO,作业交给线程池。
    在高并发下,10W个客户端上来,就主线程负责accept,放到队列中,不至于发生没有及时握手而丢弃掉连接的情况发生,而作业线程从队列中认领作业,做完回复主线程,主线程负责write。这样可以用极少的系统资源处理大数量连接。
    在低并发下,比如2个客户端上来,也不会出现100个线程hold住在那从而发生系统资源浪费的情况。

    正确实现基本线程池模型的核心:
    主线程负责所有的 I/O 操作,收齐一个请求所有数据之后如果有必要,交给工作线程进行处理 。处理完成之后,把需要写回的数据还给主线程去做写回 / 尝试写回数据直到阻塞,然后交回主线程继续。
    这里「如果有必要」的意思是:经过测量,确认这个处理过程中所消耗的 CPU 时间(不包括任何 I/O 等待,或者相关的 I/O 等待操作无法用 epoll 接管)相当显著。如果这个处理过程(不包含可接管的 I/O 操作)不显著,则可以直接放在主线程里解决。
    这个「必要」与否的前提不过三个词:假设,分析,测量。

    所以,一个正确实现的线程池环境钟,用 epoll + non-blocking I/O 代替 select + blocking I/O 的好处是,处理大量 socket 的时候,前者效率比后者高,因为前者不需要每次被唤醒之后重新检查所有 fd 判断哪个 fd 的状态改变可以进行读写了。

    实现单I/O线程的epoll模型是本架构的第一个技术要点,主要思想如下: 

    单线程创建epoll并等待,有I/O请求(socket)到达时,将其加入epoll并从线程池中取一个空闲工作者线程,将实际的业务交由工作者线程处理。

    以上摘自:https://www.cnblogs.com/cthon/p/9139384.html

    创建一个epoll实例;
    while(server running)
    {
        epoll等待事件;
        if(新连接到达且是有效连接)
        {
            accept此连接;
            将此连接设置为non-blocking;
           为此连接设置event(EPOLLIN | EPOLLET ...);
            将此连接加入epoll监听队列;
            从线程池取一个空闲工作者线程并处理此连接;
        }
        else if(读请求)
        {
            从线程池取一个空闲工作者线程并处理读请求;
        }
        else if(写请求)
        {
            从线程池取一个空闲工作者线程并处理写请求;
        }
        else
            其他事件;     
    }

    刚学线程池,若有误请大家指出(可联系我,下有邮箱):

    服务器代码:

    lock.h

    /*************************************************************************
        > File Name: lock.h
        > Author: gushi
        > Mail: 971859774@qq.com 
        > Created Time: 2018年11月22日 星期四 20时06分55秒
     ************************************************************************/
    
    #ifndef LOCK_H
    #define LOCK_H
    #include <iostream>
    #include <pthread.h>
    #include <semaphore.h>
    
    using namespace std;
    
    class Sem
    {
        private:
            sem_t sem;
    
        public:
            Sem();
            ~Sem();
            bool wait();
            bool post();
    };
    
    Sem::Sem()
    {
        if(sem_init(&sem,0,0)!=0)//信号量的初始值和和基于内存的信号量
            cerr<<"sem init error."<<endl;
    }
    
    Sem::~Sem()
    {
        sem_destroy(&sem);
    }
    
    bool Sem::wait()
    {
        return sem_wait(&sem)==0?true:false;
    }
    
    bool Sem::post()
    {
        return sem_post(&sem)==0?true:false;
    }
    
    //互斥类
    class Mutex
    {
        private:
            pthread_mutex_t mutex;
    
        public:
            Mutex();
            ~Mutex();
            bool mutex_lock();
            bool mutex_unlock();
    };
    
    Mutex::Mutex()
    {
        if(pthread_mutex_init(&mutex,NULL)!=0)//可用PTHRAD_MUTEX_INITIALIZER宏初始化
            cerr<<"mutex init error"<<endl;
    }
    
    Mutex::~Mutex()
    {
        pthread_mutex_destroy(&mutex);
    }
    
    bool Mutex::mutex_lock()
    {
        return pthread_mutex_lock(&mutex)==0?true:false;
    }
    
    bool Mutex::mutex_unlock()
    {
        return pthread_mutex_unlock(&mutex)==0?true:false;
    }
    
    //条件变量的类
    class Cond
    {
        private:
            pthread_mutex_t mutex;
            pthread_cond_t cond;
    
        public:
            Cond();
            ~Cond();
            bool wait();
            bool signal();
            bool broadcast();
    };
    
    Cond::Cond()
    {
        if(pthread_mutex_init(&mutex,NULL)!=0)
        {
            cerr<<"Cond mutex init error"<<endl;
            exit(0);
        }
        if(pthread_cond_init(&cond,NULL)!=0)
        {
            cerr<<"Cond cond init error"<<endl;
            pthread_mutex_destroy(&mutex);
            exit(0);
        }
    }
    
    Cond::~Cond()
    {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }
    
    bool Cond::wait()
    {
        int rs=0;
        pthread_mutex_lock(&mutex);
        rs=pthread_cond_wait(&cond,&mutex);
        pthread_mutex_unlock(&mutex);
        return rs==0?true:false;
    }
    
    bool Cond::signal()
    {
        return pthread_cond_signal(&cond)==0?true:false;
    }
    
    bool Cond::broadcast()
    {
        return pthread_cond_broadcast(&cond);
    }
    
    #endif

    threadpool.h,还没有实现动态增加功能,以后待更新...

    /*************************************************************************
        > File Name: threadpool.h
        > Author: gushi
        > Mail: 971859774@qq.com 
        > Created Time: 2018年11月22日 星期四 20时32分12秒
     ************************************************************************/
    
    #ifndef THREADPOOL_H
    #define THREADPOOL_H
    
    #include <queue>
    #include <vector>
    #include <exception>
    #include <errno.h>
    #include "lock.h"
    #define MAX_THREADS 1024
    
    using namespace std;
    
    template <class T>
    class Threadpool
    {
        private:
            int idle;//线程池中空闲线程的数量
            int num;//线程池中线程数
            vector<pthread_t> idle_tid;//空闲线程的集合
            vector<pthread_t> busy_tid;//正在执行任务的线程的集合
            queue<T *> task_queue;//任务队列
            Mutex mutex;//互斥锁
            Cond cond;//条件变量锁
            bool is_stop;//是否结束线程
    
        public:
            static void *worker(void *arg);//线程函数,里面执行run函数
            void run();
            T *get_task();//获取任务函数
            int mv_to_idle(pthread_t tid);//执行任务完成后,放入空闲
            int mv_to_busy(pthread_t tid);//移入到忙碌线程中
    
        public:
            Threadpool(int n=20);
            ~Threadpool();
            bool append_task(T *task);//添加任务函数
            void start();//开始创建线程池
            void stop();//线程停止函数
    };
    
    template <class T>
    Threadpool<T>::Threadpool(int n):num(n),idle(n),is_stop(false)                                       
    {
        if(num<=0)
        {
            cerr<<"threadpool can't init because num<=0."<<endl;
            exit(1);
        }
    }
    
    template <class T>
    Threadpool<T>::~Threadpool()
    {
        stop();
    }
    
    template <class T>
    bool Threadpool<T>::append_task(T *task)
    {
        mutex.mutex_lock();//临界资源上锁
    
        bool is_signal=task_queue.empty();
        task_queue.push(task);
    
        mutex.mutex_unlock();//解锁
    
        if(is_signal)//signal to null queue
            cond.signal();
    
        return true;
    }
    
    template <class T>
    void Threadpool<T>::start()
    {
        for(int i=0;i<num;++i)    
        {
            pthread_t tid=0;
            if(pthread_create(&tid,NULL,worker,this)!=0)//this参数的传递对于开启线程运行函数要用到,
            {
                throw exception();
                exit(1);
            }
            idle_tid.push_back(tid);//加入到空闲线程集合
        }
    }
    
    template <class T>
    void Threadpool<T>::stop()
    {
        is_stop=true;
        cond.broadcast();
    }
    
    template <class T>
    void *Threadpool<T>::worker(void *arg)
    {
        Threadpool<T> *thread=(Threadpool<T> *)arg;//thread为一个线程池的指针,指向整个线程池,
        thread->run();//调用线程运行函数,真正执行工作的函数
        return thread;
    }
    
    template <class T>
    void Threadpool<T>::run()
    {
        pthread_t tid=pthread_self();//mutex.mutex_lock();这会造成与append_task函数构成死锁,访问task_queue与idle_tid与busy_tid等临界资源时,可以再各自的函数实现中加锁
        while(1)//if
        {
            T *task=get_task();//函数实现中有互斥锁临界访问
            if(task==NULL)
            {
                cerr<<"task_queue is null.wait()"<<endl;
                cond.wait();
            }
            else
            {
                mv_to_busy(tid);//函数实现中有互斥锁保护访问
                task->doit();//工作函数
                mv_to_idle(tid);
            }
        }
        //mutex.mutex_unlock();
    }
    
    template <class T>
    T *Threadpool<T>::get_task()
    {
        T *task=NULL;
        
        mutex.mutex_lock();
        if(!task_queue.empty())
        {
            task=task_queue.front();
            task_queue.pop();
        }
        mutex.mutex_unlock();
    
        return task;
    }
    
    template <class T>
    int Threadpool<T>::mv_to_idle(pthread_t tid)
    {
        vector<pthread_t>::iterator busy_iter=busy_tid.begin();
        while(busy_iter!=busy_tid.end())
        {
            if(tid==*busy_iter)
                break;
            ++busy_iter;
        }    
        
        mutex.mutex_lock();
        busy_tid.erase(busy_iter);//此线程空闲,从繁忙任务队列中移除
        idle_tid.push_back(tid);//添加到空闲线程集合中
    
        //mutex.mutex_lock();
        ++idle;
        mutex.mutex_unlock();
        return 0;
    }
    
    template <class T>
    int Threadpool<T>::mv_to_busy(pthread_t tid)
    {
        vector<pthread_t>::iterator idle_iter=idle_tid.begin();
        while(idle_iter!=idle_tid.end())
        {
            if(tid==*idle_iter)
                break;
            ++idle_iter;
        }
        mutex.mutex_lock();
        idle_tid.erase(idle_iter);
        busy_tid.push_back(tid);
    
        //mutex.mutex_lock();
        --idle;
        mutex.mutex_unlock();
    }
    
    #endif

    epollserver.h

    /*************************************************************************
        > File Name: epollserver.h
        > Author: gushi
        > Mail: 971859774@qq.com 
        > Created Time: 2018年11月23日 星期五 17时07分25秒
     ************************************************************************/
    
    #ifndef EPOLL_SERVER_H
    #define EPOLL_SERVER_H
    
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <sys/epoll.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <string.h>
    #include <fcntl.h>
    #include <unistd.h>//write头文件
    #include <errno.h>
    #include "threadpool.h"
    
    #define MAX_EVENT 1024
    #define MAX_BUFFER 2048
    using namespace std;
    
    class Basetask
    {
        public:
            virtual void doit()=0;
    };
    
    class Task:public Basetask
    {
        private:
            int sockfd;
            char msg[MAX_BUFFER];
        
        public:
            Task(int ,char *);
            void doit();
    };
    
    Task::Task(int fd,char *str):sockfd(fd)
    {
        memset(msg,0,sizeof(msg));
        strcpy(msg,str);
    }
    
    void Task::doit()
    {
        cout<<"server reveive message is: "<<msg<<endl;
        write(sockfd,msg,strlen(msg));
        //Threadpool<Task>::mv_to_idle(tid);
    }
    
    class Epollserver
    {
        private:
            bool is_stop;//是否停止epoll_wait
            int num;//线程数目
            int sockfd;
            int port;
            int epollfd;
            Threadpool<Task> *pool;//线程池的指针
            epoll_event events[MAX_EVENT];
            struct sockaddr_in servaddr;
        
        public:
            Epollserver(int p,int n);
            Epollserver(){}
            ~Epollserver();
            void init();
            void epoll();
            static int setnonblocking(int fd);
            static void addfd(int epollfd,int sockfd,bool onshot);
    };
    Epollserver::Epollserver(int p,int n):port(p),num(n),is_stop(false),
                            pool(NULL)
    {
    }
    
    Epollserver::~Epollserver()
    {
        delete pool;
    }
    
    void Epollserver::init()
    {
        bzero(&servaddr,sizeof(servaddr));
        servaddr.sin_family=AF_INET;
        servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
        servaddr.sin_port=htons(port);
    
        //监听套接字
        sockfd=socket(AF_INET,SOCK_STREAM,0);
        if(sockfd<0)
        {
            cerr<<"Epollserver socket init error"<<endl;
            exit(1);
        }
    
        int tmp=bind(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));
        if(tmp<0)
        {
            cerr<<"Epollserver bind init error"<<endl;
            exit(1);
        }
    
        tmp=listen(sockfd,10);
        if(tmp<0)
        {
            //cout<<errno<<endl;
            cerr<<"Epollserver listen init error:"<<strerror(errno)<<endl;
            exit(1);
        }
    
        epollfd=epoll_create(1024);
        if(epollfd<0)
        {
            cerr<<"Epollserver epoll_create init error"<<endl;
            exit(1);
        }
    
        //创建线程池,num是线程池中线程的个数,调用构造函数
        pool=new Threadpool<Task>(num); 
    }
    
    void Epollserver::epoll()
    {
        pool->start();//启动线程池
    
        addfd(epollfd,sockfd,false);
        while(!is_stop)
        {
            int ret=epoll_wait(epollfd,events,MAX_EVENT,-1);
            if(ret<0)
            {
                cerr<<"epoll_wait error"<<endl;
                exit(1);
            }
    
            for(int i=0;i<ret;++i)
            {
                //int fd=events[i].data.fd;
                if(events[i].data.fd==sockfd)
                {
                    struct sockaddr_in cliaddr;
                    socklen_t len=sizeof(cliaddr);
                    //accpet 返回已连接套接字
                    int confd=accept(sockfd,(struct sockaddr *)&cliaddr,&len);
                    Epollserver::addfd(epollfd,confd,false);
                }
                else if(events[i].events&EPOLLIN)//有数据可读
                {
                    char buffer[MAX_BUFFER];
                    int fd=events[i].data.fd;//接受已连接套接字,对客户端进行内容回送
        readagain:    memset(buffer,0,sizeof(buffer));
                    ret=read(fd,buffer,MAX_BUFFER-1);
                    if(ret==0)//某个fd关闭连接
                    {
                        struct epoll_event ev;
                        ev.events=EPOLLIN;
                        ev.data.fd=fd;
                        epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
                        shutdown(fd,SHUT_RDWR);
                        cout<<fd<<" exit"<<endl;
                        continue;
                    }
                    else if(ret<0)//读取失败
                    {
                        if(errno==EAGAIN)
                        {
                            cout<<"read error. read again"<<endl;
                            goto readagain;
                            break;
                        }
                    }
                    else//读取成功
                    {
                        Task *task=new Task(fd,buffer);
                        pool->append_task(task);
                    }
                }//else if
                else
                    cerr<<"something else had happend"<<endl;
            }//for
        }//while
        close(sockfd);
        pool->stop();
    }
    
    int Epollserver::setnonblocking(int fd)
    {
        int old_opt=fcntl(fd,F_GETFL);
        int new_opt=old_opt|O_NONBLOCK;
        fcntl(fd,F_SETFL,new_opt);
        return old_opt;
    }
    
    void Epollserver::addfd(int epollfd,int sockfd,bool oneshot)
    {
        epoll_event event;
        event.data.fd=sockfd;
        event.events=EPOLLIN|EPOLLET;
        if(oneshot)
            event.events|=EPOLLONESHOT;
        epoll_ctl(epollfd,EPOLL_CTL_ADD,sockfd,&event);
        Epollserver::setnonblocking(sockfd);
    }
    
    #endif

    server.cpp服务器的主函数

    /*************************************************************************
        > File Name: server.cpp
        > Author: gushi
        > Mail: 971859774@qq.com 
        > Created Time: 2018年11月23日 星期五 19时34分29秒
     ************************************************************************/
    
    #include "epollserver.h"
    #define INDARRY_PORT 9877
    
    using namespace std;
    
    int main(int argc,char **argv)
    {
        Epollserver *epoll=new Epollserver(INDARRY_PORT,20);
        epoll->init();//对初始化服务器(socket,bind,listen,epoll_create...等函数,病完成线程池的初始化)
        epoll->epoll();//开启线程池,完成相应的任务添加之后,自动调用线程池中空闲的函数来完成doit工作
        return 0;
    }

    客户端程序:

    /*************************************************************************
        > File Name: client.cpp
        > Author:gushi
        > Mail: 971859774@qq.com 
        > Created Time: 2018年11月24日 星期六 15时36分23秒
     ************************************************************************/
    
    #include <iostream>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <arpa/inet.h>
    #include <unistd.h>
    #include <netinet/in.h>
    #include <strings.h>
    #define SERV_PORT 9877
    #define MAXLINE 1204
    
    using namespace std;
    
    void str_cli(FILE *fp,int sockfd)
    {
        fd_set set;
        FD_ZERO(&set);
    
        char buff[1024];
        int stdineof=0,n;
        while(1)
        {
            if(stdineof==0)
                FD_SET(fileno(fp),&set);
            FD_SET(sockfd,&set);
    
            int maxfd=max(fileno(fp),sockfd)+1;
            
            select(maxfd,&set,NULL,NULL,NULL);
    
            if(FD_ISSET(sockfd,&set))
            {
                if((n=read(sockfd,buff,MAXLINE))==0)
                    if(stdineof==1)
                        return;
                    else
                        cerr<<"str_cli: server terinated peraturely"<<endl;
    
                write(fileno(stdout),buff,n);
            }
            else if(FD_ISSET(fileno(fp),&set))
            {
                if((n=read(fileno(fp),buff,MAXLINE))==0)//客户完成输入
                    stdineof=1;
                write(sockfd,buff,n);
    
                //shutdown(sockfd,SHUT_WR);
    
                FD_CLR(fileno(fp),&set);
                continue;
            }
            //write(sockfd,buff,n);
        }
        return;
    }
    
    int main(int argc,char **argv)
    {
        if(argc!=2)
        {
            cerr<<"please input server address."<<endl;
            exit(1);
        }
    
        int sockfd=socket(AF_INET,SOCK_STREAM,0);
    
        struct sockaddr_in servaddr;
        bzero(&servaddr,sizeof(servaddr));
        servaddr.sin_family=AF_INET;
        inet_pton(AF_INET,argv[1],&servaddr.sin_addr);
        servaddr.sin_port=htons(SERV_PORT);
        
        connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));
    
        str_cli(stdin,sockfd);
        return 0;
    }

    GitHub:https://github.com/tianzengBlog/websServer

  • 相关阅读:
    阈值处理——实例分析
    阈值处理
    split()函数+merge()函数
    imread函数+cvtColor()函数
    OpenCV3.2.0+VS2015开发环境配置
    Javascript中的async await
    React Native 系列(一)
    React Native 系列(三)
    React Native 系列(六)
    React Native 系列(七)
  • 原文地址:https://www.cnblogs.com/tianzeng/p/10013766.html
Copyright © 2011-2022 走看看