zoukankan      html  css  js  c++  java
  • linux下的c++线程池实现

    我设计这个线程池的初衷是为了与socket对接的。线程池的实现千变万化,我得这个并不一定是最好的,但却是否和我心目中需求模型的。现把部分设计思路和代码贴出,以期抛砖引玉。个人比较喜欢搞开源,所以大家如果觉得有什么需要改善的地方,欢迎给予评论。思前想后,也没啥设计图能表达出设计思想,就把类图贴出来吧。

    类图设计如下:

    Command类是我们的业务类。这个类里只能存放简单的内置类型,这样方便与socket的直接传输。我定义了一个cmd_成员用于存放命令字,arg_用于存放业务的参数。这个参数可以使用分隔符来分隔各个参数。我设计的只是简单实现,如果有序列化操作了,完全不需要使用我这种方法啦。

    ThreadProcess就是业务处理类,这里边定义了各个方法用于进行业务处理,它将在ThreadPool中的Process函数中调用。

    ThreadPool就是我们的线程池类。其中的成员变量都是静态变量,Process就是线程处理函数。

    #define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍
    #define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数
    #define THREAD_NUM 10 // 初始线程数

    bshutdown_:用于线程退出。

    command_:用于存放任务队列

    command_cond_:条件变量

    command_mutex_:互斥锁

    icurr_thread_num_:当前线程池中的线程数

    thread_id_map_:这个map用于存放线程对应的其它信息,我只存放了线程的状态,0为正常,1为退出。还可以定义其它的结构来存放更多的信息,例如存放套接字。

    InitializeThreads:用于初始化线程池,先创建THREAD_NUM个线程。后期扩容也需要这个函数。

    Process:线程处理函数,这里边会调用AddThread和DeleteThread在进行线程池的伸缩。

    AddWork:往队列中添加一个任务。

    ThreadDestroy:线程销毁函数。

    AddThread:扩容THREAD_NUM个线程

    DeleteThread:如果任务队列为空,则将原来的线程池恢复到THREAD_NUM个。这里可以根据需要进行修改。

     

    以下贴出代码以供大家参考。

    command.h

    #ifndef COMMAND_H_
    #define COMMAND_H_
    
    class Command
    {
    public:
        int get_cmd();
        char* get_arg();
        void set_cmd(int cmd);
        void set_arg(char* arg);
    private:
        int cmd_;
        char arg_[65];
    };
    
    #endif /* COMMAND_H_ */

    command.cpp

    #include <string.h>
    #include "command.h"
    
    
    int Command::get_cmd()
    {
        return cmd_;
    }
    
    char* Command::get_arg()
    {
        return arg_;
    }
    
    void Command::set_cmd(int cmd)
    {
        cmd_ = cmd;
    }
    
    void Command::set_arg(char* arg)
    {
        if(NULL == arg)
        {
            return;
        }
        strncpy(arg_,arg,64);
        arg_[64] = '\0';
    }

    thread_process.h

    #ifndef THREAD_PROCESS_H_
    #define THREAD_PROCESS_H_
    
    class ThreadProcess
    {
    public:
        void Process0(void* arg);
        void Process1(void* arg);
        void Process2(void* arg);
    };
    
    #endif /* THREAD_PROCESS_H_ */

    thread_process.cpp

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>
    #include "thread_process.h"
    
    
    void ThreadProcess::Process0(void* arg)
    {
        printf("thread %u is starting process %s\n",pthread_self(),arg);
        usleep(100*1000);
    }
    void ThreadProcess::Process1(void* arg)
    {
        printf("thread %u is starting process %s\n",pthread_self(),arg);
        usleep(100*1000);
    }
    
    void ThreadProcess::Process2(void* arg)
    {
        printf("thread %u is starting process %s\n",pthread_self(),arg);
        usleep(100*1000);
    }

    thread_pool.h

    #ifndef THREAD_POOL_H_
    #define THREAD_POOL_H_
    
    #include <map>
    #include <vector>
    #include "command.h"
    
    #define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍
    #define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数
    #define THREAD_NUM 10 // 初始线程数
    
    class ThreadPool
    {
    public:
        ThreadPool() {};
        static void InitializeThreads();
        void AddWork(Command command);
        void ThreadDestroy(int iwait = 2);
    private:
        static void* Process(void* arg);
        static void AddThread();
        static void DeleteThread();
        static bool bshutdown_;
        static int icurr_thread_num_;
        static std::map<pthread_t,int> thread_id_map_;
        static std::vector<Command> command_;
        static pthread_mutex_t command_mutex_;
        static pthread_cond_t command_cond_;
    };
    
    
    #endif /* THREAD_POOL_H_ */

    thread_pool.cpp

    #include <pthread.h>
    #include <stdlib.h>
    #include "thread_pool.h"
    #include "thread_process.h"
    #include "command.h"
    
    bool ThreadPool::bshutdown_ = false;
    int ThreadPool::icurr_thread_num_ = THREAD_NUM;
    std::vector<Command> ThreadPool::command_;
    std::map<pthread_t,int> ThreadPool::thread_id_map_;
    pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;
    
    void ThreadPool::InitializeThreads()
    {
        for (int i = 0; i < THREAD_NUM ; ++i)
        {
            pthread_t tempThread;
            pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);
            thread_id_map_[tempThread] = 0;
        }
    }
    
    void* ThreadPool::Process(void* arg)
    {
        ThreadProcess threadprocess;
        Command command;
        while (true)
        {
            pthread_mutex_lock(&command_mutex_);
            // 如果线程需要退出,则此时退出
            if (1 == thread_id_map_[pthread_self()])
            {
                pthread_mutex_unlock(&command_mutex_);
                printf("thread %u will exit\n", pthread_self());
                pthread_exit(NULL);
            }
            // 当线程不需要退出且没有需要处理的任务时,需要缩容的则缩容,不需要的则等待信号
            if (0 == command_.size() && !bshutdown_)
            {
                if(icurr_thread_num_ >  THREAD_NUM)
                {
                    DeleteThread();
                    if (1 == thread_id_map_[pthread_self()])
                    {
                        pthread_mutex_unlock(&command_mutex_);
                        printf("thread %u will exit\n", pthread_self());
                        pthread_exit(NULL);
                    }
                }
                pthread_cond_wait(&command_cond_,&command_mutex_);
            }
            // 线程池需要关闭,关闭已有的锁,线程退出
            if(bshutdown_)
            {
                pthread_mutex_unlock (&command_mutex_);
                printf ("thread %u will exit\n", pthread_self ());
                pthread_exit (NULL);
            }
            // 如果线程池的最大线程数不等于初始线程数,则表明需要扩容
            if(icurr_thread_num_ < command_.size()))
            {
                AddThread();
            }
            // 从容器中取出待办任务
            std::vector<Command>::iterator iter = command_.begin();
            command.set_arg(iter->get_arg());
            command.set_cmd(iter->get_cmd());
            command_.erase(iter);
            pthread_mutex_unlock(&command_mutex_);
            // 开始业务处理
            switch(command.get_cmd())
            {
            case 0:
                threadprocess.Process0(command.get_arg());
                break;
            case 1:
                threadprocess.Process1(command.get_arg());
                break;
            case 2:
                threadprocess.Process2(command.get_arg());
                break;
            default:
                break;
            }
        }
        return NULL; // 完全为了消除警告(eclipse编写的代码,警告很烦人)
    }
    
    void ThreadPool::AddWork(Command command)
    {
        bool bsignal = false;
        pthread_mutex_lock(&command_mutex_);
        if (0 == command_.size())
        {
            bsignal = true;
        }
        command_.push_back(command);
        pthread_mutex_unlock(&command_mutex_);
        if (bsignal)
        {
            pthread_cond_signal(&command_cond_);
        }
    }
    
    void ThreadPool::ThreadDestroy(int iwait)
    {
        while(0 != command_.size())
        {
            sleep(abs(iwait));
        }
        bshutdown_ = true;
        pthread_cond_broadcast(&command_cond_);
        std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();
        for (; iter!=thread_id_map_.end(); ++iter)
        {
            pthread_join(iter->first,NULL);
        }
        pthread_mutex_destroy(&command_mutex_);
        pthread_cond_destroy(&command_cond_);
    }
    
    void ThreadPool::AddThread()
    {
        if(((icurr_thread_num_*ADD_FACTOR) < command_.size())
                && (MAX_THREAD_NUM != icurr_thread_num_))
        {
            InitializeThreads();
            icurr_thread_num_ += THREAD_NUM;
        }
    }
    
    void ThreadPool::DeleteThread()
    {
        int size = icurr_thread_num_ - THREAD_NUM;
        std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();
        for(int i=0; i<size; ++i,++iter)
        {
            iter->second = 1;
        }
    }

    main.cpp

    #include "thread_pool.h"
    #include "command.h"
    
    int main()
    {
        ThreadPool thread_pool;
        thread_pool.InitializeThreads();
        Command command;
        char arg[8] = {0};
        for(int i=1; i<=1000; ++i)
        {
            command.set_cmd(i%3);
            sprintf(arg,"%d",i);
            command.set_arg(arg);
            thread_pool.AddWork(command);
        }
        sleep(10); // 用于测试线程池缩容
        thread_pool.ThreadDestroy();
        return 0;
    }

    代码是按照google的开源c++编码规范编写。大家可以通过改变那几个宏的值来调整线程池。有问题大家一起讨论。

  • 相关阅读:
    Java vs Python
    Compiled Language vs Scripting Language
    445. Add Two Numbers II
    213. House Robber II
    198. House Robber
    276. Paint Fence
    77. Combinations
    54. Spiral Matrix
    82. Remove Duplicates from Sorted List II
    80. Remove Duplicates from Sorted Array II
  • 原文地址:https://www.cnblogs.com/osyun/p/2664938.html
Copyright © 2011-2022 走看看