在beanstalk中,网络处理模块没有使用第三方库,而是作者自己实现的一个模块,总计代码不到100行。
是epoll的使用典范。
1. epoll_event
struct epoll_event结构如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
beanstalk中将epoll_data存储一个socket抽象,结构如下:


socket抽象
struct Socket { int fd; //socket句柄 Handle f; //回调处理函数 void *x; //回调函数输入参数 int added; //是否加入epoll };
2. epoll处理
epoll操作包括包括创建epoll对象,将socket加入epoll对象,获取准备好的socket。


epoll int sockinit(void) //epoll初始化,非常简单,没有特殊处理 { epfd = epoll_create(1); if (epfd == -1) { twarn("epoll_create"); return -1; } return 0; } int sockwant(Socket *s, int rw) //管理epoll的socket,增删改 { int op; struct epoll_event ev = {}; //epoll操作 if (!s->added && !rw) { return 0; } else if (!s->added && rw) { s->added = 1; op = EPOLL_CTL_ADD; } else if (!rw) { op = EPOLL_CTL_DEL; } else { op = EPOLL_CTL_MOD; } //epoll_event的事件和数据,使用的是epoll默认的LT(level-trigger )模式 switch (rw) { case 'r': ev.events = EPOLLIN; break; case 'w': ev.events = EPOLLOUT; break; } ev.events |= EPOLLRDHUP | EPOLLPRI; ev.data.ptr = s; return epoll_ctl(epfd, op, s->fd, &ev); } int socknext(Socket **s, int64 timeout) //获取一个触发epoll事件的socket { int r; struct epoll_event ev; r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); if (r == -1 && errno != EINTR) { //由于epoll_wait同步等待,有可能被信号中断,返回EINTR错误 twarn("epoll_wait"); exit(1); } //将事件转化为抽象的操作 if (r) { *s = ev.data.ptr; if (ev.events & (EPOLLHUP|EPOLLRDHUP)) { return 'h'; } else if (ev.events & EPOLLIN) { return 'r'; } else if (ev.events & EPOLLOUT) { return 'w'; } } return 0;
3. epoll在程序中的回调
在beanstalk中,socket抽象包括两类:
a. 监听socket,回调函数就是接受处理srvaccept
b.和客户端连接socket,回调函数就是协议处理prothandle
整体来说流程如下:
a. Server负责初始化epoll;并将自己的监听socket加入epoll;
b. 当客户端有连接的时候,监听socket通过接受处理srvaccept和客户端建立socket连接,并将其加入epoll;
c. 时钟处理通过定时任务prottick,把超时的客户端连接socket加入epoll。
这里的加入epoll,第一个是直接调用socketwant加入的,其他两种都是将socket放入协议处理文件的静态变量dirty中,
然后调用update_conns()来完成的。
4. 网络处理流程总结
a. Server创建epoll;
b. 监听socket加入epoll,接受客户端连接h_accept、连接管理update_conns、关闭连接connclose调用socketwant处理socket请求;
c. 在连接Conn.state为STATE_CLOSE时,或者事件fd和连接fd不同时,或者在update_conns中调用socketwant失败时,调用关闭连接;
d. 接受客户端连接h_accept、协议处理prothandle、定时处理prottick三个函数,函数返回之前都要调用连接管理函数update_conns;
5.. 网络连接接受处理

1 void 2 h_accept(const int fd, const short which, Server *s) 3 { 4 Conn *c; 5 int cfd, flags, r; 6 socklen_t addrlen; 7 struct sockaddr_in6 addr; 8 9 addrlen = sizeof addr; 10 cfd = accept(fd, (struct sockaddr *)&addr, &addrlen); 11 if (cfd == -1) { 12 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()"); 13 update_conns(); 14 return; 15 } 16 if (verbose) { 17 printf("accept %d ", cfd); 18 } 19 20 flags = fcntl(cfd, F_GETFL, 0); 21 if (flags < 0) { 22 twarn("getting flags"); 23 close(cfd); 24 if (verbose) { 25 printf("close %d ", cfd); 26 } 27 update_conns(); 28 return; 29 } 30 31 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK); 32 if (r < 0) { 33 twarn("setting O_NONBLOCK"); 34 close(cfd); 35 if (verbose) { 36 printf("close %d ", cfd); 37 } 38 update_conns(); 39 return; 40 } 41 42 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube); 43 if (!c) { 44 twarnx("make_conn() failed"); 45 close(cfd); 46 if (verbose) { 47 printf("close %d ", cfd); 48 } 49 update_conns(); 50 return; 51 } 52 c->srv = s; 53 c->sock.x = c; 54 c->sock.f = (Handle)prothandle; 55 c->sock.fd = cfd; 56 57 r = sockwant(&c->sock, 'r'); 58 if (r == -1) { 59 twarn("sockwant"); 60 close(cfd); 61 if (verbose) { 62 printf("close %d ", cfd); 63 } 64 update_conns(); 65 return; 66 } 67 update_conns(); 68 }