在前面工作线程初始化的分析中讲到Memcached采用典型的Master_Worker模式,也即半同步/半异步的高效网络并发模式。其中主线程(异步线程)负责接收客户端连接,然后分发给工作线程,具体由工作线程完成客户端的求情任务。
在memcached中,主线程负责监听所有socket上的事件,当socket上有可读事件发生,即新的客户连接求情到来,主线程就接受之得到新的连接socket,并将该连接socket信息放入一个任务对象(CQ_ITEM)结构体中,然后选择一个工作线程,将该CQ_ITEM放入该工作线程的任务队列(CQ)中,并通过管道通知工作线程:“我接到一个新的客户端请求,我已经把把放入你的任务队列中了,你赶紧去处理吧!”,后面就由工作线程去完成客户端请求的任务。 需要主要的是,在memcached中,主线程和每个工作线程都关联一个Libevent实例来负责与主线程的通信和处理客户端的任务事件。因此,实际上这里的工作线程也是异步的,每个工作线程处理多个客户端请求任务,这正是由Libevent实现的。
具体的半同步/半异步模式结构如下图所示:
I/O框架库Libevent基本结构如下图所示:
连接请求队列CQ结构:
//连接队列(循环单链表)
//队列的主要操作:初始化 push pop
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;//每个队列维持一个线程锁,因此主线程在向该队列中push ITEM
//时都是要加锁的
};
CQ队列节点CQ_ITEM:
//连接对象CQ_ITEM
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd;//连接socket
enum conn_states init_state;//所有可能状态组成的结构体
int event_flags;
int read_buffer_size;
enum network_transport transport;//网络传输方式
CQ_ITEM *next;//指向所在连接队列的下一ITEM对象
};
连接队列CQ的主要操作:
push 与pop,即主线程将先到来的连接对象ITEM push到CQ队列中; 工作线程从CQ队列中pop出一个ITEM进行处理。
//主线程将一个ITEM放入到连接连接队列的尾部
static void cq_push(CQ *cq, CQ_ITEM *item) {
item->next = NULL;
pthread_mutex_lock(&cq->lock);//需要加锁
if (NULL == cq->tail)
cq->head = item;
else
cq->tail->next = item;
cq->tail = item;
pthread_mutex_unlock(&cq->lock);
}
//工作线程读取CQ队列中头部的ITEM(同时从队列中删除该ITEM)
//注意:读写CQ队列都需要加锁,以防工作线程正在读取时,主线程往队列中加入新的ITEM
static CQ_ITEM *cq_pop(CQ *cq) {
CQ_ITEM *item;
pthread_mutex_lock(&cq->lock);//
item = cq->head;
if (NULL != item) {
cq->head = item->next;
if (NULL == cq->head)
cq->tail = NULL;
}
pthread_mutex_unlock(&cq->lock);
return item;
}
CQ_ITEM内存池:
memcached在申请一个CQ_ITEM结构体时,并不是直接使用malloc申请的。因为这样做可能会导致大量的内存碎片(作为长期运行的服务器进程memcached需要考虑这个问题)。为此,memcached也为CQ_ITEM使用类似内存池的技术:每次在主线程在请求一个空闲ITEM时,检测该空闲ITEM链表,如果为空,则请于预分配一块内存(默认情况一次分配64个ITEM),分割为多个ITEM,将第一个返回给调用者使用,剩余的连成空闲ITEM链表,以备主线程使用。在工作线程中释放一个ITEM时,只需要将ITEM放回该空闲链表即可。
空闲链表:static CQ_ITEM *cqi_freelist;
具体实现如下:
//这里采取的优化方法是,一次性分配64个CQ_ITEM大小的内存(即预分配).
//下次调用本函数的时候,直接从之前分配64个中要一个即可。
//由于是为了防止内存碎片,所以不是以链表的形式放置这64个CQ_ITEM。而是数组的形式。
//于是,cqi_free函数就有点特别了。它并不会真正释放.而是像内存池那样归还
static CQ_ITEM *cqi_new(void) {
//所有线程都会访问cqi_freelist的。所以需要加锁
CQ_ITEM *item = NULL;
pthread_mutex_lock(&cqi_freelist_lock);
if (cqi_freelist) { //如果空闲链表中还有ITEM,则之间返回一个ITEM给调用者
item = cqi_freelist;
cqi_freelist = item->next;
}
pthread_mutex_unlock(&cqi_freelist_lock);
if (NULL == item) {//没有多余的CQ_ITEM了
int i;
item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);//该宏等于64
//item[0]直接返回为调用者,不用next指针连在一起。调用者负责将
//item[0].next赋值为NULL
for (i = 2; i < ITEMS_PER_ALLOC; i++)//将这块内存分成一个个的item并且用next指针像链表一样连起来
item[i - 1].next = &item[i];
pthread_mutex_lock(&cqi_freelist_lock);
//因为主线程负责申请CQ_ITEM,子线程负责释放CQ_ITEM。所以cqi_freelist此刻
//可能并不等于NULL。由于使用头插法,所以无论cqi_freeelist是否为NULL,都能
//把链表连起来的。
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
pthread_mutex_unlock(&cqi_freelist_lock);
}
return item;
}
//并非释放,而是像内存池那样归还
static void cqi_free(CQ_ITEM *item) {
pthread_mutex_lock(&cqi_freelist_lock);
item->next = cqi_freelist;
cqi_freelist = item; //头插法归还
pthread_mutex_unlock(&cqi_freelist_lock);
}
工作线程:
全局变量:static LIBEVENT_THREAD *threads;
threads指向所有的工程线程组成的数组,主线程通过该threads指针即可遍历所有的工作线程,从该数组中选出一个工作线程,然后通过管道即可实现与工作线程的通信,同时也可以将CQ_ITEM放入对应的CQ队列中。
其中Memcached封装了工作线程结构体,包括了Libevent实例对象,以及通信管道描述符等。
//LIBEVENT_THREAD是Memcached内部对工作线程的一个封装
typedef struct {
pthread_t thread_id; //线程id /* unique ID of this thread */
struct event_base *base; //libevent的不是线程安全的,每个工作线程持有一个libevent实例,用于pipe管道通信和socket通信
struct event notify_event; //线程的通知事件 /* listen event for notify pipe */
int notify_receive_fd; //通知管道接收端(读) /* receiving end of notify pipe */
int notify_send_fd; //通知管道写端 /* sending end of notify pipe */
struct thread_stats stats; //该线程的状态 /* Stats generated by this thread */
struct conn_queue *new_conn_queue; //每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中
cache_t *suffix_cache; //后缀cache
uint8_t item_lock_type; //线程操作hash表持有的锁类型,有局部锁和全局锁
} LIBEVENT_THREAD;
工作线程的创建包括由其初始化函数完成,包括:配置Libevent实例对象、创建CQ队列等过程,然后创建线程,启动事件循环。
下面给初始化函数:
//工作线程的初始化,创建线程,分别执行libevent事件循环:Memcached采用了典型的Master-Worker的线程模式,
//Master就是由main线程来充当,而Worker线程则是通过Pthread创建的。
void thread_init(int nthreads, struct event_base *main_base) {
int i;
int power;
pthread_mutex_init(&cache_lock, NULL);
pthread_mutex_init(&stats_lock, NULL);
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
/* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else {
/* 8192 buckets, and central locks don't scale much past 5 threads */
power = 13;
}
item_lock_count = hashsize(power);
item_lock_hashpower = power;
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror("Can't allocate item locks");
exit(1);
}
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
pthread_key_create(&item_lock_type_key, NULL);
pthread_mutex_init(&item_global_lock, NULL);
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));//申请nthreads个工作线程空间,由全局变量threads维护
if (! threads) {
perror("Can't allocate thread descriptors");
exit(1);
}
dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();
//工作线程的初始化,工作线程和主线程(main线程)是通过pipe管道进行通信的
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
threads[i].notify_receive_fd = fds[0];//读管道绑定到工作线程的接收消息的描述符
threads[i].notify_send_fd = fds[1];//写管道绑定到工作线程的发送消息的描述符
//为每个线程配置一个Libevent实例 和一个CQ队列
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}
/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);//创建线程,执行libevnt事件循环
}
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}
上面初始化函数中的:
create_worker函数中,调用工作线程函数worker_libevent启动Libevent事件循环:event_base_loop,处理主线程的管道通知和所分配的客户端请求任务等事件。
而setup_thread函数主要负责两种事件的注册:
1、将读管道事件注册到libevent上。
2、设置其中读管道事件回调函数为thread_libevent_process(),该函数负责对读取并解析管道的一个字节的通知信息。然后从连接队列CQ的头部取出该ITEM,为该连接socket在该线程的libeven上注册事件(调用函数conn_new设置),连接socket事件的回调的函数也event_handler(因为实际上主线程也是通过conn_new初始化监听socket 的libevent可读事件),而其中event_handler函数的核心依然是 memcached网络事件处理的最核心部分- drive_machine 。drive_machine 将留到后面分析。
并设定该线程为该连接socket的服务线程,然后将该ITEM放入到ITEM内存池空闲链表中。
其中管道事件回调thread_libevent_process函数,具体代码如下:
//回调函数:处理主线程发来的读管道事件,将收到的连接socket注册到libevent上
//fd:读管道描述符notify_receive_fd
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
//首先将管道的1个字节通知信号读出
//(这是必须的,在水平触发模式下如果不处理该事件,则会被循环通知,直到事件被处理)
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe
");
//解析主线程发送来的通知类容
switch (buf[0]) {
case 'c'://从CQ队列取出头部ITEM进行处理
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
//创建连接结构体,并向该线程libevent注册该连接socket的事件。
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket
");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can't listen for events on fd %d
",
item->sfd);
}
close(item->sfd);
}
} else {
c->thread = me;//指定该连接的服务线程为当前线程
}
cqi_free(item);//将该ITEM加入空闲ITEM链表(ITEM内存池),以循环利用
}
break;
/* we were told to flip the lock type and report in */
case 'l':
me->item_lock_type = ITEM_LOCK_GRANULAR;
register_thread_initialized();
break;
case 'g':
me->item_lock_type = ITEM_LOCK_GLOBAL;
register_thread_initialized();
break;
}
}
上面的setup_thread函数将管道事件与连接socket事件都注册到了linevent上,工作线程的基础设施已经完成。
因此线程函数worker_libevent则负责启动事件循环,如下:
//线程处理函数 :执行libevent事件循环
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
/* Any per-thread setup can happen here; thread_init() will block until
* all threads have finished initializing.
*/
/* set an indexable thread-specific memory item for the lock type.
* this could be unnecessary if we pass the conn *c struct through
* all item_lock calls...
*/
//默认的hash表的锁为局部锁
me->item_lock_type = ITEM_LOCK_GRANULAR;
pthread_setspecific(item_lock_type_key, &me->item_lock_type);//设定线程的属性
//用于控制工作线程初始化,通过条件变量来控制
register_thread_initialized();
//工作线程的libevent实例启动
event_base_loop(me->base, 0);
return NULL;
}
主线程:
上面是工作过线程的基本工作过程,而主线的的主要工作就是:
1、接受新的客户端连接请求
2、将连接socket分发给工作线程,并通知工作线程来处理之。
当然,主要函数也就是我们的main函数了。在main函数中,主线程创建了属于自己的Libevent实例,存放在全局变量main_base中。在main函数的最后,主线程调用event_base_loop进入事件循环中。中间的server_sockets函数是创建一个监听客户端的socket,并将创建一个event监听该socket的可读事件。
main函数具体分析如下(仅仅展示main函数的基本流程):
int main (int argc, char **argv) {
//检查libevent的版本是否足够新.1.3即可
if (!sanitycheck()) {
return EX_OSERR;
}
//对memcached的关键设置取默认值
settings_init();
...//解析memcached启动参数
//是否以守护进程方式运行 memcached
if (do_daemonize) {
if (sigignore(SIGHUP) == -1) {
perror("Failed to ignore SIGHUP");
}
if (daemonize(maxcore, settings.verbose) == -1) {
fprintf(stderr, "failed to daemon() in order to daemonize
");
exit(EXIT_FAILURE);
}
}
//main_base是一个struct event_base类型的全局变量
main_base = event_init();//为主线程创建一个event_base
<span style="white-space:pre"> </span>//如果以多线程模式运行 memcached,则启动工作者线程
<span style="white-space:pre"> </span>//配置,创建并启动多线程模式中的每个工作线程
thread_init(settings.num_threads, main_base);
conn_init();//先不管,后面会说到
//创建settings.num_threads个worker线程,并且为每个worker线程创建一个CQ队列
//并为这些worker申请各自的event_base,worker线程然后进入事件循环中
thread_init(settings.num_threads, main_base);
//设置一个定时event(也叫超时event),定时(频率为一秒)更新current_time变量
//这个超时event是add到全局变量main_base里面的,所以主线程负责更新current_time(这是一个很重要的全局变量)
clock_handler(0, 0, 0);
/* create the listening socket, bind it, and init */ //创建监听套接字,绑定到端口
if (settings.socketpath == NULL) {
FILE *portnumber_file = NULL;
//创建监听客户端的socket
if (settings.port && server_sockets(settings.port, tcp_transport,//tcp_transport是枚举类型
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
...
}
if (event_base_loop(main_base, 0) != 0) {//主线程进入事件循环
retval = EXIT_FAILURE;
}
//完成善后工作
return retval;
}
这节主要是分析了memcached的网络并发处理模式,且分析了主线程和工作线程协调运行的具体实现。
其中工作线程的初始化函数中主要完成了:
1、基本设施的配置,如设置libevent实例等
2、在libevent上注册管道读通知事件。回调函数为thread_libevent_process,该函数负责根据管道通知,执行相关任务,比如下面的3也是在该函数中调用其他函数完成的:
3、在libevent上注册主线程分配的连接socket事件,回调函数通过conn_new函数设为event_handler(其中主要是调用了memcached网络事件处理的最核心部分- drive_machine ),实际上主线程也是通过conn_new初始化监听socket 的libevent可读事件
通过以上分析,对memcached的基本框架已经有了较为深入的理解。对于主线程,在这仅仅简单介绍了其基本执行流程,其具体设计细节将留到下节分析。