引言:上篇文章说到了多进程并发式的服务端模型,如上一篇文章所述,进程的频繁创建会导致服务器不堪负载,那这一篇博客主要讲述的是线程模型和线程池的方式来提高服务端的负载能力。同时比较一下不同的模型的好处与坏处。
(如果不加以说明,我们都是考虑开发是基于GNU/Linux的)在Linux下创建一个线程的方式很简单,pthread_create() 函数来创建线程,其中的一个参数的回调函数,也就是线程本身的执行体函数。
void *thread_entry( void * args );
这里不过多的强调怎样利用线程等来创建执行体以及其他的系统调用怎样使用的。
那么,在服务端的线程使用方式一般为三种种:
(1)按需生成(来一个连接生成一个线程)
(2)线程池(预先生成很多线程)
(3)Leader follower(LF)
主要讲解第一种和第二种,第三种暂时手上没有实例代码,最近也没写、
第一种方式的范式大概是这样:
回调函数:
void *thread_entry( void *args ) { int fd = *(int *)args ; do_handler_fd( fd ); }
程序主体:
for(;;){ fd = accept(); pthread_create(...,thread_entry,&fd); }
这里所展示的只是一个最简单的方式,但是可以代表多线程的服务器端模型。
大体服务端分为主线程和工作线程,主线程负责accept()连接,而工作线程负责处理业务逻辑和流的读取等。这样,即使在工作线程阻塞的情况下,也只是阻塞在线程范围内,关于这部分内容,可以参考《C++网络编程》第一卷的第五章。在应用层和内核之间的线程比例为1:1的操作系统线程机制中,一个线程在内核中会有一个内核线程实例,那么就是说,如果这个线程阻塞,不会引起在同一个进程里面的线程也阻塞。现在大多是的操作系统采用的都是 1:1的模型,但是这个比传统的N:1模型更消耗资源。 N:1模型就是,在应用层级别的多个线程在操作系统中只有一个实例,可以看做一个组,一旦一个线程阻塞,这个工作组的其他线程都会阻塞。
故上述代码的 do_handler_fd( fd ) 里面的系统调用如果阻塞,不会引起整个进程阻塞,线程的阻塞只是在线程范围内。所以,主线程可以一直等待客户连接,而把工作处理过程放到线程中去。
这个是传统的线程方式,这种方式也会带来一些问题:
(1)工作开销过大,线程的频繁创建的销毁也是一个很消耗资源的过程,虽然较进程小很多。
(2)对于临界资源的访问需要控制加锁等操作,加大了程序设计的复杂性。
(3)一个线程的崩溃会导致整个进程的崩溃,比如调用了exit() 函数等,虽然阻塞操作只阻塞一个线程,但是其他一些系统调用的失败或崩溃将导致服务器整个down机。后果不堪设想。
但是在很多地方也提到了,多线程的方式适合IO密集型的程序,比如大文件传输等,这样可以在用户看来所有的操作都是并行的。
下面来说说线程池的方式,它改进了上述的问题的第一个,频繁的创建线程。
线程池的基本思想就是预先创建一部分线程,然后等到任务来的时候,通过条件变量或者其他的机制来唤醒一个工作线程。
下面详细的讲述一下前段时间写的一个简单的线程池方案。
线程池有一个任务队列,即由任务对象组成的一组队列。
我们为这个任务队列提供两个接口:
void mc_thread_pool_add_task(void *task , size_t tasksize )
解释一下这个接口的含义和参数, task 是一个指向任务实例的指针,tasksize 一般取 sizeof( instance_task ) 为的是在加入任务队列的时候队列的一些其他操作。为了简单化,这里没有提供任务优先级的考虑。
void *mc_thread_pool_get_task()
这个函数用来取得一个指向任务实例的指针,然后可以操作这个任务。
一般情况下,由主线程调用第一个函数,而工作线程调用第二个函数。
我们来看看线程池的结构:
typedef struct _thread_pool_t { pthread_mutex_t queue_lock ; pthread_cond_t task_cond ; list_t * tasks // treat it as queue thread_task_t type pthread_t * pthreads ; int isdestoried; int workersnum ; char ready ; thread_task_handler thread_pool_task_handler; }thread_pool_t; /* * this structure is a global control block of threads poll * as you can see , queue_lock and task_cond is define to protecte access of this whole poll * and task_cond is used to signal to threads that the task queue is ready * tasks is a queue of tasks , each task should posted to this queue and threads * in this pool can get it , we defined this task as void * to use wildly * isdestoried is a boolean flag as his/her name * workersnum is the total number of threads * ready is a flag also and used to judge if the tasks queue is ready * thread_pool_task_handler is a function point which points to the task handler you defined */
在线程池的结构中,我们定义了两个变量, queue_lock 和 task_cond
一个是锁,用来控制线程对于 task 任务队列的访问,另一个 task_cond 用来唤醒工作线程。
说说基本原理:工作线程默认情况下是阻塞在 pthread_cond_wait() 系统调用下的,如果有任务到来,我们可用使用 pthread_cond_singal() 来唤醒一个处于阻塞状态的线程,这样这个线程就可以执行 mc_thread_pool_get_task() 来取得一个任务,并调用相应的回调函数。
tasks就是上面所说的任务队列,pthreads是一个pthread_t 的数组,也就是用来标示线程id 的数组。每一次创建线程的时候都会返回线程id,所以我们需要记录。
ready 是一个flag , 标示是否任务队列可用。thread_task_handler 是一个函数指针,定义是这样的:
typedef void ( *thread_task_handler )( void * args ) ;
结构体里的 thread_pool_task_handler 就是在初始化的时候设置的线程的执行体。
下面看看初始化函数:
void mc_thread_pool_ini( mc_thread_pool_t * par_tp , int workersnum ,thread_task_handler par_handler ) { int err ; //par_tp = ( thread_pool_t *)malloc( sizeof(thread_pool_t) ); if( par_tp == NULL ) { fprintf( stderr , "thread_pool_t malloc\n"); return ; } par_tp->workersnum = workersnum ; pthread_mutex_init( &par_tp->queue_lock ,NULL ); pthread_cond_init(&par_tp->task_cond , NULL ); /* par_tp->queue_lock = PTHREAD_MUTEX_INITIALIZER ; par_tp->task_cond = PTHREAD_COND_INITIALIZER ; */ par_tp->tasks = mc_listcreate() ; if( par_tp->tasks == NULL ) { fprintf( stderr , "listcreate() error\n"); //free( par_tp ) ; return ; } par_tp->pthreads = ( pthread_t *)malloc( sizeof( pthread_t )*workersnum ); if( par_tp->pthreads == NULL ) { fprintf( stderr , "pthreads malloc\n"); //free( par_tp ); mc_freelist( par_tp->tasks ) ; return NULL ; } int i = 0 ; for( ; i < workersnum ; i++ ) { fprintf(stderr,"start to create threads\n"); err = pthread_create(&(par_tp->pthreads[i]),NULL,mc_thread_entry,NULL) ; if( err == -1 ) { fprintf( stderr , "pthread_create error\n"); //free( par_tp ); mc_freelist( par_tp->tasks ) ; free(par_tp->pthreads) ; } } par_tp->thread_pool_task_handler = par_handler ; par_tp->ready = 0 ; fprintf(stderr,"successed to create threads\n"); }
在初始化函数中,我们传递了一个函数执行体的入口点,也就是函数指针给线程池,当我们有任务的时候,一个线程被唤醒,执行相应的回调函数。
其他需要注意的地方是使用 for循环来创建很多的线程,并利用数组方式记录了线程的id 。
创建线程时候的回调函数并不是我们的参数传递的回调函数地址。因为在创建线程好线程的时候,我们需要一个阻塞操作,使得线程处于睡眠状态,不然函数执行完毕后线程就退出了。所以,创建线程时候的回调函数是这样的:
static void *mc_thread_entry( void *args ) { void * task ; for(;;) { pthread_mutex_lock( &mc_global_threads_pool.queue_lock ) ; fprintf(stderr, " locked to wait task\n"); while( mc_global_threads_pool.ready == 0 ) { pthread_cond_wait( &mc_global_threads_pool.task_cond , &mc_global_threads_pool.queue_lock ) ; } task = mc_thread_pool_get_task() ; fprintf(stderr, "get a task and ready to unlock \n"); pthread_mutex_unlock( &mc_global_threads_pool.queue_lock ) ; mc_global_threads_pool.thread_pool_task_handler( task ) ; } }
需要注意的一点是,我们要用两个变量来判断一个队列是否就绪,ready 和条件变量本身。
判断条件是 while() 而不是 if,这样可以使得线程在没有工作任务的时候,也就是工作队列为空的时候阻塞在 pthread_cond_wait 上,关于pthread_cond_wait 的工作机制可以参考IBM developerworks上的很多好文章。
pthread_cond_wait 在发现没有任务的时候,条件不成立的时候,是会有一个默认的操作的,就是释放锁,第二个参数的锁,使得其他线程可以得到condition 的竞争权利。所以我们在函数体内 pthread_cond_wait 的调用上下有一个加锁和释放锁的操作。
在函数内部有一个 mc_global_threads_pool.thread_pool_task_handler( task ) 这个操作就是线程内部得到了任务后调用回调函数过程。
将任务队列加入的函数实例如下:
void mc_thread_pool_add_task(void *task , size_t tasksize ) { pthread_mutex_lock( &mc_global_threads_pool.queue_lock ); fprintf( stderr ,"thread locked and append to list\n"); mc_list_append( mc_global_threads_pool.tasks , task , tasksize ) ; pthread_mutex_unlock( &mc_global_threads_pool.queue_lock ); fprintf( stderr ,"thread unlocked and successed append to list\n"); mc_global_threads_pool.ready = 1 ; if( mc_global_threads_pool.ready == 1 ) { fprintf( stderr ,"signal to threads\n"); pthread_cond_signal( &mc_global_threads_pool.task_cond ) ; } }
这里使用了 ready 来判断是有任务,如果有,使用 pthread_cond_signal 来唤醒一个等待的线程。
取得一个队列的任务方式很简单,直接返回队列的第一个任务:
void *mc_thread_pool_get_task() { void * ret_task ; ret_task = mc_getnode_del( mc_global_threads_pool.tasks , 0 ); if( ret_task == NULL ) { fprintf(stderr,"get node_del error\n"); } fprintf( stderr ," got a task\n"); mc_global_threads_pool.ready = 0 ; if( ret_task == NULL ) { fprintf(stderr, "getnode_del error\n"); return NULL ; } else return ret_task ; }
主体框架是这样的:
定义一个自己的task结构体比如:
typedef struct _thread_task_t { int task_num ; }mc_thread_task_t ;
定义自己的回调函数:
void my_thread_task_handler( void * task ) { fprintf(stderr,"task->tasknum %d\n",((mc_thread_task_t *)task)->task_num ); /* * if the task is a event we can like this demo: * (event_t *)task->handler( (event_t *)task ); * so in event_t structure there should be a callback called handler */ }
函数主体就是这样:
int main() { mc_thread_task_t ltask; ltask.task_num = 1 ; fprintf(stderr,"begin to ini pool\n"); mc_thread_pool_ini( &mc_global_threads_pool , 20 , my_thread_task_handler ); mc_thread_pool_add_task( <ask , sizeof(mc_thread_task_t) ); int i = 0 ; for(;i < 10000; i++) { ltask.task_num = i ; mc_thread_pool_add_task( <ask , sizeof(mc_thread_task_t) ); sleep(1); } return 0; }
线程池初始化的时候所传入的结构体就是自己定义的 task 的回调函数。
上述所说的是线程池一个方案。回到我们的服务端模型上来看。
我们的服务端的改写方式可以换成这样:
定义只的一个任务结构,比如说,我们定义为:
struct task { int fd ; } void *task_handler( void *task ) { int fd = *(int *)task ; do_handler_fd( fd ); }
好了,我们的服务器主体框架可以是这样:
mc_thread_pool_ini( &mc_global_threads_pool , N , task_handler ); // 第二个参数为线程池工作线程数 for(;;) { fd = accept(); struct task * newtask = ( struct task *)malloc( sizeof(struct task) ); newtask->fd = fd ; mc_thread_pool_add_task( &newtask,sizeof(struct task*) ); //将newtask 指针加入队列,而不是实例,可以减少队列的存储空间 }
总结:
线程池的方案能够减少线程创建时候带来的开销,但是对于临界资源的访问控制等变得更加的复杂,考虑的因素更多。这里没有完整的贴出线程池的代码。上述模型在平常使用的过程中适合并发连接数目不大的情况,IO密集型。对于CPU 密集型的服务端,线程池返回会加大资源消耗。下一篇文章我们来看看反应堆模型,异步事件驱动,非阻塞IO,并贴出一个简单的 epoll 的反应堆。