zoukankan      html  css  js  c++  java
  • 简单Linux C线程池2

      线程池的原理及意义,请移步这篇博文:http://www.cnblogs.com/venow/archive/2012/11/22/2779667.html

      下面,介绍的这个线程池与上面提到的那个线程池有一部分相似的地方。

      主要区别为:

        1、线程池中的每个线程都有自己的互斥量和条件变量,而不是线程池共享一个。

        2、线程池中的线程在程序结束时,等待线程池中线程停止的机制不同。

      该程序主要由两个文件构成,分别为ThreadPool.h和ThreadPool.cpp文件。

      ThreadPool.h文件:

    #define MAXT_IN_POOL 200
    #define BUSY_THRESHOlD 0.5
    #define MANAGE_INTREVAL 2
    
    class ThreadPool;
    
    typedef void (*dispatch_fn)(void*);    
    
    //线程函数参数
    typedef struct tagThread           
    {
        pthread_t thread_id;           //线程ID
        pthread_mutex_t thread_mutex;  //信号量
        pthread_cond_t thread_cond;    //条件变量
        dispatch_fn do_job;            //调用的函数,任务
        void* args;                    //函数参数
        ThreadPool *parent;            //线程池指针
    }_thread;
    
    //线程池
    class ThreadPool
    {    
    public:
        //================================================================================================
        //函数名:                  ThreadPool
        //函数描述:                构造函数
        //输入:                    [in] max_threads_in_pool 线程池最大线程数
        //输入:                    [in] min_threads_in_pool 线程池最小问题数
        //输出:                    无
        //返回:                    无
        //================================================================================================
        ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool = 2);
        ~ThreadPool();
        
        //================================================================================================
        //函数名:                  dispatch_threadpool
        //函数描述:                将任务加入线程池,由线程池进行分发
        //输入:                    [in] dispatch_me 调用的函数地址
        //输入:                    [in] dispatch_me 函数参数
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void dispatch_threadpool(dispatch_fn dispatch_me, void* dispatch_me);
    private:
        pthread_mutex_t tp_mutex;  //信号量
        pthread_cond_t tp_idle;    //线程池中线程有空闲线程的条件变量
        pthread_cond_t tp_full;    //线程池中线程为满的条件变量 
        pthread_cond_t tp_empty;   //线程池中线程为空的条件变量
        int tp_min;                //线程池的最小线程数
        int tp_max;                //线程池的最大线程数
        int tp_avail;              //线程池中空闲的线程数
        int tp_total;              //线程池中已创建的线程数
        _thread** tp_list;         //指向线程池中所有空闲线程的参数的指针
        bool tp_stop;              //线程池是否已停止
        
        //================================================================================================
        //函数名:                  add_avail
        //函数描述:                加入空闲线程
        //输入:                    [in] avail 线程的参数
        //输出:                    无
        //返回:                    成功:true,失败:false
        //================================================================================================
        bool add_avail(_thread* avail);
        
        //================================================================================================
        //函数名:                  work_thread
        //函数描述:                线程函数
        //输入:                    [in] args 参数
        //输出:                    无
        //返回:                    无
        //================================================================================================
        static void* work_thread(void* args);
        
        //================================================================================================
        //函数名:                  add_thread
        //函数描述:                添加一个线程
        //输入:                    [in] dispatch_me 函数指针
        //输入:                    [in] args        函数参数
        //输出:                    无
        //返回:                    无
        //================================================================================================
        bool add_thread(dispatch_fn dispatch_me, void* args);
        
        //================================================================================================
        //函数名:                  syn_all
        //函数描述:                等待线程池中所有线程空闲
        //输入:                    无
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void syn_all();    
    };

      ThreadPool.cpp文件:

    ThreadPool::ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool)
    {
        pthread_t manage_id;
    
        if (min_threads_in_pool <= 0 || max_threads_in_pool < 0 || min_threads_in_pool > max_threads_in_pool || max_threads_in_pool > MAXT_IN_POOL)
        {
            return ;
        }
    
        tp_avail = 0;      //初始化线程池
        tp_total = 0;
        tp_min = min_threads_in_pool;
        tp_max = max_threads_in_pool;
        tp_stop = false;
        tp_list = (_thread * *)    malloc(sizeof(void *) * max_threads_in_pool);    
        if (NULL == tp_list)
        {
            return;
        }
        memset(tp_list, 0, sizeof(void *) * max_threads_in_pool);
        
        pthread_mutex_init(&tp_mutex, NULL);
        pthread_cond_init(&tp_idle, NULL);
        pthread_cond_init(&tp_full, NULL);
        pthread_cond_init(&tp_empty, NULL);
    }
    
    bool ThreadPool::add_avail(_thread* avail)
    {
        bool ret = false;
    
        pthread_mutex_lock(&tp_mutex);
        if (tp_avail < tp_max)
        {
            tp_list[tp_avail] = avail;
            tp_avail++;        
            pthread_cond_signal(&tp_idle);  //线程池中有线程为空闲
            if (tp_avail >= tp_total)
            {
                pthread_cond_signal(&tp_full); //线程池中所有线程都为为空闲
            }
            ret = true;
        }
        pthread_mutex_unlock(&tp_mutex);
        
        return ret;
    }
    
    void* ThreadPool::work_thread(void* args)
    {
        _thread* thread = (_thread*) args;
        ThreadPool *pool = thread->parent;
        while (pool->tp_stop == false) 
        {
            thread->do_job(thread->args);
            pthread_mutex_lock(&thread->thread_mutex); //执行完任务之后,添加到空闲线程队列中
            if (pool->add_avail(thread))
            {    
                pthread_cond_wait(&thread->thread_cond, &thread->thread_mutex);
                pthread_mutex_unlock(&thread->thread_mutex);
            }
            else
            {
                pthread_mutex_unlock(&thread->thread_mutex);
                pthread_mutex_destroy(&thread->thread_mutex);
                pthread_cond_destroy(&thread->thread_cond);
                free(thread);
                break;
            }
        }
    
        pthread_mutex_lock(&pool->tp_mutex);
        pool->tp_total--;
        if (pool->tp_total <= 0)
        {
            pthread_cond_signal(&pool->tp_empty);
        }
        pthread_mutex_unlock(&pool->tp_mutex);
    
        return NULL;
    }
    
    bool ThreadPool::add_thread(dispatch_fn dispatch_me, void* args)  //添加一个线程
    {
        _thread* thread = NULL;
        pthread_attr_t attr;
        
        thread = (_thread *) malloc(sizeof(_thread));
        if (NULL == thread)
        {
            return false;
        }
    
        pthread_mutex_init(&thread->thread_mutex, NULL);
        pthread_cond_init(&thread->thread_cond, NULL);
        thread->do_job = dispatch_me;
        thread->args = args;
        thread->parent = this;
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    
        if (pthread_create(&thread->thread_id, &attr, work_thread, (void *) thread) != 0)
        {
            pthread_mutex_destroy(&thread->thread_mutex);
            pthread_cond_destroy(&thread->thread_cond);
            pthread_attr_destroy(&attr);
            free(thread);
            return false;
        }                
        tp_total++;
    
        return true;
    }
    
    void ThreadPool::dispatch_threadpool(dispatch_fn dispatch_me, void* args)
    {
        _thread* thread = NULL;
    
        pthread_mutex_lock(&tp_mutex);
        
        if (tp_avail <= 0 && tp_total >= tp_max) //无可用线程,而且线程数已达最大值,等待空闲线程
        {
            pthread_cond_wait(&tp_idle, &tp_mutex);
        }
    
        if (tp_avail <= 0)  //无可用线程,而且线程数未达最大值,添加线程
        {
            if (!add_thread(dispatch_me, args))
            {
                return;
            }
        }
        else   //有可用线程
        {
            tp_avail--;
            thread = tp_list[tp_avail];
            tp_list[tp_avail] = NULL;        
            thread->do_job = dispatch_me;
            thread->args = args;
    
            pthread_mutex_lock(&thread->thread_mutex);
            pthread_cond_signal(&thread->thread_cond);
            pthread_mutex_unlock(&thread->thread_mutex);
        }
    
        pthread_mutex_unlock(&tp_mutex);
    }
    
    
    void ThreadPool::syn_all()
    {    
        if (tp_avail < tp_total)   //等待线程池中所有线程都为空闲状态
        {
            pthread_cond_wait(&tp_full, &tp_mutex);
        }    
        
        tp_stop = true;    
        int i = 0;    
        for (i = 0; i < tp_avail; i++)  //唤醒线程池中所有线程
        {
            _thread *thread = tp_list[i];        
            pthread_mutex_lock(&thread->thread_mutex);
            pthread_cond_signal(&thread->thread_cond);
            pthread_mutex_unlock(&thread->thread_mutex);
        }    
        if (tp_total > 0)
        {
            pthread_cond_wait(&tp_empty, &tp_mutex);  //等待线程池中所有线程都结束
        }
    }
    
    ThreadPool::~ThreadPool()
    {
        sleep(MANAGE_INTREVAL);
        pthread_mutex_lock(&tp_mutex);
        syn_all();                        //等待线程池为空
        int i = 0;
        for (i = 0; i < tp_total; i++)  //资源释放
        {
            free(tp_list[i]);
            tp_list[i] = NULL;
        }
        pthread_mutex_unlock(&tp_mutex);
        pthread_mutex_destroy(&tp_mutex);
        pthread_cond_destroy(&tp_idle);
        pthread_cond_destroy(&tp_full);
        pthread_cond_destroy(&tp_empty);
        free(tp_list);
    }
  • 相关阅读:
    采购到入库所经历的表
    PO 收料SQL
    关于PO 和PR 的联系问题
    在Oracle Form中,如何实现自动编号(行号)的功能
    订单暂挂问题sql解决:
    类和结构的区别?
    DataTable.Select 方法 (String, String, DataViewRowState)
    Ref与Out的区别
    C# 反射
    委托
  • 原文地址:https://www.cnblogs.com/venow/p/2809846.html
Copyright © 2011-2022 走看看