zoukankan      html  css  js  c++  java
  • memcached源码阅读笔记一

    从main函数开始,位于memcached.c

     1 int main (int argc, char **argv) {
     2     //.......................
     3     //.......................
     4     //.......................
     5     
     6     
     7     /* handle SIGINT 注册信号处理函数,目前sig_handler是空函数*/
     8     signal(SIGINT, sig_handler);
     9     
    10     settings_init();
    11     /* init settings
    12          初始化默认设置,其中
    13          settings.port = 11211;                     默认端口
    14          settings.maxbytes = 64 * 1024 * 1024;      默认使用64MB内存
    15          settings.num_threads = 4;                  默认启动4个工作线程
    16          settings.item_size_max = 1024 * 1024;      默认每对 key value的value最大1MB
    17      */
    18     
    19     
    20     //.......................
    21     //.......................
    22     //.......................
    23     
    24     
    25     /* 初始化主线程的libevent实例 */
    26     main_base = event_init();
    27     
    28     
    29     //.......................
    30     //.......................
    31     //.......................
    32     
    33     
    34     /* 启动工作线程 */
    35     thread_init(settings.num_threads, main_base);
    36     /*
    37          // 主线程是分配线程,分配工作给工作线程
    38          dispatcher_thread.base = main_base;
    39          dispatcher_thread.thread_id = pthread_self();
    40          //设置工作线程的属性以及它们各自的libevent实例初始化
    41          for (i = 0; i < nthreads; i++) {
    42          int fds[2];
    43          pipe(fds)
    44          threads[i].notify_receive_fd = fds[0];
    45          threads[i].notify_send_fd = fds[1];
    46          setup_thread(&threads[i]);
    47          }
    48          //启动线程,线程处理函数为worker_libevent, 每个线程有各自的event_base_loop
    49          for (i = 0; i < nthreads; i++) {
    50          create_worker(worker_libevent, &threads[i]);
    51          }
    52     */
    53     
    54     
    55     //.......................
    56     //.......................
    57     //.......................
    58     
    59     
    60     /* 创建服务端的socket */
    61     server_sockets(settings.port, tcp_transport, portnumber_file))
    62     
    63     //.......................
    64     //.......................
    65     //.......................
    66     
    67     
    68     /* 主线程的libevent消息循环 */
    69     if (event_base_loop(main_base, 0) != 0) {
    70         retval = EXIT_FAILURE;
    71     }
    72     
    73     
    74     //.......................
    75     //.......................
    76     //.......................
    77     
    78     
    79     return retval;
    80 }

    主线程

     1 server_sockets(settings.port, tcp_transport, portnumber_file)
     2 --->
     3 server_socket(settings.inter, port, transport, portnumber_file);
     4 --->
     5 listen(sfd, settings.backlog)
     6 listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)
     7 --->    
     8 //设置sfd的消息响应函数event_handler
     9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    10 event_base_set(base, &c->event);
    11 --->
    12 void event_handler(const int fd, const short which, void *arg) {
    13     conn *c;
    14     c = (conn *)arg;
    15     
    16     //.......................
    17     //.......................
    18     //.......................
    19     
    20     drive_machine(c);
    21     
    22     return;
    23 }
    24 --->
    25 //drive_machine
    26 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)
    27 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
    28                   DATA_BUFFER_SIZE, tcp_transport);
    29 
    30 --->
    31 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
    32                        int read_buffer_size, enum network_transport transport) {
    33     char buf[1];
    34     int tid = (last_thread + 1) % settings.num_threads;
    35     
    36     LIBEVENT_THREAD *thread = threads + tid;
    37     
    38     //往其中一个worker线程的管道写,分配这个链接给worker
    39     buf[0] = 'c';
    40     if (write(thread->notify_send_fd, buf, 1) != 1) {
    41         perror("Writing to thread notify pipe");
    42     }
    43 }

    函数补充:

     1 static void setup_thread(LIBEVENT_THREAD *me) {
     2     me->base = event_init();
     3     if (! me->base) {
     4         fprintf(stderr, "Can't allocate event base
    ");
     5         exit(1);
     6     }
     7     
     8     // 把管道的读事件注册到libevent对象中
     9     // 分配线程会往工作线程的 notify_send_fd 写数据,然后触发工作线程的 notify_receive_fd 的读事件
    10     event_set(&me->notify_event, me->notify_receive_fd,
    11               EV_READ | EV_PERSIST, thread_libevent_process, me);
    12     event_base_set(me->base, &me->notify_event);
    13     
    14     if (event_add(&me->notify_event, 0) == -1) {
    15         fprintf(stderr, "Can't monitor libevent notify pipe
    ");
    16         exit(1);
    17     }
    18     
    19     //.......................
    20     //.......................
    21     //.......................
    22 }

    工作线程

    thread_libevent_process -> conn_new -> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);event_base_set(base, &c->event);

    ->event_handler -> drive_machine

    工作线程的消息libevent事件处理就由drive_machine负责

    尝试去理解它的工作流程

    drive_machine

    -->

    case conn_read:

        case READ_DATA_RECEIVED:
        conn_set_state(c, conn_parse_cmd);

    -->

    case conn_parse_cmd :
        if (try_read_command(c) == 0) {

    -->

    static int try_read_command(conn *c) {

        //..............

        if (c->protocol == binary_prot) {

        }

        else {

            //............

            process_command(c, c->rcurr);

        }

    -->

    static void process_command(conn *c, char *command) { 

        //假设是set    

      } else if ((ntokens == 6 || ntokens == 7) &&
      ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
      (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
      (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
      (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
      (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

        process_update_command(c, tokens, ntokens, comm, false);

    -->

    process_update_command(c, tokens, ntokens, comm, false);
    {

       //...........
      //其实我不懂为什么要分配一个出来
      it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
      //................
      conn_set_state(c, conn_nread);
    }

    -->

    //drive_machine
    case conn_nread:
      if (c->rlbytes == 0) {
        complete_nread(c);

    -->

    complete_nread_ascii(c);
    {
      ret = store_item(it, comm, c);
    }

    不知道理解得对不对

    -----------------------------------------------------------------------------------------------------------------

    用户连接memcached成功后,主要逻辑在drive_machine这个函数

    我尝试打日志,理解它的流程

    A:表示memcached  B:表示用户

    A:   ./memcached -m 512 -p 14444 -vv 
    B:   telnet 127.0.0.1 14444

    A:   ##############################  state = conn_listening  
    B:   add id 1 0 4

    A:   
    ##############################  state = conn_new_cmd
    ##############################  state = conn_waiting
    ##############################  state = conn_read
    ##############################  state = conn_parse_cmd 
    32: Client using the ascii protocol
    <32 add id 1 0 4
    ##############################  state = conn_nread
     
    B:  1234
    A:
    ##############################  state = conn_nread
    ##############################  state = conn_nread
    >32 STORED
    ##############################  state = conn_write
    ##############################  state = conn_mwrite
    ##############################  state = conn_write
    ##############################  state = conn_mwrite
    ##############################  state = conn_new_cmd
    ##############################  state = conn_waiting

    B: 收到
    STORED  

    B: get id

    A:

    ############################## state = conn_read
    ############################## state = conn_parse_cmd
    <32 get id
    >32 sending key id
    >32 END
    ############################## state = conn_mwrite
    ############################## state = conn_mwrite
    ############################## state = conn_new_cmd
    ############################## state = conn_waiting

    B: 收到

    VALUE id 1 4
    1234
    END

    B: quit

    A:

    ############################## state = conn_read
    ############################## state = conn_parse_cmd
    <32 quit
    ############################## state = conn_closing
    <32 connection closed.

  • 相关阅读:
    初识Spring框架IOC属性注入
    JSP:在本地获取图片后立即展示选择的图片
    JavaWeb手机短信实现前台利用JS获取随机验证码,倒计时效果
    通过form表单上传文件,后台接收的方法
    封装数据库方法
    JavaWeb无限级分销结构分析
    JavaWeb忘记密码后通过邮箱进入修改密码的界面
    JavaWeb通过快递单号展示物流信息转JSON显示(servlet)
    markdown 语法测试
    example数据库
  • 原文地址:https://www.cnblogs.com/yemsheng/p/3308132.html
Copyright © 2011-2022 走看看