This post compares two server models:
A: found on the web (http://bbs.chinaunix.net/thread-4067753-1-1.html), claimed to reach 500,000 QPS
B: written by me; I believe it performs better than the model above
——————————————————————————————————————————
A:
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sched.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
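/* Model A: one listener thread accepts connections and hands each new socket,
 * round-robin, to one of EPOLL_NUM worker epoll instances; every worker thread
 * epoll_wait()s on its own epoll fd and re-arms sockets with EPOLLONESHOT. */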
typedef struct connection_st {
int sock;
int index; /* which epoll fd this conn belongs to*/
int using;
#define BUF_SIZE 4096
int roff;
char rbuf[BUF_SIZE];
int woff;
char wbuf[BUF_SIZE];
}*connection_t;
#define CONN_MAXFD 65536
struct connection_st g_conn_table[CONN_MAXFD] = {0};
static volatile sig_atomic_t shut_server = 0;
void shut_server_handler(int signo) {
shut_server = 1;
}
#define EPOLL_NUM 8
int epfd[EPOLL_NUM];
int lisSock;
#define WORKER_PER_GROUP 1
#define NUM_WORKER (EPOLL_NUM * WORKER_PER_GROUP)
pthread_t worker[NUM_WORKER]; /* each group has WORKER_PER_GROUP worker threads */
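/* Queue outgoing data: if a partial write is already pending, append to wbuf;
 * otherwise try an immediate write() and buffer whatever could not be sent. */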
int sendData(connection_t conn, char *data, int len) {
if (conn->woff){
if (conn->woff + len > BUF_SIZE) {
return -1;
}
memcpy(conn->wbuf + conn->woff, data, len);
conn->woff += len;
return 0;
} else {
int ret = write(conn->sock, data, len);
if (ret > 0){
if (ret == len) {
return 0;
}
int left = len - ret;
if (left > BUF_SIZE) return -1;
memcpy(conn->wbuf, data + ret, left);
conn->woff = left;
} else {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
if (len > BUF_SIZE) {
return -1;
}
memcpy(conn->wbuf, data, len);
conn->woff = len;
}
}
return 0;
}
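/* Read into rbuf and echo back every complete (delimiter-terminated) request
 * found in the buffer; any trailing partial request is kept for the next read. */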
int handleReadEvent(connection_t conn) {
if (conn->roff == BUF_SIZE) {
return -1;
}
int ret = read(conn->sock, conn->rbuf + conn->roff, BUF_SIZE - conn->roff);
if (ret > 0) {
conn->roff += ret;
int beg, end, len;
beg = end = 0;
while (beg < conn->roff) {
char *endPos = (char *)memchr(conn->rbuf + beg, '\n', conn->roff - beg); /* requests are newline-delimited */
if (!endPos) break;
end = endPos - conn->rbuf;
len = end - beg + 1;
/*echo*/
if (sendData(conn, conn->rbuf + beg, len) == -1) return -1;
beg = end + 1;
printf("request_finish_time=%ld ", time(NULL));
}
int left = conn->roff - beg;
if (beg != 0 && left > 0) {
memmove(conn->rbuf, conn->rbuf + beg, left);
}
conn->roff = left;
} else if (ret == 0) {
return -1;
} else {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
}
return 0;
}
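/* Flush as much of the pending wbuf as the socket accepts and shift any
 * remainder to the front of the buffer. */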
int handleWriteEvent(connection_t conn) {
if (conn->woff == 0) return 0;
int ret = write(conn->sock, conn->wbuf, conn->woff);
if (ret == -1) {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
} else {
int left = conn->woff - ret;
if (left > 0) {
memmove(conn->wbuf, conn->wbuf + ret, left);
}
conn->woff = left;
}
return 0;
}
void closeConnection(connection_t conn) {
struct epoll_event evReg;
conn->using = 0;
conn->woff = conn->roff = 0;
epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);
close(conn->sock);
}
void *workerThread(void *arg) {
int epfd = *(int *)arg;
struct epoll_event event;
struct epoll_event evReg;
/* only handle connected socket */
while (!shut_server) {
int numEvents = epoll_wait(epfd, &event, 1, 1000);
if (numEvents > 0) {
int sock = event.data.fd;
connection_t conn = &g_conn_table[sock];
if (event.events & EPOLLOUT) {
if (handleWriteEvent(conn) == -1) {
closeConnection(conn);
continue;
}
}
if (event.events & EPOLLIN) {
if (handleReadEvent(conn) == -1) {
closeConnection(conn);
continue;
}
}
evReg.events = EPOLLIN | EPOLLONESHOT;
if (conn->woff > 0) evReg.events |= EPOLLOUT;
evReg.data.fd = sock;
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->sock, &evReg);
}
}
return NULL;
}
void *listenThread(void *arg) {
int lisEpfd = epoll_create(5);
struct epoll_event evReg;
evReg.events = EPOLLIN;
evReg.data.fd = lisSock;
epoll_ctl(lisEpfd, EPOLL_CTL_ADD, lisSock, &evReg);
struct epoll_event event;
int rrIndex = 0; /* round robin index */
/* only handle the listen socket */
while (!shut_server) {
int numEvent = epoll_wait(lisEpfd, &event, 1, 1000);
if (numEvent > 0) {
int sock = accept(lisSock, NULL, NULL);
if (sock >= 0) {
g_conn_table[sock].using = 1;
int flag;
flag = fcntl(sock, F_GETFL);
fcntl(sock, F_SETFL, flag | O_NONBLOCK);
evReg.data.fd = sock;
evReg.events = EPOLLIN | EPOLLONESHOT;
/* register to worker-pool's epoll,
* not the listen epoll */
g_conn_table[sock].index= rrIndex;
epoll_ctl(epfd[rrIndex], EPOLL_CTL_ADD, sock, &evReg);
rrIndex = (rrIndex + 1) % EPOLL_NUM;
}
}
}
close(lisEpfd);
return NULL;
}
int main(int argc, char *const argv[]) {
int c;
for (c = 0; c < CONN_MAXFD; ++c) {
g_conn_table[c].sock = c;
}
struct sigaction act;
memset(&act, 0, sizeof(act));
act.sa_handler = shut_server_handler;
sigaction(SIGINT, &act, NULL);
sigaction(SIGTERM, &act, NULL);
/* set up the listening socket; the worker epoll fds are created below */
lisSock = socket(AF_INET, SOCK_STREAM, 0);
int reuse = 1;
setsockopt(lisSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
int flag;
flag = fcntl(lisSock, F_GETFL);
fcntl(lisSock, F_SETFL, flag | O_NONBLOCK);
struct sockaddr_in lisAddr;
lisAddr.sin_family = AF_INET;
lisAddr.sin_port = htons(9876);
lisAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(lisSock, (struct sockaddr *)&lisAddr, sizeof(lisAddr)) == -1) {
perror("bind");
return -1;
}
listen(lisSock, 4096);
pthread_t lisTid;
pthread_create(&lisTid, NULL, listenThread, NULL);
int epi;
for (epi = 0; epi < EPOLL_NUM; ++ epi) {
epfd[epi] = epoll_create(20);
}
int i;
cpu_set_t mask;
for (i = 0; i < EPOLL_NUM; ++i) {
int j;
for (j = 0; j < WORKER_PER_GROUP; ++j) {
pthread_create(worker + (i * WORKER_PER_GROUP + j), NULL, workerThread, epfd + i);
CPU_ZERO(&mask);
CPU_SET(i, &mask);
if (pthread_setaffinity_np(*(worker + (i * WORKER_PER_GROUP + j)), sizeof(mask), &mask) != 0)
{
fprintf(stderr, "set thread affinity failed\n");
}
}
}
for (i = 0; i < NUM_WORKER; ++i) {
pthread_join(worker[i], NULL);
}
pthread_join(lisTid, NULL);
struct epoll_event evReg;
for (c = 0; c < CONN_MAXFD; ++c) {
connection_t conn = g_conn_table + c;
if (conn->using) {
epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);
close(conn->sock);
}
}
for (epi = 0; epi < EPOLL_NUM; ++epi) {
close(epfd[epi]);
}
close(lisSock);
return 0;
}
B:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <error.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include "thread-pool.h"
#define MAXEVENTS 64
static int make_socket_non_blocking(int sfd)
{
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (-1==flags)
{
perror("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (-1==s)
{
perror("fcntl");
return -1;
}
return 0;
}
static int create_and_bind(char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int s, sfd;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;//return IPv4 and IPv6 choices
hints.ai_socktype = SOCK_STREAM;//we want a TCP socket
hints.ai_flags = AI_PASSIVE;//all interfaces
s = getaddrinfo(NULL, port, &hints, &result);
if (0!=s)
{
fprintf(stderr, "getaddrinfo:%s ", gai_strerror(s));
return -1;
}
for(rp=result; NULL!=rp; rp=rp->ai_next)
{
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (-1==sfd)
{
continue;
}
s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
if (0==s)
{
//we managed to bind successfully
break;
}
close(sfd);
}
if (NULL==rp)
{
fprintf(stderr, "could not bind\n");
freeaddrinfo(result);
return -1;
}
freeaddrinfo(result);
return sfd;
}
volatile sig_atomic_t run = 1;
void SignalHandler(int iSignNum)
{
printf("capture signal number:%d ", iSignNum);
run = 0;
}
void *handler(void *arg)
{
int s;
int fd = *((int*)arg);
/*we have data on the fd waiting to be read. read and
display it. we must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get notification again for the same data.*/
int done = 0;
while(1)
{
ssize_t count;
char buf[512];
count = read(fd, buf, sizeof(buf));
if (-1==count)
{
/*if errno==EAGAIN, that means we have read all
data. so go back to the main loop*/
if (errno==EAGAIN||errno==EWOULDBLOCK)
{
done = 1;
}
else
{
fprintf(stderr, "fd:%d ", fd);
perror("read client data");
}
break;
}
else if (0==count)
{
/*end of file. the remote has closed the connection*/
done = 1;
break;
}
//write the buffer to standard output
s = write(1, buf, count);
if (-1==s)
{
perror("write");
abort();
}
}
if (done)
{
write(fd, "fine, thank you", strlen("fine, thank you")+1);
printf("closed connection on descriptor %d ", fd);
/*closing the descriptor will make epoll remove it
from the set of descriptors which are monitored.*/
close(fd);
}
}
int main(int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;
if (2!=argc)
{
fprintf(stderr, "Usage:%s [port] ", argv[0]);
exit(EXIT_FAILURE);
}
// init thread-pool
unsigned count = 1;
count = sysconf(_SC_NPROCESSORS_ONLN);
pool_init(count);
thread_pool *pool = (thread_pool*)pool_instance();
// give the pool threads a moment to start
sleep(5);
// thread cpu affinity
cpu_set_t mask;
cpu_set_t get;
int thread_ccore = 0;
for(thread_ccore=0; thread_ccore<count; thread_ccore++)
{
CPU_ZERO(&mask);
CPU_SET(thread_ccore, &mask);
if (pthread_setaffinity_np(pool->threadid[thread_ccore], sizeof(mask), &mask) != 0)
{
fprintf(stderr, "set thread affinity failed\n");
}
CPU_ZERO(&get);
if (pthread_getaffinity_np(pool->threadid[thread_ccore], sizeof(get), &get) != 0)
{
fprintf(stderr, "get thread affinity failed\n");
}
if (CPU_ISSET(thread_ccore, &get))
{
printf("thread %lu is running on processor %d\n", (unsigned long)pool->threadid[thread_ccore], thread_ccore);
}
}
// listen
sfd = create_and_bind(argv[1]);
if (-1==sfd)
{
abort();
}
s = make_socket_non_blocking(sfd);
if (-1==s)
{
abort();
}
s = listen(sfd, SOMAXCONN);
if (-1==s)
{
perror("listen");
abort();
}
efd = epoll_create1(0);
if (-1==efd)
{
perror("epoll_create");
abort();
}
event.data.fd = sfd;
event.events = EPOLLIN|EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
if (-1==s)
{
perror("epoll_ctl");
abort();
}
//buffer where events are returned
events = calloc(MAXEVENTS, sizeof event);
//install the SIGINT handler once, without SA_RESTART, so that epoll_wait
//is interrupted by Ctrl-C and the loop can observe run==0
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = SignalHandler;
sigaction(SIGINT, &sa, NULL);
//the event loop
while(run)
{
int n, i;
n = epoll_wait(efd, events, MAXEVENTS, -1);
if (-1==n)
{
if (errno==EINTR)
{
continue;
}
perror("epoll_wait");
break;
}
for(i=0; i<n; i++)
{
if ((events[i].events&EPOLLERR)||
(events[i].events&EPOLLHUP)||
(!(events[i].events&EPOLLIN)))
{
/*an error has occurred on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}
else if (sfd!=events[i].data.fd)
{
pool_add_job(handler, (void*)&(events[i].data.fd));
}
else
{
/*we have a notification on the listening socket, which
means one or more incoming connections*/
while(1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
infd = accept(sfd, &in_addr, &in_len);
if (-1==infd)
{
if ((errno==EAGAIN)||
(errno==EWOULDBLOCK))
{
//we have processed all incoming connections
break;
}
else
{
perror("accept");
break;
}
}
s = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST|NI_NUMERICSERV);
if (0==s)
{
printf("accepted connection on descriptor %d"
"(host=%s, port=%s) ", infd, hbuf, sbuf);
}
/*make the incoming socket non-blocking and add it to the
list of fds to monitor*/
s = make_socket_non_blocking(infd);
if (-1==s)
{
abort();
}
event.data.fd = infd;
event.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
if (-1==s)
{
perror("epoll_ctl");
abort();
}
}
}//else
}
}
free(events);
close(sfd);
close(efd);
pool_destroy();
printf("process exit ");
}
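Note: Model B depends on a thread-pool.h / thread-pool.c that is not included in this post. Purely for reference, the following is a minimal sketch of what that interface could look like, assuming a FIFO job queue guarded by one mutex and one condition variable. Only the names pool_init, pool_instance, pool_add_job, pool_destroy and the threadid[] member are taken from how main() uses them; everything else is an assumption, not the original code.
/* hypothetical thread-pool sketch (would normally be split into
 * thread-pool.h / thread-pool.c); matches only the calls used above */
#include <stdlib.h>
#include <pthread.h>

typedef struct thread_pool {
    pthread_t *threadid;          /* worker ids, used by main() for affinity */
    unsigned nthreads;
} thread_pool;

typedef struct job {
    void *(*fn)(void *);
    void *arg;
    struct job *next;
} job;

static struct {
    thread_pool pub;
    pthread_mutex_t lock;         /* the single lock the conclusion refers to */
    pthread_cond_t notify;
    job *head, *tail;             /* FIFO job queue */
    int shutdown;
} g;

static void *worker_main(void *unused)
{
    (void)unused;
    for (;;) {
        pthread_mutex_lock(&g.lock);
        while (!g.head && !g.shutdown)
            pthread_cond_wait(&g.notify, &g.lock);   /* sleep until work arrives */
        if (g.shutdown && !g.head) {
            pthread_mutex_unlock(&g.lock);
            return NULL;
        }
        job *j = g.head;
        g.head = j->next;
        if (!g.head) g.tail = NULL;
        pthread_mutex_unlock(&g.lock);
        j->fn(j->arg);            /* e.g. handler(&fd) */
        free(j);
    }
}

void pool_init(unsigned n)
{
    unsigned i;
    g.pub.nthreads = n;
    g.pub.threadid = calloc(n, sizeof(pthread_t));
    pthread_mutex_init(&g.lock, NULL);
    pthread_cond_init(&g.notify, NULL);
    for (i = 0; i < n; ++i)
        pthread_create(&g.pub.threadid[i], NULL, worker_main, NULL);
}

thread_pool *pool_instance(void) { return &g.pub; }

int pool_add_job(void *(*fn)(void *), void *arg)
{
    job *j = malloc(sizeof(*j));
    if (!j) return -1;
    j->fn = fn;
    j->arg = arg;
    j->next = NULL;
    pthread_mutex_lock(&g.lock);
    if (g.tail) g.tail->next = j; else g.head = j;
    g.tail = j;
    pthread_cond_signal(&g.notify);
    pthread_mutex_unlock(&g.lock);
    return 0;
}

void pool_destroy(void)
{
    unsigned i;
    pthread_mutex_lock(&g.lock);
    g.shutdown = 1;
    pthread_cond_broadcast(&g.notify);
    pthread_mutex_unlock(&g.lock);
    for (i = 0; i < g.pub.nthreads; ++i)
        pthread_join(g.pub.threadid[i], NULL);
    free(g.pub.threadid);
}
With a pool like this, idle workers sleep on the condition variable, which is consistent with the low CPU usage reported in the conclusion below.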
————————————————————————————————————————————————————————————————————
Test environment: local machine, 8-core CPU (virtual machine)
8 worker threads
A: test script
//LoadRunner
#include "lrs.h"
Action()
{
lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:9876", LrsLastArg);
lr_think_time(7);
lrs_send("socket0", "buf0", LrsLastArg);
lrs_receive("socket0", "buf1", LrsLastArg);
lrs_close_socket("socket0");
return 0;
}
B: test script
#include "lrs.h"
Action()
{
lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:8093", LrsLastArg);
lr_think_time(6);
lrs_send("socket0", "buf0", LrsLastArg);
lrs_receive("socket0", "buf1", LrsLastArg);
lrs_close_socket("socket0");
return 0;
}
Finally:
Model A: multiple epoll instances, multiple threads. After accept(), each new socket fd is assigned round-robin to one of the epoll fds, and every thread calls epoll_wait on its own epoll fd, so no locks are used.
Most developers would expect this lock-free multi-threaded design to be efficient. In practice it is not.
First, in this model the worker threads hardly ever get to rest (each epoll_wait call handles at most one event), and the connection structures take up a lot of memory (a rough estimate follows below).
The result: the system responds sluggishly, shuts down slowly, and network throughput is not high.
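To put a rough number on the memory point, here is a minimal check that reuses the constants and struct layout from the Model A listing above; the figure (on the order of 500 MiB for the table alone) is just sizeof arithmetic, not a measurement:
#include <stdio.h>
/* same constants and layout as struct connection_st in Model A */
#define BUF_SIZE 4096
#define CONN_MAXFD 65536
struct connection_st {
    int sock;
    int index;
    int using;
    int roff;
    char rbuf[BUF_SIZE];
    int woff;
    char wbuf[BUF_SIZE];
};
int main(void)
{
    /* two 4096-byte buffers plus a few ints, times 65536 statically
       allocated entries, is roughly half a gigabyte before any work is done */
    printf("per connection: %zu bytes\n", sizeof(struct connection_st));
    printf("whole table:    %zu MiB\n",
           sizeof(struct connection_st) * (size_t)CONN_MAXFD / (1024 * 1024));
    return 0;
}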
Model B: a single epoll instance plus a worker thread pool.
Most developers see that the thread pool uses a lock and conclude it must be slow. Not so.
Others have analyzed this before: kernel lock overhead is not the main obstacle to application performance.
First, in this model CPU and memory usage are very low, and the time goes where it should go: into I/O.
The result: the system responds very quickly, shuts down cleanly, and network throughput is about 2.5 times that of the previous model.
Sometimes the newer approach really is much more advanced than the old one; after all, Model A dates back to 2013.