从main函数开始,位于memcached.c
1 int main (int argc, char **argv) { 2 //....................... 3 //....................... 4 //....................... 5 6 7 /* handle SIGINT 注册信号处理函数,目前sig_handler是空函数*/ 8 signal(SIGINT, sig_handler); 9 10 settings_init(); 11 /* init settings 12 初始化默认设置,其中 13 settings.port = 11211; 默认端口 14 settings.maxbytes = 64 * 1024 * 1024; 默认使用64MB内存 15 settings.num_threads = 4; 默认启动4个工作线程 16 settings.item_size_max = 1024 * 1024; 默认每对 key value的value最大1MB 17 */ 18 19 20 //....................... 21 //....................... 22 //....................... 23 24 25 /* 初始化主线程的libevent实例 */ 26 main_base = event_init(); 27 28 29 //....................... 30 //....................... 31 //....................... 32 33 34 /* 启动工作线程 */ 35 thread_init(settings.num_threads, main_base); 36 /* 37 // 主线程是分配线程,分配工作给工作线程 38 dispatcher_thread.base = main_base; 39 dispatcher_thread.thread_id = pthread_self(); 40 //设置工作线程的属性以及它们各自的libevent实例初始化 41 for (i = 0; i < nthreads; i++) { 42 int fds[2]; 43 pipe(fds) 44 threads[i].notify_receive_fd = fds[0]; 45 threads[i].notify_send_fd = fds[1]; 46 setup_thread(&threads[i]); 47 } 48 //启动线程,线程处理函数为worker_libevent, 每个线程有各自的event_base_loop 49 for (i = 0; i < nthreads; i++) { 50 create_worker(worker_libevent, &threads[i]); 51 } 52 */ 53 54 55 //....................... 56 //....................... 57 //....................... 58 59 60 /* 创建服务端的socket */ 61 server_sockets(settings.port, tcp_transport, portnumber_file)) 62 63 //....................... 64 //....................... 65 //....................... 66 67 68 /* 主线程的libevent消息循环 */ 69 if (event_base_loop(main_base, 0) != 0) { 70 retval = EXIT_FAILURE; 71 } 72 73 74 //....................... 75 //....................... 76 //....................... 77 78 79 return retval; 80 }
主线程
1 server_sockets(settings.port, tcp_transport, portnumber_file) 2 ---> 3 server_socket(settings.inter, port, transport, portnumber_file); 4 ---> 5 listen(sfd, settings.backlog) 6 listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base) 7 ---> 8 //设置sfd的消息响应函数event_handler 9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); 10 event_base_set(base, &c->event); 11 ---> 12 void event_handler(const int fd, const short which, void *arg) { 13 conn *c; 14 c = (conn *)arg; 15 16 //....................... 17 //....................... 18 //....................... 19 20 drive_machine(c); 21 22 return; 23 } 24 ---> 25 //drive_machine 26 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen) 27 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, 28 DATA_BUFFER_SIZE, tcp_transport); 29 30 ---> 31 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, 32 int read_buffer_size, enum network_transport transport) { 33 char buf[1]; 34 int tid = (last_thread + 1) % settings.num_threads; 35 36 LIBEVENT_THREAD *thread = threads + tid; 37 38 //往其中一个worker线程的管道写,分配这个链接给worker 39 buf[0] = 'c'; 40 if (write(thread->notify_send_fd, buf, 1) != 1) { 41 perror("Writing to thread notify pipe"); 42 } 43 }
函数补充:
1 static void setup_thread(LIBEVENT_THREAD *me) { 2 me->base = event_init(); 3 if (! me->base) { 4 fprintf(stderr, "Can't allocate event base\n"); 5 exit(1); 6 } 7 8 // 把管道的读事件注册到libevent对象中 9 // 分配线程会往工作线程的 notify_send_fd 写数据,然后触发工作线程的 notify_receive_fd 的读事件 10 event_set(&me->notify_event, me->notify_receive_fd, 11 EV_READ | EV_PERSIST, thread_libevent_process, me); 12 event_base_set(me->base, &me->notify_event); 13 14 if (event_add(&me->notify_event, 0) == -1) { 15 fprintf(stderr, "Can't monitor libevent notify pipe\n"); 16 exit(1); 17 } 18 19 //....................... 20 //....................... 21 //....................... 22 }
工作线程
thread_libevent_process -> conn_new -> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);event_base_set(base, &c->event);
->event_handler -> drive_machine
工作线程上的libevent事件都交给drive_machine处理
下面以文本协议的set命令为例,尝试理解它的工作流程
drive_machine
-->
case conn_read:
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
-->
case conn_parse_cmd :
if (try_read_command(c) == 0) {
-->
static int try_read_command(conn *c) {
//..............
if (c->protocol == binary_prot) {
}
else {
//............
process_command(c, c->rcurr);
}
-->
static void process_command(conn *c, char *command) {
//假设是set
} else if ((ntokens == 6 || ntokens == 7) &&
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
process_update_command(c, tokens, ntokens, comm, false);
-->
process_update_command(c, tokens, ntokens, comm, false);
{
//...........
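//其实我不懂为什么要分配一个出来
//(补充:此时只解析了命令行,value数据还没读到;这里应该是先按vlen分配好item,之后conn_nread状态读到的数据直接写入这个item的数据区,读完再由store_item真正存入)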
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
//................
conn_set_state(c, conn_nread);
}
-->
//drive_machine
case conn_nread:
if (c->rlbytes == 0) {
complete_nread(c);
-->
complete_nread_ascii(c);
{
ret = store_item(it, comm, c);
}
不知道理解得对不对
-----------------------------------------------------------------------------------------------------------------
用户成功连接memcached之后,主要逻辑都在drive_machine这个函数里
我在drive_machine里加了打印状态的日志,通过状态的变化来理解它的流程
A:表示memcached服务端 B:表示用户(telnet客户端)
A: ./memcached -m 512 -p 14444 -vv
B: telnet 127.0.0.1 14444
A: ############################## state = conn_listening
B: add id 1 0 4
A:
############################## state = conn_new_cmd
############################## state = conn_waiting
############################## state = conn_read
############################## state = conn_parse_cmd
32: Client using the ascii protocol
<32 add id 1 0 4
############################## state = conn_nread
B: 1234
A:
############################## state = conn_nread
############################## state = conn_nread
>32 STORED
############################## state = conn_write
############################## state = conn_mwrite
############################## state = conn_write
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting
B: 收到
STORED
B: get id
A:
############################## state = conn_read
############################## state = conn_parse_cmd
<32 get id
>32 sending key id
>32 END
############################## state = conn_mwrite
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting
B: 收到
VALUE id 1 4
1234
END
B: quit
A:
############################## state = conn_read
############################## state = conn_parse_cmd
<32 quit
############################## state = conn_closing
<32 connection closed.