zoukankan      html  css  js  c++  java
  • Linux pthread 线程池实现

      基于pthread封装了一个简易的ThreadPool,具有以下特性:

      1.具有优先级的任务队列

      2.线程池大小可以在运行时动态调整:增加线程,或者删除空闲线程

      3.任务有两种实现方式:重写run()方法,或者使用函数回调

    首先是任务类的声明

    // Abstract unit of work executed by a pool worker.
    // Subclasses implement run(). TaskCom orders tasks so that the one with
    // the largest priority value is taken from the queue first.
    class Task{
    public:
        std::string taskName;   // name printed by the worker that runs the task
    
    public:
        // Every constructor starts from priority 1, so a default-constructed
        // task never exposes an indeterminate priority to the comparator.
        Task():priority(1){}
        Task(const std::string &_taskName):taskName(_taskName),priority(1){}

        // Replaces the scheduling priority (larger value == served earlier).
        void setPriority(int pri){
            priority=pri;
        }
        // Returns the current scheduling priority.
        int getPriority() const{
            return priority;
        }
    
        virtual ~Task(){}
    
        // The work itself; called by the worker without the queue lock held.
        virtual void run()=0;
    public:
        int priority;           // public because TaskCom reads it directly
    };
    // Strict-weak ordering on task pointers by ascending priority.
    // Used as the comparator of a std::priority_queue, this puts the task
    // with the largest priority value on top.
    // Both pointers must be non-null.
    struct TaskCom{
        // const-qualified so the functor can also be invoked through a const
        // comparator object.
        bool operator()(const Task* t1,const Task* t2) const{
            return t1->priority<t2->priority;
        }
    };
    class CbTask:public Task{//回调版本的task
    public:
        CbTask(string name,int pri):Task(name){
            setPriority(pri);
        }
        void setCallBack(void *(*_process) (void *_arg), void *_arg){
            process=_process;
            arg=_arg;
        }
        void run(){
             (*process) (arg);
        }
    private:
        void*(*process)(void *arg);
        void *arg;
    };
    class MyTask1:public Task{
    public:
        MyTask1(string name,int pri):Task(name){
            setPriority(pri);
        }
        void run(){
            printf("hello,this is MyTask1
    ");
            sleep(2);
        }
    };
    // Usage example: heap-allocate a MyTask1 named "mytask1" with priority 4.
    // Nothing in the code shown here deletes the task afterwards.
    MyTask1 *task1=new MyTask1("mytask1",4);
    
    // Example callback for CbTask: prints the C string passed as `str`,
    // followed by a newline.
    // str: pointer to a NUL-terminated string; NULL prints an empty line.
    // Returns NULL always (the pthread-style signature requires a return
    // value; the original fell off the end of the function).
    void *printStr(void * str){
        // %s requires a char pointer, so convert from void* explicitly.
        const char *text=(str!=NULL)?static_cast<const char*>(str):"";
        printf("%s\n",text);
        return NULL;
    }
    // Usage example for the callback flavour: create a CbTask named "mytask6"
    // with priority 1 and bind printStr together with the string to print.
    CbTask *task6=new CbTask("mytask6",1);
        // NOTE(review): binding a string literal to a non-const char* is
        // ill-formed since C++11; the callback must not write through it.
        char *str="你好";
        task6->setCallBack(printStr,static_cast<void*>(str));
            

    线程池声明

    // pthread-based worker pool with a priority-ordered task queue.
    // The queue, the thread table and the synchronization objects are static
    // members, so they are shared by every ThreadPool instance.
    class ThreadPool{
    private:
        static priority_queue<Task*,vector<Task*>,TaskCom > taskList;// pending tasks, largest priority on top
        static map<pthread_t,int> threads;   // thread id -> ThreadStat value
        bool shutdown;                       // not referenced by the implementation shown here
        int maxThreadNum;                    // number of workers poolInit() creates
        int ThreadNum;                       // incremented by addThread(), decremented when a worker is removed
    
        static pthread_mutex_t mutex;        // held by workers around queue access and the condition wait
        static pthread_mutex_t map_mutex;    // held while inserting into / erasing from `threads`
        static pthread_cond_t cond;          // signalled when a task is added or a worker must exit
    
    protected:
        // Worker entry point handed to pthread_create.
        static void *threadRoutine(void *arg);
        // Records `stat` as the state of thread `tid`.
        static void setThreadStat(pthread_t tid,int stat);
    public:
        // Creates maxThreadNum workers.
        void poolInit();
        // Stops and joins all workers, then destroys mutex and cond.
        void poolDestroy();
        // Starts one more worker.
        void addThread();
        // Stops and joins n idle workers.
        void delThread(int n);
        // Queues a task and wakes a worker.
        void addTask(Task *task);
        // Number of tasks still queued.
        int getTaskListSize();
        // Current value of the worker counter.
        int getPoolSize();
    
        // Builds the pool and immediately starts _threadNum workers.
        ThreadPool(int _threadNum);
        ~ThreadPool();
    };
    // Worker states stored as the mapped value in the pool's thread table.
    enum ThreadStat{
        THREAD_RUN =1,   // currently executing a task
        THREAD_WAIT=2,   // idle, eligible to pick up a task or be removed
        THREAD_SHUT=3    // asked to exit
    };

    线程池实现

    #include "ThreadPool.h"
    
    
    // Out-of-class definitions of ThreadPool's static members.
    // Pending tasks, ordered by TaskCom.
    priority_queue<Task*,vector<Task*>,TaskCom> ThreadPool::taskList;
    // Thread id -> state value for every registered worker.
    map<pthread_t,int> ThreadPool::threads;
    
    // Statically initialized synchronization objects; no pthread_*_init call
    // is made for them anywhere in the code shown.
    pthread_mutex_t ThreadPool::mutex=PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t ThreadPool::map_mutex=PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t ThreadPool::cond=PTHREAD_COND_INITIALIZER;
    
    // Builds a pool and immediately starts _threadNum worker threads.
    // ThreadNum must start at 0 because addThread() increments it; leaving it
    // uninitialized made the pool size indeterminate. `shutdown` is likewise
    // given a defined value. Initializers follow member declaration order.
    ThreadPool::ThreadPool(int _threadNum)
        :shutdown(false),maxThreadNum(_threadNum),ThreadNum(0){
        poolInit();
    }
    // Destructor: all teardown work is delegated to poolDestroy().
    ThreadPool::~ThreadPool(){
        this->poolDestroy();
    }
    // Worker entry point handed to pthread_create.
    // Registers the calling thread in `threads` as THREAD_WAIT, then loops:
    // wait for a task (or a shutdown request), pop the highest-priority task,
    // run it outside the queue lock, and go back to waiting.
    // arg: unused.
    // Does not return normally; the thread leaves through pthread_exit once
    // its state becomes THREAD_SHUT.
    void *ThreadPool::threadRoutine(void *arg){
        (void)arg;
        pthread_t tid=pthread_self();
        pthread_mutex_lock(&map_mutex);
        threads.insert(make_pair(tid,THREAD_WAIT));
        // std::map never relocates its elements, so this reference stays
        // valid until this thread's own entry is erased.
        int &threadStat=threads[tid];
        pthread_mutex_unlock(&map_mutex);
        while(1){
            pthread_mutex_lock(&mutex);
            // Sleep while there is nothing to do and nobody asked us to exit.
            while(taskList.size()==0&&threadStat==THREAD_WAIT){
                pthread_cond_wait(&cond,&mutex);
            }
            if(threadStat==THREAD_SHUT){
                pthread_mutex_unlock(&mutex);
                printf("thread %lu will exit\n",tid);
                pthread_exit(NULL);
            }
    //        printf("task num=%d\n",taskList.size());
            Task *task=taskList.top();
            taskList.pop();
    //        printf("task num=%d\n",taskList.size());
            setThreadStat(tid,THREAD_RUN);
            printf("thread %lu is running with task--> %s*** %d\n",tid,task->taskName.c_str(),task->getPriority());
            pthread_mutex_unlock(&mutex);
    
            // Run the task without the queue lock so other workers can proceed.
            task->run();
            setThreadStat(tid,THREAD_WAIT);
    
            // The task object is not deleted here.
            printf("thread %lu has done with task--> %s\n",tid,task->taskName.c_str());
    
        }
        return NULL;
    }
    void ThreadPool::setThreadStat(pthread_t tid,int stat){
        threads[tid]=stat;
    }
    // Starts one additional worker running threadRoutine.
    // The worker counter is only incremented when pthread_create succeeds;
    // on failure the error code is reported and the pool size is unchanged.
    void ThreadPool::addThread(){
        pthread_t tid;
        int err=pthread_create(&tid,NULL,threadRoutine,NULL);
        if(err!=0){
            printf("create thread failed, error code %d\n",err);
            return;
        }
        ThreadNum++;
    }
    
    void ThreadPool::delThread(int n){
        int num=0;
        int size=getPoolSize();
        if(n>size){
            printf("pool size is less than you input
    ");
            return;
        }
        while(num<n){
            for(map<pthread_t,int>::iterator ite=threads.begin();ite!=threads.end();){
                if(ite->second==THREAD_WAIT){
                    setThreadStat(ite->first,THREAD_SHUT);
    //                printf("**thread %lu 
    ",ite->first);
                    pthread_cond_broadcast(&cond);
                    pthread_join(ite->first,NULL);
                    map<pthread_t,int>::iterator tmp=++ite;
                    pthread_mutex_lock(&map_mutex);
                    threads.erase(--ite);
                    ThreadNum--;
                    if(ThreadNum!=threads.size())
                        printf("thread num is wrong
    ");
                    pthread_mutex_unlock(&map_mutex);
                    ite=tmp;
    //                printf("**thread %lu 
    ",ite->first);
    //                printf("**thread %d
    ",threads.size());
                    num++;
                    if(num==n)
                        break;
                }else{
                    ++ite;
                }
            }
    
        }
    }
    // Starts the initial set of workers: one addThread() call per slot up to
    // maxThreadNum.
    void ThreadPool::poolInit(){
        int started=0;
        while(started<maxThreadNum){
            addThread();
            ++started;
        }
    }
    void ThreadPool::poolDestroy(){
        printf("thread pool begin to destory
    ");
        while(threads.size()!=0){
            for(map<pthread_t,int>::iterator ite=threads.begin();ite!=threads.end();){
                if(ite->second==THREAD_WAIT){
                    setThreadStat(ite->first,THREAD_SHUT);
                    pthread_cond_broadcast(&cond);
                    pthread_join(ite->first,NULL);
                    map<pthread_t,int>::iterator tmp=++ite;
                    pthread_mutex_lock(&map_mutex);
                    threads.erase(--ite);
                    ThreadNum--;
                    if(ThreadNum!=threads.size())
                        printf("thread num is wrong
    ");
                    pthread_mutex_unlock(&map_mutex);
                    ite=tmp;
                }
            }
        }
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
        printf("thread pool has destoryed
    ");
    }
    // Queues `task` and wakes one waiting worker.
    // task: the work item; NULL is ignored because both the comparator and
    //       the worker dereference every queued pointer.
    // The push happens under `mutex`, the same lock workers hold while
    // popping, so the queue is never modified concurrently.
    void ThreadPool::addTask(Task *task){
        if(task==NULL)
            return;
        pthread_mutex_lock(&mutex);
        taskList.push(task);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond);
    }
    int ThreadPool::getTaskListSize(){
        return taskList.size();
    }
    // Reports the pool's worker counter (ThreadNum) as it currently stands.
    int ThreadPool::getPoolSize(){
        const int workers=ThreadNum;
        return workers;
    }

    工程

    https://github.com/tla001/ThreadPool

  • 相关阅读:
    python连接字符串的几种方法--转子(香草拿铁的园子)
    winform属性
    C# Timer
    SQL基础
    SQL 基础
    File类 ReadAllBytes() ReadAllLines() ReadAllText()
    学习C#20天有感
    装箱和拆箱
    机器学习基础:朴素贝叶斯小结
    分类问题样本不均衡问题
  • 原文地址:https://www.cnblogs.com/tla001/p/6862308.html
Copyright © 2011-2022 走看看