- /*
- * 文件开头先啰嗦几句:
- *
- * thread.c文件代表的是线程模块。但是你会看到这个模块里面有很多其它方法,
- 例如关于item的各种操作函数,item_alloc,item_remove,item_link等等。
- 我们有个items模块,这些不都是items模块要做的事情吗?为什么thread模块也有?
- 你仔细看会发现,thread里面的这种函数,例如item_remove,items模块里面
- 都会有一个对应的do_item_remove函数,而thread中的item_remove仅仅是调用
- items模块中的do_item_remove,唯一多出来的就是thread在do_item_remove前后
- 加了加锁和解锁的操作。
- 其实这是很好的一种设计。
- 1)因为像"删除item"这样的一个逻辑都是由某个线程,而且这里是工作线程执行,
- 所以这是一个线程层面的事情。就是说是“某个工作线程去删除item”这样一件事。
- 2)更重要的是原子性及一致性问题,某个item数据,很有可能同时多个线程在修改,
- 那么需要加锁,那么锁最应该加在哪个地方?既然问题是线程引起的,那么负责
- 解决的无疑是线程模块。
- 3)所以这里像这种函数,thread此时相当于是items的外壳,起调控作用,在线程层面
- 开放给外部调用,同时在内部加锁。而items模块里面定义的do_xxx函数都不需要多
- 加考虑,无条件执行对item进行修改,而由外部被调用方来控制。相信很多需要加锁
- 的项目都会面临这样的问题:锁应该加在哪一层?可以参考memcached这样的设计。
- *
- */
- #include "memcached.h"
- #include <assert.h>
- #include <stdio.h>
- #include <errno.h>
- #include <stdlib.h>
- #include <errno.h>
- #include <string.h>
- #include <pthread.h>
- #ifdef __sun
- #include <atomic.h>
- #endif
- #define ITEMS_PER_ALLOC 64
- /**
- 下面这个CQ_ITEM结构体:
- 可以这么理解,主线程accept连接后,把client fd
- 分发到worker线程的同时会顺带一些与此client连接相关的信息,
- 而CQ_ITEM是包装了这些信息的一个对象,有点"参数对象"的概念。
- 记住这货是主线程那边丢过来的。
- CQ_ITEM中的CQ虽然是connection queue的缩写,
- 它与memcached.h中定义的conn结构体是完全不一样的概念,
- 但worker线程会利用这个CQ_ITEM对象去初始化conn对象
- */
- typedef struct conn_queue_item CQ_ITEM;
- struct conn_queue_item {
- int sfd;
- enum conn_states init_state;
- int event_flags;
- int read_buffer_size;
- enum network_transport transport;
- CQ_ITEM *next;
- };
- /*
- 上面的CQ_ITEM的队列对象,每个worker线程对象都保存着这样一个队列,处理
- 主线程那边分发过来的连接请求时用到。
- */
- typedef struct conn_queue CQ;
- struct conn_queue {
- CQ_ITEM *head;
- CQ_ITEM *tail;
- pthread_mutex_t lock;
- };
- //下面是各种锁
- /**
- 个人认为这个锁用于锁住全局数量不变的对象,例如slabclass,LRU链表等等
- 区别于item锁,由于item对象是动态增长的,数量非常多,
- item锁是用hash的方式分配一张大大的item锁表来控制锁的粒度
- */
- pthread_mutex_t cache_lock;
- pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER; //连接锁
- #if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
- pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
- #endif
- static pthread_mutex_t stats_lock; //统计锁
- static CQ_ITEM *cqi_freelist;
- static pthread_mutex_t cqi_freelist_lock;
- static pthread_mutex_t *item_locks; //item锁
- static uint32_t item_lock_count; //item锁总数
- static unsigned int item_lock_hashpower; //item锁的hash表 指数,锁总数为2的item_lock_hashpower个,见下面的hashsize
- #define hashsize(n) ((unsigned long int)1<<(n))
- #define hashmask(n) (hashsize(n)-1)
- static pthread_mutex_t item_global_lock;
- static pthread_key_t item_lock_type_key;
- static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
- static LIBEVENT_THREAD *threads;
- static int init_count = 0; //有多少个worker线程已经被初始化
- static pthread_mutex_t init_lock; //初始化锁
- static pthread_cond_t init_cond; //初始化条件变量
- static void thread_libevent_process(int fd, short which, void *arg);
- //引用计数加1
- unsigned short refcount_incr(unsigned short *refcount) {
- return __sync_add_and_fetch(refcount, 1);
- #elif defined(__sun)
- return atomic_inc_ushort_nv(refcount);
- #else
- unsigned short res;
- mutex_lock(&atomics_mutex);
- (*refcount)++;
- res = *refcount;
- mutex_unlock(&atomics_mutex);
- return res;
- #endif
- }
- //引用计数减1
- unsigned short refcount_decr(unsigned short *refcount) {
- return __sync_sub_and_fetch(refcount, 1);
- #elif defined(__sun)
- return atomic_dec_ushort_nv(refcount);
- #else
- unsigned short res;
- mutex_lock(&atomics_mutex);
- (*refcount)--;
- res = *refcount;
- mutex_unlock(&atomics_mutex);
- return res;
- #endif
- }
- void item_lock_global(void) {
- mutex_lock(&item_global_lock);
- }
- void item_unlock_global(void) {
- mutex_unlock(&item_global_lock);
- }
- void item_lock(uint32_t hv) {
- uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
- if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
- mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
- } else {
- mutex_lock(&item_global_lock);
- }
- }
- void *item_trylock(uint32_t hv) {
- pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
- if (pthread_mutex_trylock(lock) == 0) {
- return lock;
- }
- return NULL;
- }
- void item_trylock_unlock(void *lock) {
- mutex_unlock((pthread_mutex_t *) lock);
- }
- void item_unlock(uint32_t hv) {
- uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
- if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
- mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
- } else {
- mutex_unlock(&item_global_lock);
- }
- }
- static void wait_for_thread_registration(int nthreads) {
- while (init_count < nthreads) {
- pthread_cond_wait(&init_cond, &init_lock); //主线程利用条件变量等待所有worker线程启动完毕
- }
- }
- //worker线程注册函数,主要是统计worker线程完成初始化个数。
- static void register_thread_initialized(void) {
- pthread_mutex_lock(&init_lock);
- init_count++;
- pthread_cond_signal(&init_cond);
- pthread_mutex_unlock(&init_lock);
- }
- //item锁的粒度有几种,这里是切换类型
- void switch_item_lock_type(enum item_lock_types type) {
- char buf[1];
- int i;
- switch (type) {
- buf[0] = 'l';
- break;
- buf[0] = 'g';
- break;
- default:
- fprintf(stderr, "Unknown lock type: %d ", type);
- assert(1 == 0);
- break;
- }
- pthread_mutex_lock(&init_lock);
- init_count = 0;
- for (i = 0; i < settings.num_threads; i++) {
- if (write(threads[i].notify_send_fd, buf, 1) != 1) {
- perror("Failed writing to notify pipe");
- /* TODO: This is a fatal problem. Can it ever happen temporarily? */
- }
- }
- wait_for_thread_registration(settings.num_threads);
- pthread_mutex_unlock(&init_lock);
- }
- /*
- * Initializes a connection queue.
- 初始化一个CQ对象,CQ结构体和CQ_ITEM结构体的作用见它们定义处。
- */
- static void cq_init(CQ *cq) {
- pthread_mutex_init(&cq->lock, NULL);
- cq->head = NULL;
- cq->tail = NULL;
- }
- /**
- 从worker线程的CQ队列里面pop出一个CQ_ITEM对象
- */
- static CQ_ITEM *cq_pop(CQ *cq) {
- CQ_ITEM *item;
- pthread_mutex_lock(&cq->lock);
- item = cq->head;
- if (NULL != item) {
- cq->head = item->next;
- if (NULL == cq->head)
- cq->tail = NULL;
- }
- pthread_mutex_unlock(&cq->lock);
- return item;
- }
- /**
- push一个CQ_ITEM对象到worker线程的CQ队列中
- */
- static void cq_push(CQ *cq, CQ_ITEM *item) {
- item->next = NULL;
- pthread_mutex_lock(&cq->lock);
- if (NULL == cq->tail)
- cq->head = item;
- else
- cq->tail->next = item;
- cq->tail = item;
- pthread_mutex_unlock(&cq->lock);
- }
- /*
- * Returns a fresh connection queue item.
- 分配一个CQ_ITEM对象
- */
- static CQ_ITEM *cqi_new(void) {
- CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
- if (cqi_freelist) {
- item = cqi_freelist;
- cqi_freelist = item->next;
- }
- pthread_mutex_unlock(&cqi_freelist_lock);
- if (NULL == item) {
- int i;
- /* Allocate a bunch of items at once to reduce fragmentation */
- item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
- if (NULL == item) {
- stats.malloc_fails++;
- return NULL;
- }
- for (i = 2; i < ITEMS_PER_ALLOC; i++)
- item[i - 1].next = &item[i];
- pthread_mutex_lock(&cqi_freelist_lock);
- item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
- cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
- }
- return item;
- }
- /*
- * Frees a connection queue item (adds it to the freelist.)
- */
- static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
- item->next = cqi_freelist;
- cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
- }
- /*
- 创建并启动worker线程,在thread_init主线程初始化时调用
- */
- static void create_worker(void *(*func)(void *), void *arg) {
- pthread_t thread;
- pthread_attr_t attr;
- int ret;
- pthread_attr_init(&attr);
- if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
- fprintf(stderr, "Can't create thread: %s ",
- strerror(ret));
- exit(1);
- }
- }
- void accept_new_conns(const bool do_accept) {
- pthread_mutex_lock(&conn_lock);
- do_accept_new_conns(do_accept);
- pthread_mutex_unlock(&conn_lock);
- }
- /****************************** LIBEVENT THREADS *****************************/
- /*
- * 装备worker线程,worker线程的event_base在此设置
- */
- static void setup_thread(LIBEVENT_THREAD *me) {
- me->base = event_init(); //为每个worker线程分配自己的event_base
- if (! me->base) {
- fprintf(stderr, "Can't allocate event base ");
- exit(1);
- }
- /* Listen for notifications from other threads */
- event_set(&me->notify_event, me->notify_receive_fd,
- EV_READ | EV_PERSIST, thread_libevent_process, me); //监听管道接收fd,这里即监听
- //来自主线程的消息,事件处理函数为thread_libevent_process
- event_base_set(me->base, &me->notify_event);
- if (event_add(&me->notify_event, 0) == -1) {
- fprintf(stderr, "Can't monitor libevent notify pipe ");
- exit(1);
- }
- me->new_conn_queue = malloc(sizeof(struct conn_queue)); //CQ_ITEM队列
- if (me->new_conn_queue == NULL) {
- perror("Failed to allocate memory for connection queue");
- }
- cq_init(me->new_conn_queue); //初始化CQ_ITEM对象队列
- if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
- perror("Failed to initialize mutex");
- }
- me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
- if (me->suffix_cache == NULL) {
- fprintf(stderr, "Failed to create suffix cache ");
- }
- }
- /*
- * 这里主要是让worker线程进入event_base_loop
- */
- static void *worker_libevent(void *arg) {
- LIBEVENT_THREAD *me = arg;
- /* Any per-thread setup can happen here; thread_init() will block until
- * all threads have finished initializing.
- */
- /* set an indexable thread-specific memory item for the lock type.
- * this could be unnecessary if we pass the conn *c struct through
- * all item_lock calls...
- */
- me->item_lock_type = ITEM_LOCK_GRANULAR;
- pthread_setspecific(item_lock_type_key, &me->item_lock_type);
- //每一个worker线程进入loop,全局init_count++操作,
- //见thread_init函数后面几行代码和wait_for_thread_registration函数,
- //主线程通过init_count来确认所有线程都启动完毕。
- register_thread_initialized();
- event_base_loop(me->base, 0);
- return NULL;
- }
- //主线程分发client fd给worker线程后,同时往管道写入buf,唤醒worker线程调用此函数
- static void thread_libevent_process(int fd, short which, void *arg) {
- LIBEVENT_THREAD *me = arg;
- CQ_ITEM *item;
- char buf[1];
- if (read(fd, buf, 1) != 1)
- if (settings.verbose > 0)
- fprintf(stderr, "Can't read from libevent pipe ");
- switch (buf[0]) {
- case 'c':
- item = cq_pop(me->new_conn_queue); //取出主线程丢过来的CQ_ITEM
- if (NULL != item) {
- /*
- worker线程创建 conn连接对象,注意由主线程丢过来的CQ_ITEM的init_state为conn_new_cmd (TCP情况下)
- */
- conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport, me->base);
- if (c == NULL) {
- if (IS_UDP(item->transport)) {
- fprintf(stderr, "Can't listen for events on UDP socket ");
- exit(1);
- } else {
- if (settings.verbose > 0) {
- fprintf(stderr, "Can't listen for events on fd %d ",
- item->sfd);
- }
- close(item->sfd);
- }
- } else {
- c->thread = me; //设置监听连接的线程为当前worker线程
- }
- cqi_free(item);
- }
- break;
- /* we were told to flip the lock type and report in */
- case 'l':
- me->item_lock_type = ITEM_LOCK_GRANULAR;
- register_thread_initialized();
- break;
- case 'g':
- me->item_lock_type = ITEM_LOCK_GLOBAL;
- register_thread_initialized();
- break;
- }
- }
- void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
- int read_buffer_size, enum network_transport transport) {
- /**
- 这下面有一个CQ_ITEM结构体,可以这么理解,主线程accept连接后,把client fd
- 分发到worker线程的同时会顺带一些与此client连接相关的信息,例如dispatch_conn_new的形参上面列的,
- 而CQ_ITEM是包装了这些信息的一个对象。
- CQ_ITEM中的CQ是connection queue的缩写,但它与conn结构体是完全不一样的概念,CQ_ITEM仅仅是把client连接相关的信息
- 打包成一个对象而已。
- */
- CQ_ITEM *item = cqi_new();
- char buf[1];
- if (item == NULL) {
- close(sfd);
- /* given that malloc failed this may also fail, but let's try */
- fprintf(stderr, "Failed to allocate memory for connection object ");
- return ;
- }
- int tid = (last_thread + 1) % settings.num_threads;
- LIBEVENT_THREAD *thread = threads + tid; //通过简单的轮叫方式选择处理当前client fd的worker线程
- last_thread = tid;
- //初始化CQ_ITEM对象,即把信息包装
- item->sfd = sfd;
- item->init_state = init_state;
- item->event_flags = event_flags;
- item->read_buffer_size = read_buffer_size;
- item->transport = transport;
- cq_push(thread->new_conn_queue, item); //每个worker线程保存着所有被分发给自己的CQ_ITEM,即new_conn_queue
- MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
- /*
- 主线程向处理当前client fd的worker线程管道中简单写进一个'c'字符,
- 由于每个worker线程都监听了管道的receive_fd,于是相应的worker进程收到事件通知,
- 触发注册的handler,即thread_libevent_process
- */
- buf[0] = 'c';
- if (write(thread->notify_send_fd, buf, 1) != 1) {
- perror("Writing to thread notify pipe");
- }
- }
- int is_listen_thread() {
- return pthread_self() == dispatcher_thread.thread_id;
- }
- /********************************* ITEM ACCESS *******************************/
- /**
- 下面是一堆关于item操作的函数,具体逻辑代码都放在items::do_xxx相应的地方
- 就像本文件开头说的,这里主要是加了锁而已
- */
- /*
- * Allocates a new item.
- 分配item空间
- */
- item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
- item *it;
- /* do_item_alloc handles its own locks */
- /**
- 这里比较特殊,与其它item_xxx函数不一样,这里把锁放在do_item_alloc里面做了。
- 个人猜测是因为do_item_alloc这个逻辑实在有点复杂,甚至加解锁有可能在某个if条件下要发
- 生,加解锁和逻辑本身代码耦合,所以外部不好加锁。因此把锁交给do_item_alloc内部进行考虑。
- */
- it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
- return it;
- }
- /*
- * Returns an item if it hasn't been marked as expired,
- * lazy-expiring as needed.
- 取得item,上面这里有句英文注释,说返回不超时的item,因为memcached并没有做实时或者定时把
- 超时item清掉的逻辑,而是用了延迟超时。就是当要用这个item的时候,再来针对这个item做超时处理
- */
- item *item_get(const char *key, const size_t nkey) {
- item *it;
- uint32_t hv;
- hv = hash(key, nkey);
- item_lock(hv);
- it = do_item_get(key, nkey, hv);
- item_unlock(hv);
- return it;
- }
- item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
- item *it;
- uint32_t hv;
- hv = hash(key, nkey);
- item_lock(hv);
- it = do_item_touch(key, nkey, exptime, hv);
- item_unlock(hv);
- return it;
- }
- /*
- * Links an item into the LRU and hashtable.
- */
- int item_link(item *item) {
- int ret;
- uint32_t hv;
- hv = hash(ITEM_key(item), item->nkey);
- item_lock(hv);
- ret = do_item_link(item, hv);
- item_unlock(hv);
- return ret;
- }
- void item_remove(item *item) {
- uint32_t hv;
- hv = hash(ITEM_key(item), item->nkey);
- item_lock(hv);
- do_item_remove(item);
- item_unlock(hv);
- }
- int item_replace(item *old_it, item *new_it, const uint32_t hv) {
- return do_item_replace(old_it, new_it, hv);
- }
- /*
- * Unlinks an item from the LRU and hashtable.
- * 见items::item_unlink
- */
- void item_unlink(item *item) {
- uint32_t hv;
- hv = hash(ITEM_key(item), item->nkey);
- item_lock(hv);
- do_item_unlink(item, hv);
- item_unlock(hv);
- }
- /**
- 主要作用是重置在最近使用链表中的位置,更新最近使用时间,见items::do_item_update
- */
- void item_update(item *item) {
- uint32_t hv;
- hv = hash(ITEM_key(item), item->nkey);
- item_lock(hv);
- do_item_update(item);
- item_unlock(hv);
- }
- enum delta_result_type add_delta(conn *c, const char *key,
- const size_t nkey, int incr,
- const int64_t delta, char *buf,
- uint64_t *cas) {
- enum delta_result_type ret;
- uint32_t hv;
- hv = hash(key, nkey);
- item_lock(hv);
- ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
- item_unlock(hv);
- return ret;
- }
- /*
- * Stores an item in the cache (high level, obeys set/add/replace semantics)
- * 保存item信息,主要是调用items::do_store_item,但由于是多线程,所以需求加锁
- * store_item是线程上的操作,所以写在thread模块,在此对外开放,而内部加锁。
- * 除了store_item函数,其它关于item的操作均如此。
- */
- enum store_item_type store_item(item *item, int comm, conn* c) {
- enum store_item_type ret;
- uint32_t hv;
- hv = hash(ITEM_key(item), item->nkey); //锁住item
- item_lock(hv);
- ret = do_store_item(item, comm, c, hv);
- item_unlock(hv);
- return ret;
- }
- void item_flush_expired() {
- mutex_lock(&cache_lock);
- do_item_flush_expired();
- mutex_unlock(&cache_lock);
- }
- char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
- char *ret;
- mutex_lock(&cache_lock);
- ret = do_item_cachedump(slabs_clsid, limit, bytes);
- mutex_unlock(&cache_lock);
- return ret;
- }
- void item_stats(ADD_STAT add_stats, void *c) {
- mutex_lock(&cache_lock);
- do_item_stats(add_stats, c);
- mutex_unlock(&cache_lock);
- }
- void item_stats_totals(ADD_STAT add_stats, void *c) {
- mutex_lock(&cache_lock);
- do_item_stats_totals(add_stats, c);
- mutex_unlock(&cache_lock);
- }
- void item_stats_sizes(ADD_STAT add_stats, void *c) {
- mutex_lock(&cache_lock);
- do_item_stats_sizes(add_stats, c);
- mutex_unlock(&cache_lock);
- }
- /******************************* GLOBAL STATS ******************************/
- void STATS_LOCK() {
- pthread_mutex_lock(&stats_lock);
- }
- void STATS_UNLOCK() {
- pthread_mutex_unlock(&stats_lock);
- }
- void threadlocal_stats_reset(void) {
- int ii, sid;
- for (ii = 0; ii < settings.num_threads; ++ii) {
- pthread_mutex_lock(&threads[ii].stats.mutex);
- threads[ii].stats.get_cmds = 0;
- threads[ii].stats.get_misses = 0;
- threads[ii].stats.touch_cmds = 0;
- threads[ii].stats.touch_misses = 0;
- threads[ii].stats.delete_misses = 0;
- threads[ii].stats.incr_misses = 0;
- threads[ii].stats.decr_misses = 0;
- threads[ii].stats.cas_misses = 0;
- threads[ii].stats.bytes_read = 0;
- threads[ii].stats.bytes_written = 0;
- threads[ii].stats.flush_cmds = 0;
- threads[ii].stats.conn_yields = 0;
- threads[ii].stats.auth_cmds = 0;
- threads[ii].stats.auth_errors = 0;
- for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
- threads[ii].stats.slab_stats[sid].set_cmds = 0;
- threads[ii].stats.slab_stats[sid].get_hits = 0;
- threads[ii].stats.slab_stats[sid].touch_hits = 0;
- threads[ii].stats.slab_stats[sid].delete_hits = 0;
- threads[ii].stats.slab_stats[sid].incr_hits = 0;
- threads[ii].stats.slab_stats[sid].decr_hits = 0;
- threads[ii].stats.slab_stats[sid].cas_hits = 0;
- threads[ii].stats.slab_stats[sid].cas_badval = 0;
- }
- pthread_mutex_unlock(&threads[ii].stats.mutex);
- }
- }
- void threadlocal_stats_aggregate(struct thread_stats *stats) {
- int ii, sid;
- /* The struct has a mutex, but we can safely set the whole thing
- * to zero since it is unused when aggregating. */
- memset(stats, 0, sizeof(*stats));
- for (ii = 0; ii < settings.num_threads; ++ii) {
- pthread_mutex_lock(&threads[ii].stats.mutex);
- stats->get_cmds += threads[ii].stats.get_cmds;
- stats->get_misses += threads[ii].stats.get_misses;
- stats->touch_cmds += threads[ii].stats.touch_cmds;
- stats->touch_misses += threads[ii].stats.touch_misses;
- stats->delete_misses += threads[ii].stats.delete_misses;
- stats->decr_misses += threads[ii].stats.decr_misses;
- stats->incr_misses += threads[ii].stats.incr_misses;
- stats->cas_misses += threads[ii].stats.cas_misses;
- stats->bytes_read += threads[ii].stats.bytes_read;
- stats->bytes_written += threads[ii].stats.bytes_written;
- stats->flush_cmds += threads[ii].stats.flush_cmds;
- stats->conn_yields += threads[ii].stats.conn_yields;
- stats->auth_cmds += threads[ii].stats.auth_cmds;
- stats->auth_errors += threads[ii].stats.auth_errors;
- for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
- stats->slab_stats[sid].set_cmds +=
- threads[ii].stats.slab_stats[sid].set_cmds;
- stats->slab_stats[sid].get_hits +=
- threads[ii].stats.slab_stats[sid].get_hits;
- stats->slab_stats[sid].touch_hits +=
- threads[ii].stats.slab_stats[sid].touch_hits;
- stats->slab_stats[sid].delete_hits +=
- threads[ii].stats.slab_stats[sid].delete_hits;
- stats->slab_stats[sid].decr_hits +=
- threads[ii].stats.slab_stats[sid].decr_hits;
- stats->slab_stats[sid].incr_hits +=
- threads[ii].stats.slab_stats[sid].incr_hits;
- stats->slab_stats[sid].cas_hits +=
- threads[ii].stats.slab_stats[sid].cas_hits;
- stats->slab_stats[sid].cas_badval +=
- threads[ii].stats.slab_stats[sid].cas_badval;
- }
- pthread_mutex_unlock(&threads[ii].stats.mutex);
- }
- }
- void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
- int sid;
- out->set_cmds = 0;
- out->get_hits = 0;
- out->touch_hits = 0;
- out->delete_hits = 0;
- out->incr_hits = 0;
- out->decr_hits = 0;
- out->cas_hits = 0;
- out->cas_badval = 0;
- for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
- out->set_cmds += stats->slab_stats[sid].set_cmds;
- out->get_hits += stats->slab_stats[sid].get_hits;
- out->touch_hits += stats->slab_stats[sid].touch_hits;
- out->delete_hits += stats->slab_stats[sid].delete_hits;
- out->decr_hits += stats->slab_stats[sid].decr_hits;
- out->incr_hits += stats->slab_stats[sid].incr_hits;
- out->cas_hits += stats->slab_stats[sid].cas_hits;
- out->cas_badval += stats->slab_stats[sid].cas_badval;
- }
- }
- //初始化主线程
- void thread_init(int nthreads, struct event_base *main_base) {
- int i;
- int power;
- pthread_mutex_init(&cache_lock, NULL);
- pthread_mutex_init(&stats_lock, NULL);
- pthread_mutex_init(&init_lock, NULL);
- pthread_cond_init(&init_cond, NULL);
- pthread_mutex_init(&cqi_freelist_lock, NULL);
- cqi_freelist = NULL;
- /* Want a wide lock table, but don't waste memory */
- /**
- 初始化item lock
- */
- //调配item锁的数量
- //之所以需要锁是因为线程之间的并发,所以item锁的数量当然是根据线程的个数进行调配了。
- if (nthreads < 3) {
- power = 10; //这个power是指数
- } else if (nthreads < 4) {
- power = 11;
- } else if (nthreads < 5) {
- power = 12;
- } else {
- /* 8192 buckets, and central locks don't scale much past 5 threads */
- power = 13;
- }
- item_lock_count = hashsize(power);
- item_lock_hashpower = power;
- item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
- if (! item_locks) {
- perror("Can't allocate item locks");
- exit(1);
- }
- for (i = 0; i < item_lock_count; i++) {
- pthread_mutex_init(&item_locks[i], NULL);
- }
- pthread_key_create(&item_lock_type_key, NULL);
- pthread_mutex_init(&item_global_lock, NULL);
- //_mark2_1
- threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); //创建worker线程对象
- if (! threads) {
- perror("Can't allocate thread descriptors");
- exit(1);
- }
- //_mark2_3
- dispatcher_thread.base = main_base; //设置主线程对象的event_base
- dispatcher_thread.thread_id = pthread_self(); //设置主线程对象pid
- //_mark2_5
- for (i = 0; i < nthreads; i++) { //为每个worker线程创建与主线程通信的管道
- int fds[2];
- if (pipe(fds)) {
- perror("Can't create notify pipe");
- exit(1);
- }
- threads[i].notify_receive_fd = fds[0]; //worker线程管道接收fd
- threads[i].notify_send_fd = fds[1]; //worker线程管道写入fd
- //_mark2_6
- setup_thread(&threads[i]); //装载 worker线程
- /* Reserve three fds for the libevent base, and two for the pipe */
- stats.reserved_fds += 5;
- }
- /* Create threads after we've done all the libevent setup. */
- for (i = 0; i < nthreads; i++) {
- //_mark2_7
- create_worker(worker_libevent, &threads[i]); //启动worker线程,见worker_libevent
- }
- /* Wait for all the threads to set themselves up before returning. */
- pthread_mutex_lock(&init_lock);
- wait_for_thread_registration(nthreads); //等待所有worker线程启动完毕
- pthread_mutex_unlock(&init_lock);
- }