通过多路复用构建高性能服务器是一种常见的模型,单个I/O多路复用线程+一组工作线程,I/O线程负责协调分配任务,而实际工作交给工作线程处理。这种模型的好处在于高效并发和充分利用多线程的处理能力。
以memcached的构架图为例
memcached的主线程用epoll监听到EPOLLIN事件,并且触发该事件的fd是服务器listen的fd,就accept该连接请求,返回的fd主线程并不处理,而是通过CQ队列发送给工作线程去处理,工作线程又维护了一个epoll多路复用队列,子线程的epoll轮询和响应请求。
这种架构包含两大块:
1.多路复用
#include<stdlib.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<sys/types.h>
#include<fcntl.h>
using namespace std;
const int PORT = 8888;
const int MAX_CLIENT_NUM = 10000;
const int MAX_LEN = 2000;
bool setfdnoblock(int fd)
{
int flg = fcntl(fd, F_GETFL);
if(flg < 0)
{
cout << "get fd flag failed" << endl;
return false;
}
if(fcntl(fd, F_SETFL, O_NONBLOCK | flg) < 0)
{
return false;
}
return true;
}
int CreateTcpServer(int port, int listennum)
{
int fd;
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
sockaddr_in TcpServer;
bzero(&TcpServer, sizeof(TcpServer));
TcpServer.sin_family = AF_INET;
TcpServer.sin_port = htons(8888);
TcpServer.sin_addr.s_addr = htonl(INADDR_ANY);
int iRet = bind(fd, (struct sockaddr*)&TcpServer, sizeof(TcpServer));
if(-1 == iRet)
{
cout << "server bind error!" << endl;
return -1;
}
if(listen(fd, listennum) == -1)
{
cout << "server listen error" << endl;
return -1;
}
return fd;
}
int main()
{
int Serverfd = CreateTcpServer(PORT, MAX_CLIENT_NUM);
if(Serverfd == -1)
{
cout << "server create failed" << endl;
}
else
{
cout << "serverfd is :" << Serverfd << endl;
}
int Epollfd = epoll_create(MAX_CLIENT_NUM);
if(Epollfd == -1)
{
cout << "epoll_create failed" << endl;
}
epoll_event ev, events[MAX_CLIENT_NUM];
int nfds = 0;
int client = 0;
char buff[MAX_LEN];
sockaddr_in CliAddr;
unsigned int iCliSize = sizeof(CliAddr);
ev.events = EPOLLIN|EPOLLOUT;
ev.data.fd = Serverfd;
if(!setfdnoblock(Serverfd))
{
cout << "set serverfd no_block failed" << endl;
}
if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, Serverfd, &ev))
{
cout << "epoll add serverfd error" << endl;
}
while(1)
{
nfds = epoll_wait(Epollfd, events, MAX_CLIENT_NUM, 100000);
if(nfds == -1)
{
cout << "error occur, exit" << endl;
return -1;
}
else if( nfds == 0)
{
cout << "epoll_wait return zero" << endl;
}
else
{
for(int i = 0; i < nfds; i++)
{
cout << "events[i].data.fd is :" << events[i].data.fd << endl;
if(events[i].data.fd == Serverfd)
{
cout << " Serverfd received event" << endl;
client = accept(Serverfd, (struct sockaddr*)&CliAddr, &iCliSize);
if(client == -1)
{
cout << "accept error" << endl;
return -1;
}
ev.data.fd = client;
if(!setfdnoblock(client))
{
cout << "set client fd no_block error" << endl;
}
if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, client, &ev))
{
cout << "epoll add client error" << endl;
}
else
{
cout << "success add client" << endl;
}
}
else if(events[i].events&EPOLLIN)
{
cout << "recv client msg" << endl;
if(events[i].data.fd < 0)
{
cout << " event[i].data.fd is smaller than zero" << endl;
continue;
}
if(read(events[i].data.fd, buff, MAX_LEN) == -1)
{
perror("clifd read");
}
else
{
cout << "read client msg suc" << endl;
printf("%s",buff);
}
char resp[] = "recv a client msg, this is resp msg";
write(events[i].data.fd, resp, strlen(resp)+1);
//read and mod
}
else if(events[i].events&EPOLLOUT)
{
//send and mod
}
}
}
}
}
例子中epoll listen和accept新连接,并响应新连接的请求。
2.工作线程or线程池
进程的线程数量并不是越多越好,也不是越少越好,需要根据机器逐步调优。
工作线程的工作原理,
1.I/O线程把收到的请求放入队列,并通知工作线程处理,队列和通知机制可以是传统的加锁消息队列、信号量,也可以是memcached+libevent的实现:CQ队列装消息,线程管道通知工作线程。
2.I/O线程没有新的任务分配,工作线程阻塞或等待一段时间。
线程池用到的比较少,不做评价。