typedef struct task_node { void *arg; /* fun arg. */ void *(*fun) (void *); /* the real work of the task. */ pthread_t tid; /* which thread exec this task. */ int work_id; /* task id. */ int flag; /* 1: assigned, 0: unassigned. */ struct task_node *next; pthread_mutex_t mutex; /* when modify this ds and exec the work,lock the task ds. */ } TASK_NODE; /* *the ds of the task_queue */ typedef struct task_queue { pthread_mutex_t mutex; pthread_cond_t cond; /* when no task, the manager thread wait for ;when a new task come, signal. */ struct task_node *head; /* point to the task_link. */ int number; /* current number of task, include unassinged and assigned but no finished. */ } TASK_QUEUE_T; /* *the ds of every thread, make all thread in a double link queue. */ typedef struct pthread_node { pthread_t tid; /* the pid of this thread in kernel,the value is syscall return . */ int flag; /* 1:busy, 0:free. */ struct task_node *work; /* if exec a work, which work. */ struct pthread_node *next; struct pthread_node *prev; pthread_cond_t cond; /* when assigned a task, signal this child thread by manager. */ pthread_mutex_t mutex; } THREAD_NODE; /* *the ds of the thread queue */ typedef struct pthread_queue { int number; /* the number of thread in this queue. */ struct pthread_node *head; struct pthread_node *rear; pthread_cond_t cond; /* when no idle thread, the manager wait for ,or when a thread return with idle, signal. */ pthread_mutex_t mutex; } PTHREAD_QUEUE_T;
四大结构:
1、任务池struct,用于任务的管理,其中的head表示任务队列的第一个元素。内部的cond和mutex用于多线程操作此任务池
2、任务结点task_node,对应于相应的任务,这个任务结点中,包含任务对应的要执行的函数(此函数完成我们真正的socket数据交互等待,write或者read等),完成此任务对应的线程id(用于找到线程),任务分配与否的标志flag,以及操作此任务node的mutex和connd
3、线程池struct pthread_queue,用于管理线程,有线程的首指针和尾指针,线程的个数number,线程池在代码中包括空闲线程和工作线程两大类,分别用两个变量表示
4、线程struc pthread_node,对应线程。包括线程的id,工作状态,此线程要运行的任务task_node指针,用于找到任务,然后 struct pthread_node *next;struct pthread_node *prev形成一个线程结构的双向链表
主程序:
PTHREAD_QUEUE_T * pthread_queue_idle; /* the idle thread double link queue. */ PTHREAD_QUEUE_T *pthread_queue_busy; /* the work thread double link queue. */ TASK_QUEUE_T *task_queue_head; /* the task queuee single link list. */ int main (int argc, char *argv[]) { pthread_t thread_manager_tid, task_manager_tid, monitor_id; init_system (); pthread_create (&thread_manager_tid, NULL, thread_manager, NULL); /* create thread to manage the thread pool. */ pthread_create (&task_manager_tid, NULL, task_manager, NULL); /* create thread recive task from client. */ pthread_create (&monitor_id, NULL, monitor, NULL); /* create thread to monitor the system info. */ pthread_join (thread_manager_tid, NULL); pthread_join (task_manager_tid, NULL); pthread_join (monitor_id, NULL); sys_clean (); return 0; }
开了三个线程进行管理,
1、第一个线程是我们的线程分发和任务分发的thread_manager函数对应的线程。
pthread_create (&thread_manager_tid, NULL, thread_manager, NULL);
2、第二个线程是我们进行任务管理的线程,用于处理client新连接,对应产生的新任务,这个线程中,包含着socket创建,bind,listen,accept等过程,然后开始创建task_node结点,并初始化此节点,注册对应的任务工作函数fun,接下来的就是把此任务结点,加入到我们的任务池中,最后的步骤,就是通知我们运行thread_manger函数的线程,任务池有新的任务了,如果你堵塞在这个任务池的connd中,现在可以被唤醒了,接着我们的thread_manger函数对应的线程就可以继续工作了。
pthread_create (&task_manager_tid, NULL, task_manager, NULL);
我们的thread_manger和task_manager函数的实现:其实thread_manger函数改名为thread_task_dispath更加准确
void * thread_manager (void *ptr) { while (1) { THREAD_NODE * temp_thread = NULL; TASK_NODE * temp_task = NULL; /* *get a new task, and modify the task_queue. *if no task block om task_queue_head->cond. */ pthread_mutex_lock (&task_queue_head->mutex); if (task_queue_head->number == 0) pthread_cond_wait (&task_queue_head->cond, &task_queue_head->mutex); temp_task = task_queue_head->head; task_queue_head->head = task_queue_head->head->next; task_queue_head->number--; pthread_mutex_unlock (&task_queue_head->mutex); /* *get a new idle thread, and modify the idle_queue. *if no idle thread, block on pthread_queue_idle->cond. */ pthread_mutex_lock (&pthread_queue_idle->mutex); if (pthread_queue_idle->number == 0) pthread_cond_wait (&pthread_queue_idle->cond, &pthread_queue_idle->mutex); temp_thread = pthread_queue_idle->head; /*if this is the last idle thread ,modiry the head and rear pointor */ if (pthread_queue_idle->head == pthread_queue_idle->rear) { pthread_queue_idle->head = NULL; pthread_queue_idle->rear = NULL; } /*if idle thread number>2, get the first one,modify the head pointor */ else { pthread_queue_idle->head = pthread_queue_idle->head->next; pthread_queue_idle->head->prev = NULL; } pthread_queue_idle->number--; pthread_mutex_unlock (&pthread_queue_idle->mutex); /*modify the task attribute. */ pthread_mutex_lock (&temp_task->mutex); temp_task->tid = temp_thread->tid; temp_task->next = NULL; temp_task->flag = 1; pthread_mutex_unlock (&temp_task->mutex); /*modify the idle thread attribute. */ pthread_mutex_lock (&temp_thread->mutex); temp_thread->flag = 1; temp_thread->work = temp_task; temp_thread->next = NULL; temp_thread->prev = NULL; pthread_mutex_unlock (&temp_thread->mutex); /*add the thread assinged task to the busy queue. */ pthread_mutex_lock (&pthread_queue_busy->mutex); /*if this is the first one in busy queue */ if (pthread_queue_busy->head == NULL) { pthread_queue_busy->head = temp_thread; pthread_queue_busy->rear = temp_thread; temp_thread->prev = temp_thread->next = NULL; } else { /*insert in thre front of the queue */ pthread_queue_busy->head->prev = temp_thread; temp_thread->prev = NULL; temp_thread->next = pthread_queue_busy->head; pthread_queue_busy->head = temp_thread; pthread_queue_busy->number++; } pthread_mutex_unlock (&pthread_queue_busy->mutex); /*signal the child thread to exec the work */ pthread_cond_signal (&temp_thread->cond); } }
thread_manger主要过程:
首先从任务池中得到一个任务,如果没有空闲任务,就堵塞,直到我们有新的client连接,产生了一个新的任务,并加入到我们的任务池中,此过程由task_manager对应的线程完成,得到任务之后,我们再通过得到线程池中的空闲线程,如果没有空闲线程就堵塞,如果有空闲线程,首先从空闲线程池中获取首个空闲线程(由双向链表组成的空闲线程),然后修改此空闲线程池。完成之后,修改之前得到的task的属性,包括下面的步骤:
temp_task->tid = temp_thread->tid;
temp_task->next = NULL;
temp_task->flag = 1;
其中temp是我们得到的任务task_node,把得到的空闲线程的id赋值给我们的task中的tid字段,这样我们的task和我们的刚刚得到空闲线程,就通过tid联系在一起了,同时修改task中的其他字段,例如标志字段flag等等
修改了task属性之后,我们开始修改temp_thread结构(就是得到的空闲线程),包括以下操作:
temp_thread->flag = 1;
temp_thread->work = temp_task;
temp_thread->next = NULL;
temp_thread->prev = NULL;
同理,我们把thread对应的work字段,赋值我们刚才的task,这样在thread和task中,都建立了他们之间的联系。一个线程有自己的任务指针,一个任务同样有一个线程的tid。两者紧密联系起来。
最后我们修改我们的线程池thread_queue,这包括空闲线程池和工作线程池(因为我们声明了两个struct thread_queue的变量),第一个已经修改了空闲线程池,这一步我们修改工作线程池,把这个线程加入工作线程中
pthread_cond_signal (&temp_thread->cond); //结尾我们通知,堵塞在这里的工作线程work_thread,你们可以开始工作了,因为上面的步骤,就是为工作线程争取任务队列,和空闲线程(上面的空闲线程,其实是空闲线程对应的结构体,但是可以通过或许他们,来表示获取线程,真正的线程其实是运行child_work函数的线程。)
关键点:通过使用thread_node来代表线程,这个结构体虽然不是线程,但是可以通过它,我们进行线程的操作和模拟过程
工作线程 pthread_create (&temp[i].tid, NULL, child_work, (void *) &temp[i]); 这里才是创建工作线程的地方
关键点:我们创建的pthread_node结点是不被free的,就是一旦在初始化函数中,这些结构体被创建,然后这些结构就一直存在,我们后续的线程,会改变这些thread_node结构的内容。
void * child_work (void *ptr) { THREAD_NODE * self = (THREAD_NODE *) ptr; /*modify the tid attribute the first time exec */ pthread_mutex_lock (&self->mutex); self->tid = syscall (SYS_gettid); pthread_mutex_unlock (&self->mutex); while (1) { pthread_mutex_lock (&self->mutex); /*if no task exec,blocked */ if (NULL == self->work) { pthread_cond_wait (&self->cond, &self->mutex); } pthread_mutex_lock (&self->work->mutex); /*execute the real work. */ self->work->fun (self->work->arg); /*after finished the work */ self->work->fun = NULL; self->work->flag = 0; self->work->tid = 0; self->work->next = NULL; free (self->work->arg); pthread_mutex_unlock (&self->work->mutex); //unlock the task pthread_mutex_destroy (&self->work->mutex); /*free the task space */ free (self->work); /*make self thread no work */ self->work = NULL; self->flag = 0; pthread_mutex_lock (&task_queue_head->mutex); /* *get new task from the task_link if not NULL. *there no idle thread if there are task to do. *if on task ,make self idle and add to the idle queue. */ if (task_queue_head->head != NULL) { TASK_NODE * temp = task_queue_head->head; /*get the first task */ task_queue_head->head = task_queue_head->head->next; /*modify self thread attribute */ self->flag = 1; self->work = temp; temp->tid = self->tid; temp->next = NULL; temp->flag = 1; task_queue_head->number--; pthread_mutex_unlock (&task_queue_head->mutex); pthread_mutex_unlock (&self->mutex); continue; } else { /*no task need to exec, add self to idle queue and del from busy queue */ pthread_mutex_unlock (&task_queue_head->mutex); pthread_mutex_lock (&pthread_queue_busy->mutex); /*self is the last execte thread */ if (pthread_queue_busy->head == self && pthread_queue_busy->rear == self) { pthread_queue_busy->head = pthread_queue_busy->rear = NULL; self->next = self->prev = NULL; } /*the first one thread in busy queue */ else if (pthread_queue_busy->head == self && pthread_queue_busy->rear != self) { pthread_queue_busy->head = pthread_queue_busy->head->next; pthread_queue_busy->head->prev = NULL; self->next = self->prev = NULL; } /*the last one thread in busy queue */ else if (pthread_queue_busy->head != self && pthread_queue_busy->rear == self) { pthread_queue_busy->rear = pthread_queue_busy->rear->prev; pthread_queue_busy->rear->next = NULL; self->next = self->prev = NULL; } /*middle one */ else { self->next->prev = self->prev; self->prev->next = self->next; self->next = self->prev = NULL; } pthread_mutex_unlock (&pthread_queue_busy->mutex); /*add self to the idle queue */ pthread_mutex_lock (&pthread_queue_idle->mutex); /*now the idle queue is empty */ if (pthread_queue_idle->head == NULL || pthread_queue_idle->head == NULL) { pthread_queue_idle->head = pthread_queue_idle->rear = self; self->next = self->prev = NULL; } else { self->next = pthread_queue_idle->head; self->prev = NULL; self->next->prev = self; pthread_queue_idle->head = self; pthread_queue_idle->number++; } pthread_mutex_unlock (&pthread_queue_idle->mutex); pthread_mutex_unlock (&self->mutex); /*signal have idle thread */ pthread_cond_signal (&pthread_queue_idle->cond); } } }
这里,我们的工作线程,首先会查看,通过参数传递进来的,线程结构thread_node,是否未被分配任务,通过self-work==NULL.如果已经分配任务,就开始实行任务函数fun,执行完毕之后,在此函数中,修改任务的属性,然后free掉task_node结构体,同时也要进行thread_node属性的修改:
/*make self thread no work */
self->work = NULL;
self->flag = 0;
完成了一个任务之后,如果任务队列中还有任务的话,则直接运行此任务(说明,我们只需要通过thread_manager线程第一次分配任务和线程,一旦得到这个线程,并开始工作之后,我们就可以一直工作,直到工作线程中得不到任务,才等待线程池中的cond
如果工作线程完成一个任务之后,没有新的任务,则我们首先要把此线程加入空闲队列,删除工作队列,然后最后一步就是通知我们的线程管理线程,已经新的空闲线程了。
/*signal have idle thread */
pthread_cond_signal (&pthread_queue_idle->cond);
(这个线程池的实现,并没有动态的增加和删除线程,如果我们当前没有空闲线程,这个就会等待,知道有空闲线程被放进来)
初始化过程:
init_system ();
void create_pthread_pool (void);
void create_pthread_pool (void) { THREAD_NODE * temp = (THREAD_NODE *) malloc (sizeof (THREAD_NODE) * THREAD_DEF_NUM); if (temp == NULL) { printf (" malloc failure "); exit (EXIT_FAILURE); } /*init as a double link queue */ int i; for (i = 0; i < THREAD_DEF_NUM; i++) { temp[i].tid = i + 1; temp[i].work = NULL; temp[i].flag = 0; if (i == THREAD_DEF_NUM - 1) temp[i].next = NULL; if (i == 0) temp[i].prev = NULL; temp[i].prev = &temp[i - 1]; temp[i].next = &temp[i + 1]; pthread_cond_init (&temp[i].cond, NULL); pthread_mutex_init (&temp[i].mutex, NULL); /*create this thread */ pthread_create (&temp[i].tid, NULL, child_work, (void *) &temp[i]); } /*modify the idle thread queue attribute */ pthread_mutex_lock (&pthread_queue_idle->mutex); pthread_queue_idle->number = THREAD_DEF_NUM; pthread_queue_idle->head = &temp[0]; pthread_queue_idle->rear = &temp[THREAD_DEF_NUM - 1]; pthread_mutex_unlock (&pthread_queue_idle->mutex); } /* *init_system :init the system glob pointor. */ void init_system (void) { /*init the pthread_queue_idle */ pthread_queue_idle = (PTHREAD_QUEUE_T *) malloc (sizeof (PTHREAD_QUEUE_T)); pthread_queue_idle->number = 0; pthread_queue_idle->head = NULL; pthread_queue_idle->rear = NULL; pthread_mutex_init (&pthread_queue_idle->mutex, NULL); pthread_cond_init (&pthread_queue_idle->cond, NULL); /*init the pthread_queue_busy */ pthread_queue_busy = (PTHREAD_QUEUE_T *) malloc (sizeof (PTHREAD_QUEUE_T)); pthread_queue_busy->number = 0; pthread_queue_busy->head = NULL; pthread_queue_busy->rear = NULL; pthread_mutex_init (&pthread_queue_busy->mutex, NULL); pthread_cond_init (&pthread_queue_busy->cond, NULL); /*init the task_queue_head */ task_queue_head = (TASK_QUEUE_T *) malloc (sizeof (TASK_QUEUE_T)); task_queue_head->head = NULL; task_queue_head->number = 0; pthread_cond_init (&task_queue_head->cond, NULL); pthread_mutex_init (&task_queue_head->mutex, NULL); /*create thread poll */ create_pthread_pool (); }
重点是创建我们的工作线程work_thread,
for (i = 0; i < THREAD_DEF_NUM; i++) { temp[i].tid = i + 1; temp[i].work = NULL; temp[i].flag = 0; if (i == THREAD_DEF_NUM - 1) temp[i].next = NULL; if (i == 0) temp[i].prev = NULL; temp[i].prev = &temp[i - 1]; temp[i].next = &temp[i + 1]; pthread_cond_init (&temp[i].cond, NULL); pthread_mutex_init (&temp[i].mutex, NULL); /*create this thread */ pthread_create (&temp[i].tid, NULL, child_work, (void *) &temp[i]); }
这里我们创建了我们的工作线程,还有也创建了与工作线程对于的thread_node结构。这样我们就可以通过这个结构去代表我们的线程,这个结构从这里创建之后,之后只会被修改,但是不会free,而我们的task_node则在child_work()函数中被free掉。
结构框架:
运行三个管理管控线程,thread_manager,task_mangager,和monitor.
初始化若干个工作线程,同时初始化线程池结构和任务池结构task_quque,thread_queue