  • memcached source code reading notes (6): a study of the memcached 1.4.10 multithreading model on Linux

    To make gdb debugging easier, recompile with debug symbols as follows:
    ./configure --prefix=/usr/local/memcached --with-libevent=/usr/local/libevent CPPFLAGS='-ggdb3'

    make && make install

    gdb memcached
    gdb>set args -p 11211 -m 64 -uroot -t2 -vv

    Then:

    gdb>b main

    Then trace through the code with s, n, u, disp, and printf.
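    For example, to follow the connection-dispatch path end to end, one reasonable set of breakpoints (using the functions discussed below) is:

    gdb>b thread_init
    gdb>b dispatch_conn_new
    gdb>b thread_libevent_process
    gdb>run

    and then c (continue) from one breakpoint to the next while a client connects.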

    This note only examines the TCP path; the UDP and Unix-domain socket cases are essentially the same.

    main()

    thread_init(settings.num_threads, main_base);

    This initializes the worker threads.

    if (start_assoc_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    This starts the hash maintenance thread, which expands the hash table in bulk in the background.
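    A minimal sketch of that maintenance loop, simplified from assoc.c: the thread migrates hash_bulk_move buckets per wakeup while holding cache_lock, and sleeps on a condition variable whenever no expansion is in progress (end-of-expansion cleanup is omitted here):

    /* simplified sketch of assoc_maintenance_thread() (assoc.c, 1.4.10) */
    static void *assoc_maintenance_thread(void *arg) {
        while (do_run_maintenance_thread) {
            int ii;

            mutex_lock(&cache_lock);
            /* move a few buckets from the old table into the grown one */
            for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
                item *it, *next;
                int bucket;

                for (it = old_hashtable[expand_bucket]; it != NULL; it = next) {
                    next = it->h_next;
                    bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
                    it->h_next = primary_hashtable[bucket];
                    primary_hashtable[bucket] = it;
                }
                old_hashtable[expand_bucket] = NULL;
                expand_bucket++;
            }

            if (!expanding) {
                /* nothing to do: wait until assoc_insert() starts an expansion */
                pthread_cond_wait(&maintenance_cond, &cache_lock);
            }
            mutex_unlock(&cache_lock);
        }
        return NULL;
    }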

    if (settings.port && server_sockets(settings.port, tcp_transport,
                                        portnumber_file))

    This creates the TCP listening sockets.
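    server_sockets() is only a thin wrapper: it splits the interface list given with -l (settings.inter) and calls server_socket(), which is listed further below, once per interface. A rough sketch (simplified; details of the real 1.4.10 code may differ slightly):

    /* rough sketch of server_sockets() in memcached.c; error handling trimmed */
    static int server_sockets(int port, enum network_transport transport,
                              FILE *portnumber_file) {
        if (settings.inter == NULL) {
            /* no -l option: bind the wildcard address */
            return server_socket(settings.inter, port, transport, portnumber_file);
        }
        /* -l a;b;c : bind each listed interface in turn */
        char *b, *p, *list = strdup(settings.inter);
        int ret = 0;
        for (p = strtok_r(list, ";,", &b); p != NULL;
             p = strtok_r(NULL, ";,", &b)) {
            ret |= server_socket(p, port, transport, portnumber_file);
        }
        free(list);
        return ret;
    }

    Now the thread_init() source: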

    void thread_init(int nthreads, struct event_base *main_base) {
        int         i;
        int         power;

        pthread_mutex_init(&cache_lock, NULL);
        pthread_mutex_init(&stats_lock, NULL);

        pthread_mutex_init(&init_lock, NULL);
        pthread_cond_init(&init_cond, NULL);

        pthread_mutex_init(&cqi_freelist_lock, NULL);
        cqi_freelist = NULL;

        /* Want a wide lock table, but don't waste memory */
        if (nthreads < 3) {
            power = 10;
        } else if (nthreads < 4) {
            power = 11;
        } else if (nthreads < 5) {
            power = 12;
        } else {
            /* 8192 buckets, and central locks don't scale much past 5 threads */
            power = 13;
        }

        item_lock_count = ((unsigned long int)1 << (power));
        item_lock_mask  = item_lock_count - 1;

        item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
        if (! item_locks) {
            perror("Can't allocate item locks");
            exit(1);
        }
        for (i = 0; i < item_lock_count; i++) {
            pthread_mutex_init(&item_locks[i], NULL);
        }

        threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
        if (! threads) {
            perror("Can't allocate thread descriptors");
            exit(1);
        }

        dispatcher_thread.base = main_base;
        dispatcher_thread.thread_id = pthread_self();

        for (i = 0; i < nthreads; i++) {
            int fds[2];
            if (pipe(fds)) {
                perror("Can't create notify pipe");
                exit(1);
            }

            threads[i].notify_receive_fd = fds[0];
            threads[i].notify_send_fd = fds[1];

            setup_thread(&threads[i]);
            /* Reserve three fds for the libevent base, and two for the pipe */
            stats.reserved_fds += 5;
        }

        /* Create threads after we've done all the libevent setup. */
        for (i = 0; i < nthreads; i++) {
            create_worker(worker_libevent, &threads[i]);
        }

        /* Wait for all the threads to set themselves up before returning. */
        pthread_mutex_lock(&init_lock);
        while (init_count < nthreads) {
            pthread_cond_wait(&init_cond, &init_lock);
        }
        pthread_mutex_unlock(&init_lock);
    }
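    The point of sizing the lock table to a power of two is that a key's hash can pick its lock stripe with a single mask. In threads.c this is roughly:

    /* roughly how the striped item locks are used (threads.c, 1.4.10) */
    void item_lock(uint32_t hv) {
        mutex_lock(&item_locks[hv & item_lock_mask]);
    }

    void item_unlock(uint32_t hv) {
        mutex_unlock(&item_locks[hv & item_lock_mask]);
    }

    Two keys contend only if their hashes collide modulo item_lock_count, rather than every item operation funneling through a single global lock.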


    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    This shows that dispatcher_thread is the main thread.

    if (pipe(fds)) {
                perror("Can't create notify pipe");
                exit(1);
            }

    threads[i].notify_receive_fd = fds[0];
    threads[i].notify_send_fd = fds[1];
    A notification pipe is created for each worker thread.
     setup_thread(&threads[i]);
    This sets up the worker thread's state.
    create_worker(worker_libevent, &threads[i]);
    This creates the worker thread.
    pthread_mutex_lock(&init_lock);
        while (init_count < nthreads) {
            pthread_cond_wait(&init_cond, &init_lock);
        }
        pthread_mutex_unlock(&init_lock);
    This waits until all worker threads have been created and initialized.
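    The other half of that handshake is worker_libevent(), the function passed to create_worker(): each worker increments init_count under init_lock, signals init_cond, and then sits in its own libevent loop forever. Roughly:

    /* roughly the worker thread entry point (threads.c, 1.4.10) */
    static void *worker_libevent(void *arg) {
        LIBEVENT_THREAD *me = arg;

        /* tell thread_init() that this worker is up */
        pthread_mutex_lock(&init_lock);
        init_count++;
        pthread_cond_signal(&init_cond);
        pthread_mutex_unlock(&init_lock);

        /* dispatch notify-pipe and connection events until shutdown */
        event_base_loop(me->base, 0);
        return NULL;
    }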


    static void setup_thread(LIBEVENT_THREAD *me) {
        me->base = event_init();
        if (! me->base) {
            fprintf(stderr, "Can't allocate event base\n");
            exit(1);
        }

        /* Listen for notifications from other threads */
        event_set(&me->notify_event, me->notify_receive_fd,
                  EV_READ | EV_PERSIST, thread_libevent_process, me);

        /* thread_libevent_process is the callback for the notification pipe */
        event_base_set(me->base, &me->notify_event);

        if (event_add(&me->notify_event, 0) == -1) {
            fprintf(stderr, "Can't monitor libevent notify pipe\n");
            exit(1);
        }

        me->new_conn_queue = malloc(sizeof(struct conn_queue));
        if (me->new_conn_queue == NULL) {
            perror("Failed to allocate memory for connection queue");
            exit(EXIT_FAILURE);
        }
        cq_init(me->new_conn_queue);
        /* cq_init() initializes this thread's connection queue */
        if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
            perror("Failed to initialize mutex");
            exit(EXIT_FAILURE);
        }

        me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                        NULL, NULL);
        if (me->suffix_cache == NULL) {
            fprintf(stderr, "Failed to create suffix cache\n");
            exit(EXIT_FAILURE);
        }
    }
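    The connection queue itself is just a mutex-protected singly linked list of CQ_ITEMs. Its operations look roughly like this (thread.c, 1.4.10):

    static void cq_init(CQ *cq) {
        pthread_mutex_init(&cq->lock, NULL);
        pthread_cond_init(&cq->cond, NULL);
        cq->head = NULL;
        cq->tail = NULL;
    }

    /* returns the next item, or NULL if the queue is empty (never blocks) */
    static CQ_ITEM *cq_pop(CQ *cq) {
        CQ_ITEM *item;

        pthread_mutex_lock(&cq->lock);
        item = cq->head;
        if (NULL != item) {
            cq->head = item->next;
            if (NULL == cq->head)
                cq->tail = NULL;
        }
        pthread_mutex_unlock(&cq->lock);
        return item;
    }

    /* appends an item to the tail of the queue */
    static void cq_push(CQ *cq, CQ_ITEM *item) {
        item->next = NULL;

        pthread_mutex_lock(&cq->lock);
        if (NULL == cq->tail)
            cq->head = item;
        else
            cq->tail->next = item;
        cq->tail = item;
        pthread_mutex_unlock(&cq->lock);
        pthread_cond_signal(&cq->cond);
    }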


    The callback a worker thread runs when it receives a notification:
    static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];

        if (read(fd, buf, 1) != 1) // check that the one-byte pipe notification arrived
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read from libevent pipe\n");

        item = cq_pop(me->new_conn_queue); // pop a pending connection off this thread's queue

        if (NULL != item) {
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport, me->base);
            /* create a new conn for this fd; its state is no longer conn_listening */
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;
            }
            cqi_free(item);
        }
    }
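    The CQ_ITEM freed here goes back onto the global cqi_freelist that thread_init() prepared: cqi_new() pops from the freelist and allocates a whole batch when it runs dry, and cqi_free() pushes the item back. A simplified sketch (thread.c, 1.4.10; ITEMS_PER_ALLOC is 64):

    static CQ_ITEM *cqi_new(void) {
        CQ_ITEM *item = NULL;

        pthread_mutex_lock(&cqi_freelist_lock);
        if (cqi_freelist) {
            item = cqi_freelist;
            cqi_freelist = item->next;
        }
        pthread_mutex_unlock(&cqi_freelist_lock);

        if (NULL == item) {
            int i;

            /* allocate a batch of items at once to avoid per-connection malloc */
            item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
            if (NULL == item)
                return NULL;

            /* chain items 1..N-1 together and donate them to the freelist;
               item 0 is returned to the caller */
            for (i = 2; i < ITEMS_PER_ALLOC; i++)
                item[i - 1].next = &item[i];

            pthread_mutex_lock(&cqi_freelist_lock);
            item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
            cqi_freelist = &item[1];
            pthread_mutex_unlock(&cqi_freelist_lock);
        }

        return item;
    }

    static void cqi_free(CQ_ITEM *item) {
        pthread_mutex_lock(&cqi_freelist_lock);
        item->next = cqi_freelist;
        cqi_freelist = item;
        pthread_mutex_unlock(&cqi_freelist_lock);
    }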

    static int server_socket(const char *interface,
                             int port,
                             enum network_transport transport,
                             FILE *portnumber_file) {
        int sfd;
        struct linger ling = {0, 0};
        struct addrinfo *ai;
        struct addrinfo *next;
        struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                                  .ai_family = AF_UNSPEC };
        char port_buf[NI_MAXSERV];
        int error;
        int success = 0;
        int flags =1;

        hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

        if (port == -1) {
            port = 0;
        }
        snprintf(port_buf, sizeof(port_buf), "%d", port);
        error= getaddrinfo(interface, port_buf, &hints, &ai);
        if (error != 0) {
            if (error != EAI_SYSTEM)
              fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
            else
              perror("getaddrinfo()");
            return 1;
        }

        for (next= ai; next; next= next->ai_next) {
            conn *listen_conn_add;
            if ((sfd = new_socket(next)) == -1) {
                /* getaddrinfo can return "junk" addresses,
                 * we make sure at least one works before erroring.
                 */
                if (errno == EMFILE) {
                    /* ...unless we're out of fds */
                    perror("server_socket");
                    exit(EX_OSERR);
                }
                continue;
            }

    #ifdef IPV6_V6ONLY
            if (next->ai_family == AF_INET6) {
                error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
                if (error != 0) {
                    perror("setsockopt");
                    close(sfd);
                    continue;
                }
            }
    #endif

            setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
            if (IS_UDP(transport)) {
                maximize_sndbuf(sfd);
            } else {
                error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
                if (error != 0)
                    perror("setsockopt");

                error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
                if (error != 0)
                    perror("setsockopt");

                error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
                if (error != 0)
                    perror("setsockopt");
            }

            if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
                if (errno != EADDRINUSE) {
                    perror("bind()");
                    close(sfd);
                    freeaddrinfo(ai);
                    return 1;
                }
                close(sfd);
                continue;
            } else {
                success++;
                if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                    perror("listen()");
                    close(sfd);
                    freeaddrinfo(ai);
                    return 1;
                }
                if (portnumber_file != NULL &&
                    (next->ai_addr->sa_family == AF_INET ||
                     next->ai_addr->sa_family == AF_INET6)) {
                    union {
                        struct sockaddr_in in;
                        struct sockaddr_in6 in6;
                    } my_sockaddr;
                    socklen_t len = sizeof(my_sockaddr);
                    if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                        if (next->ai_addr->sa_family == AF_INET) {
                            fprintf(portnumber_file, "%s INET: %u\n",
                                    IS_UDP(transport) ? "UDP" : "TCP",
                                    ntohs(my_sockaddr.in.sin_port));
                        } else {
                            fprintf(portnumber_file, "%s INET6: %u\n",
                                    IS_UDP(transport) ? "UDP" : "TCP",
                                    ntohs(my_sockaddr.in6.sin6_port));
                        }
                    }
                }
            }

            if (IS_UDP(transport)) {
                int c;

                for (c = 0; c < settings.num_threads_per_udp; c++) {
                    /* this is guaranteed to hit all threads because we round-robin */
                    dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                      UDP_READ_BUFFER_SIZE, transport);
                }
            } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                /* the listening conn is created in the conn_listening state */
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
                listen_conn_add->next = listen_conn;
                listen_conn = listen_conn_add;
            }
        }

        freeaddrinfo(ai);

        /* Return zero iff we detected no errors in starting up connections */
        return success == 0;
    }

    conn *conn_new(const int sfd, enum conn_states init_state,
                    const int event_flags,
                    const int read_buffer_size, enum network_transport transport,
                    struct event_base *base) {
        conn *c = conn_from_freelist();

        if (NULL == c) {
            if (!(c = (conn *)calloc(1, sizeof(conn)))) {
                fprintf(stderr, "calloc()\n");
                return NULL;
            }
            MEMCACHED_CONN_CREATE(c);

            c->rbuf = c->wbuf = 0;
            c->ilist = 0;
            c->suffixlist = 0;
            c->iov = 0;
            c->msglist = 0;
            c->hdrbuf = 0;

            c->rsize = read_buffer_size;
            c->wsize = DATA_BUFFER_SIZE;
            c->isize = ITEM_LIST_INITIAL;
            c->suffixsize = SUFFIX_LIST_INITIAL;
            c->iovsize = IOV_LIST_INITIAL;
            c->msgsize = MSG_LIST_INITIAL;
            c->hdrsize = 0;

            c->rbuf = (char *)malloc((size_t)c->rsize);
            c->wbuf = (char *)malloc((size_t)c->wsize);
            c->ilist = (item **)malloc(sizeof(item *) * c->isize);
            c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
            c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
            c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

            if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                    c->msglist == 0 || c->suffixlist == 0) {
                conn_free(c);
                fprintf(stderr, "malloc()\n");
                return NULL;
            }

            STATS_LOCK();
            stats.conn_structs++;
            STATS_UNLOCK();
        }

        c->transport = transport;
        c->protocol = settings.binding_protocol;

        /* unix socket mode doesn't need this, so zeroed out.  but why
         * is this done for every command?  presumably for UDP
         * mode.  */
        if (!settings.socketpath) {
            c->request_addr_size = sizeof(c->request_addr);
        } else {
            c->request_addr_size = 0;
        }

        if (settings.verbose > 1) {
            if (init_state == conn_listening) {
                fprintf(stderr, "<%d server listening (%s)\n", sfd,
                    prot_text(c->protocol));
            } else if (IS_UDP(transport)) {
                fprintf(stderr, "<%d server listening (udp)\n", sfd);
            } else if (c->protocol == negotiating_prot) {
                fprintf(stderr, "<%d new auto-negotiating client connection\n",
                        sfd);
            } else if (c->protocol == ascii_prot) {
                fprintf(stderr, "<%d new ascii client connection.\n", sfd);
            } else if (c->protocol == binary_prot) {
                fprintf(stderr, "<%d new binary client connection.\n", sfd);
            } else {
                fprintf(stderr, "<%d new unknown (%d) client connection\n",
                    sfd, c->protocol);
                assert(false);
            }
        }

        c->sfd = sfd;
        c->state = init_state;
        c->rlbytes = 0;
        c->cmd = -1;
        c->rbytes = c->wbytes = 0;
        c->wcurr = c->wbuf;
        c->rcurr = c->rbuf;
        c->ritem = 0;
        c->icurr = c->ilist;
        c->suffixcurr = c->suffixlist;
        c->ileft = 0;
        c->suffixleft = 0;
        c->iovused = 0;
        c->msgcurr = 0;
        c->msgused = 0;

        c->write_and_go = init_state;
        c->write_and_free = 0;
        c->item = 0;

        c->noreply = false;

        event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

        event_base_set(base, &c->event);
        /* event_handler is set as this connection's event callback */

        c->ev_flags = event_flags;

        if (event_add(&c->event, 0) == -1) {
            if (conn_add_to_freelist(c)) {
                conn_free(c);
            }
            perror("event_add");
            return NULL;
        }

        STATS_LOCK();
        stats.curr_conns++;
        stats.total_conns++;
        STATS_UNLOCK();

        MEMCACHED_CONN_ALLOCATE(c->sfd);

        return c;
    }

    The event_handler function is very simple:
    void event_handler(const int fd, const short which, void *arg) {
        conn *c;

        c = (conn *)arg;
        assert(c != NULL);

        c->which = which;

        /* sanity */
        if (fd != c->sfd) {
            if (settings.verbose > 0)
                fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
            conn_close(c);
            return;
        }

        drive_machine(c);

        /* wait for next event */
        return;
    }
    Every connection event ends up calling drive_machine(c):
    static void drive_machine(conn *c) {
        bool stop = false;
        int sfd, flags = 1;
        socklen_t addrlen;
        struct sockaddr_storage addr;
        int nreqs = settings.reqs_per_event;
        int res;
        const char *str;

        assert(c != NULL);

        while (!stop) {

            switch(c->state) {
            case conn_listening:
                addrlen = sizeof(addr);
                if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        /* these are transient, so don't log anything */
                        stop = true;
                    } else if (errno == EMFILE) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Too many open connections\n");
                        accept_new_conns(false);
                        stop = true;
                    } else {
                        perror("accept()");
                        stop = true;
                    }
                    break;
                }
                if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                    fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                    perror("setting O_NONBLOCK");
                    close(sfd);
                    break;
                }

                if (settings.maxconns_fast &&
                    stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                    str = "ERROR Too many open connections\r\n";
                    res = write(sfd, str, strlen(str));
                    close(sfd);
                    STATS_LOCK();
                    stats.rejected_conns++;
                    STATS_UNLOCK();
                } else {
                    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                      DATA_BUFFER_SIZE, tcp_transport);
                }
                stop = true;
                break;

            /* ... the remaining conn states are handled by the rest of this
               switch; omitted here ... */
            }
        }
    }

    This call hands the newly accepted connection over to a worker thread:


    void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                           int read_buffer_size, enum network_transport transport) {
        CQ_ITEM *item = cqi_new();
        int tid = (last_thread + 1) % settings.num_threads;
        /* round-robin over the worker threads; this is the cheapest policy.
         * One could also pick the thread with the shortest queue, but that
         * buys little here, so the simplest scheme is used. */
        LIBEVENT_THREAD *thread = threads + tid;

        last_thread = tid;

        item->sfd = sfd;
        item->init_state = init_state;
        item->event_flags = event_flags;
        item->read_buffer_size = read_buffer_size;
        item->transport = transport;

        cq_push(thread->new_conn_queue, item); // push the work item onto the worker's connection queue
        MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
        if (write(thread->notify_send_fd, "", 1) != 1) { // wake the worker by writing one byte to the notify pipe
            perror("Writing to thread notify pipe");
        }
    }

    As shown earlier, once a worker thread receives the pipe notification it creates the connection and registers it with libevent using EV_READ | EV_PERSIST.
    So there is only ever one listening (dispatcher) thread; all the other threads are workers that handle established connections.
