基本的架构是 epoll+线程池。
这篇博文主要从以下几个方面进行阐述:
(1)reactor模式的一个介绍:(只要是我的理解)
(2)关于线程池的说明。
(3)如何将epoll + 池结合起来实现一个群聊
一. reactor 模式:
从我个人的理解角度,所谓的reactor模式类似于:
场景:银行, 和三个业务工作人员 ,一个接待,有很多人在等待。
当你进去的时候,银行的接待会给你一个编号,这就是你第几个才会被业务工作人员接待。
这个时候,你就进入了等待的状态。直到轮流到你了,三个业务工作人员中的一个就会帮助你处理你的问题。
类比到计算机就是:作为接待只是给你发了一个编号,她并不关心你要处理什么业务,之后又去给后续进来的人发送编号。她就不在乎什么时候你的业务才会被处理到,至于你的业务被处理的时候,就属于业务处理人员。当然业务处理人员也不会关心给进来的人发编号,他只关心你当前要处理的业务是什么。
这种模式的优点在于:主线程只监听当前套接字是否可读或者可写,至于你要处理什么事情,就交给工作线程了。
估计现在本来清晰的你,已经被我弄糊涂了,那就用图说明:
IO模型实现reactor 模式的工作流程:
(1)主线程向epoll内核事件表内注册socket上的可读就绪事件。
(2)主线程调用epoll_wait()等待socket上有数据可读
(3)当socket上有数据可读,epoll_wait 通知主线程。主线程从socket可读事件放入请求队列。
(4)睡眠在请求队列上的某个可读工作线程被唤醒,从socket上读取数据,处理客户的请求。
然后向 epoll内核事件表里注册写的就绪事件
(5)主线程调用epoll_wait()等待数据可写 。
以上就是关于Reactor模式的一个说明,下面就来看线程池的说明。
(二)线程池:
之所以会出现池这个概念就是:单个任务处理事件比较短,需要处理的任务有比较多。使用线程池可以减少在创建和撤销线程上花费的时间以及系统资源的开销。如果不使用线程池有可能创建大量的线程而消耗完系统资源以及过度的切换。
线程池的概念是从一个很简单的模型开始的,那就是生产者和消费者思想,这个模型我相信很多学过操作系统的人,都知道。主线程就相当于是生产者,而我们自己创建的大量的线程就相当于消费者(工作线程),生产者将自己需要处理的业务放到一个任务队列中,而工作线程从任务队列中取出任务,然后又加以处理。我们创建的大量线程通过一个池的东西维护起来,这个池里面包含我们创建的线程,还有那个工作的队列,互斥锁,条件变量等等很多的东西。
(三)epoll + 池的结合
#include <arpa/inet.h> #include <unistd.h> #include <assert.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/types.h> #include <pthread.h> #include <fcntl.h> #include <assert.h> #include <errno.h> #include <netinet/in.h> #include "thread_pool.h" #include "thread_pool.c" #define MAX_EVENT_NUMBER 1000 #define SIZE 1024 #define MAX 10 //从主线程向工作线程数据结构 struct fd { int epollfd; int sockfd ; }; //用户说明 struct user { int sockfd ; //文件描述符 char client_buf [SIZE]; //数据的缓冲区 }; struct user user_client[MAX]; //定义一个全局的客户数据表 //由于epoll设置的EPOLLONESHOT模式,当出现errno =EAGAIN,就需要重新设置文件描述符(可读) void reset_oneshot (int epollfd , int fd) { struct epoll_event event ; event.data.fd = fd ; event.events = EPOLLIN|EPOLLET|EPOLLONESHOT ; epoll_ctl (epollfd , EPOLL_CTL_MOD, fd , &event); } //向epoll内核事件表里面添加可写的事件 int addreadfd (int epollfd , int fd , int oneshot) { struct epoll_event event ; event.data.fd = fd ; event.events |= ~ EPOLLIN ; event.events |= EPOLLOUT ; event.events |= EPOLLET; if (oneshot) { event.events |= EPOLLONESHOT ; //设置EPOLLONESHOT } epoll_ctl (epollfd , EPOLL_CTL_MOD ,fd , &event); } //群聊函数 int groupchat (int epollfd , int sockfd , char *buf) { int i = 0 ; for ( i = 0 ; i < MAX ; i++) { if (user_client[i].sockfd == sockfd) { continue ; } strncpy (user_client[i].client_buf ,buf , strlen (buf)) ; addreadfd (epollfd , user_client[i].sockfd , 1); } } //接受数据的函数,也就是线程的回调函数 int funcation (void *args) { int sockfd = ((struct fd*)args)->sockfd ; int epollfd =((struct fd*)args)->epollfd; char buf[SIZE]; memset (buf , '