线性池的应用来源
为满足多客户端可同时登陆的要求,服务器端必须实现并发工作方式。当服务器主进程持续等待客户端连接时,每连接上一个客户端都需一个单独的进程或线程处理客户端的任务。但考虑到多进程对系统资源消耗大,单一线程存在重复创建、销毁等动作产生过多的调度开销,故采用线性池的方法。
线性池是一种多线程并发的处理形式,它就是由一堆已创建好的线程组成。有新任务 -> 取出空闲线程处理任务 -> 任务处理完成放入线程池等待。避免了处理短时间任务时大量的线程重复创建、销毁的代价,非常适用于连续产生大量并发任务的场合。
线行池实现原理过程:
1)初始设置任务队列(链表)作为缓冲机制,并初始化创建n个线程,加锁去任务队列取任务运行(多线程互斥)。
2)在处理任务过程中,当任务队列为空时,所有线程阻塞(阻塞IO)处于空闲(wait)状态;
3)当任务队列加入新的任务时,队列加锁,然后使用条件变量唤醒(work)处于阻塞中的某一线程来执行任务;
4)执行完后再次返回线程池中成为空闲(wait)状态,依序或等待执行下一个任务。
最后完成所有任务将线程池中的线程统一销毁。
-----------------------------------------------------------------
加入程序框架中:(运用了线程互斥、线程同步)
main主函数下:
//1.初始化线程池
pool_init(5);
// 等待连接
while(1)
{
new_fd = accept(sockfd,(struct sockaddr *)(&client_addr),&sin_size);
//2.执行process,将process任务交给线程池
pool_add_task(process,new_fd);
}
void pool_init (int max_thread_num)
{ //申请堆空间
pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
for (i = 0; i < max_thread_num; i++) //3
{ //创建线程 保存线程id
pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);
}
}
void * thread_routine (void *arg)
{
while (1)
{
pthread_mutex_lock (&(pool->queue_lock)); //锁互斥锁
while (pool->cur_task_size == 0 && !pool->shutdown)
{ //1.当前没有任务,且线程池处于非关闭
printf ("thread 0x%x is waiting
", pthread_self ());
//等待条件成熟函数->线程等待
pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
}
if (pool->shutdown) /*2.线程池要销毁了*/
{
/*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
pthread_mutex_unlock (&(pool->queue_lock)); //解互斥锁
printf ("thread 0x%x will exit
", pthread_self ());
pthread_exit (NULL); //线程退出函数
}
/*待处理任务减1,并取出链表中的头元素*/ 3.处理任务
pool->cur_task_size--; //等待任务-1
Cthread_task *task = pool->queue_head; //取第一个任务处理
pool->queue_head = task->next; //链表头指向下一个任务
pthread_mutex_unlock (&(pool->queue_lock)); //解互斥锁
/*调用回调函数,执行任务*/
(*(task->process)) (task->arg);
free (task);
task = NULL;
}
pthread_exit (NULL);
}
int pool_add_task (void *(*process) (int arg), int arg)
{
/*构造一个新任务 初始化*/
Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task));
task->process = process;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock (&(pool->queue_lock));
~~~~~~
pthread_mutex_unlock (&(pool->queue_lock));
pthread_cond_signal (&(pool->queue_ready)); //条件成熟函数->唤醒线程
return 0;
}
void * process(int arg) //读取操作符,读取对应的命令响应
{
if(cmd == 'Q')
{
/*关闭SSL*/
SSL_shutdown(ssl);
SSL_free(ssl);
close(tmp_fd);
break;
}
else
handle(cmd,ssl);
}