// condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include<pthread.h>
typedef struct condition{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
}condition_t;
int condition_init(condition_t * cond); //初始化
int condition_lock(condition_t * cond); //加锁
int condition_unlock(condition_t * cond); //解锁
int condition_wait(condition_t* cond); //等待事件的调用
int condition_timedwait(condition_t * cond , const struct timespec * abstime); // 超时处理
int condition_signal(condition_t * cond); //向等待线程发起通知
int condition_broadcast(condition_t * cond); //向等待线程发起广播
int condition_destroy(condition_t *cond); //销毁条件变量
#endif
// condition.cpp
#include <iostream>
#include "condition.h"
int condition_init(condition_t * cond){
int status;
if((itatus = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if((status = pthread_cond_init(&cond->pcond ,NULL)))
return status;
return 0;
}
int condition_lock(condition_t * cond){
return pthread_mutex_lock(&cond->pmutex);
}
int condition_nulock(condition_t * cond){
return pthread_mutex_nulock(&cond->pmutex);
}
int condition_wait(condition_t * cond){
return pthread_cond_wait(&cond->pcond , &cond->pmutex);
}
int condition_timedwait(condition_t * cond){
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t * cond){
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t *cond){
return pthread_cond_broadcast(*cond->pcond);
}
int condition_destroy(condition_t * cond){
int status;
if((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if((status = pthread_mutex_destroy(&cond->pcond)))
return status;
return 0;
}
//pthreadpool.h
#ifndef _PTHREADPOOL_H_
#define _PTHREADPOOL_H_
#include "condition.h"
//任务结构体,将任务放入队列由线程池中的线程来执行
typedef struct task {
void *(*run)(void* arg); // 任务回调函数
void *arg; // 回调函数参数
struct task *next ;
}task_t;
//线程池结构体
typedef struct threadpool{
condition_t ready ; //任务准备或者线程池销毁通知
task_t *first ;
task_t *last;
int counter ; //当前线程数
int idle ; // 等待任务的线程数
int max_threads; // 最大线程数
int quit ; // 置1 销毁线程池
}threadpool_t;
//初始化线程池
void threadpool_init(threadpool_t * pool , int threads);
//往线程池中添加任务
void threadpool_add_task(threadpool_t* pool , void* (*run)(void *arg), void * arg);
//销毁线程池
void threadpool_destroy(threadpool_t * pool );
#endif
//pthreadpool.cpp
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include "threadpool.h"
using namespace std;
void* thread_routine(void* arg){
struct timespec abstime;
int timeout;
cout << "线程ID"<<(int)pthread_self()<<"启动"<< endl;
threadpool_t *pool = (threadpool_t *)arg;
while(1){
timeout = 0;
condition_lock(&pool->ready);
pool ->idle++;
//等待队列有任务到来 或者 销毁消息
while((pool->first == NULL)&& (!(pool->quit))){
cout << "线程ID"<<(int)pthread_self()<<"等待中"<< endl;
//condition_wait(&pool->ready);
clock_gettime(CLOCK_REALTIME , &abstime);
abstime.tv_sec += 2;
int status = condition_timedwait(&pool->ready , &abstime);
if(status == ETIMEDOUT){
cout << "线程ID"<<(int)pthread_self()<<"等待超时"<< endl;
timeout = 1;
break ;
}
}
//等待到条件 ,处于工作状态
pool->idle--;
if(pool->first != NULL ){
task_t *t = pool->first;
pool->first = t->next;
//执行任务需要一定时间 ,所以需要先解锁
condition_nulock(&pool->ready);
t->run(t->arg); //线程函数消费开始执行
delete t; //删除任务结构体
condition_lock(&pool->ready);
}
//等待到销毁
if((pool->quit) && (pool->first == NULL)){
pool->counter--;
if(pool->counter == 0){
condition_signal(&pool->ready);
}
condition_nulock(&pool->ready);
break ;
}
if(timeout && (pool->first == NULL)){
pool->counter--;
condition_nulock(&pool->ready);
break ;
}
condition_unlock(&pool->ready);
}
cout << "线程ID"<<(int)pthread_self()<< "销毁"<< endl;
return (void*) 0;
}
// 线程池的初始化
void pthreadpool_init(threadpool_t * pool , int threads){
condition_init(& pool ->ready);
pool->first = NULL;
pool->last = NULL ;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads;
pool->quit = 0;
}
//添加任务
void pthreadpool_add_task(threadpool_t * pool , void*(*run)(void* arg), void* arg){
//void *(*run)(void* arg); // 任务回调函数
//void *arg; // 回调函数参数
//struct task *next ;
task_t *newtask = new struct task ;
newtask-> run = run ;
newtask-> arg = arg;
newtask-> next= NULL ;
if(pool->first == NULL ){
pool->first = newtask;
pool->last = newtask;
}
else{
pool->last-next = newtask;
pool->last = newtask->next; //添加后尾指针位置发生改变 后移一位
}
//如果有等待线程就唤醒 ,执行其中一
if(pool->idle >0 ){
condition_signal(&pool->ready);
}else if(pool->counter < pool->max_threads){
//没有线程等待 ,并且当前线程数不操过最大线程 ,创建新线程
pthread_t tid;
pthread_create(&tid, NULL ,thread_routine , pool);
pool->counter++;
}
}
//销毁线程池
void threadpool_destroy(threadpool_t * pool){
if(pool->quit)
return ;
condition_lock(&pool->ready);
pool->quit = 1;
if(pool->counter > 0){
if(pool->idle >0){
condition_broadcast(&pool->ready); //向等待线程发起广播
while(pool->counter >0){
condition_wait(&pool->ready);
}
}
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready); //销毁条件变量
}
//main.cpp
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include "threadpool.h"
using namespace std;
void * mytask(void *arg){
cout << "线程ID="<<(int)pthread_self()<<"正在执行"<< *(int*)arg<<endl;
delete (int*)arg;
return (void* ) 0;
}
int main(void){
threadpool_t pool;
threadpool_init(&pool , 3);
size_t i;
for(i=0 ; i<10 ; i++){
int *arg = new int ;
*arg = i;
threadpool_add_task( &pool , mytask , arg ); //添加任务
}
sleep(6);
threadpool_destroy(&pool); //销毁线程池
return 0;
}