zoukankan      html  css  js  c++  java
  • Linux C++线程池

    1、为什么需要线程池?
    
    部分应用程序需要执行很多细小的任务,对于每个任务都创建一个线程来完成,任务完成后销毁线程,而这就会产生一个问题:当执行的任务所需要的时间T1小于等于创建线程时间T2和销毁线程时间T3总和时即T1 <= T2 + T3,应用处理任务的响应能力会大大减弱,从而影响了应用程序性能,为了解决这类问题,线程池技术提供了很好的解决方案。线程池顾名思义就是把线程资源池化,在应用启动时一次性创建合适数量的线程,当需要执行任务时就从线程池中分配一个已经创建好的线程来执行,执行完在把线程归还,只在应用停时再一次性销毁所有的线程。
    
    2、线程池的基本组成部分
    
    一个简单的线程池至少包括下列的组成部分:
    
    1)线程池管理器(ThreadPool):用于创建一个线程池对象并管理线程池,如分配任务给某个空闲线程,查看当前线程状态等等的操作。
    
    2)工作线程(WorkThread):线程池中线程,可能是挂起,可能是被分配了任务,若然是挂起,则用一个信号量去阻塞直到有任务分配。
    
    3)任务接口(Task):每个任务必须实行的接口,以供工作线程调度任务执行。
    
    3、Unix下的线程池实现
    
    将给大家展示的线程池实现的类如下,含有比较多的面向对象设计思想。
    主要是
    一个线程池管理多个工作线程类,每个工作线程类对象管理一个线程。

    1)Mutex:互斥量类,里面只有一个pthread_mutex_t的私有成员,对POSIX互斥量进行封装,后面用于线程池的队列和栈。

    Mutex.h

     1 #ifndef MUTEX_H
     2 #define MUTEX_H
     3 
     4 #include <iostream>
     5 #include <pthread.h>
     6 using namespace std;
     7 
     8 class Mutex
     9 {
    10 public:
    11     Mutex();
    12     void Lock();
    13     void Unlock();
    14 
    15 private:
    16     pthread_mutex_t mutex;
    17 };
    18 
    19 #endif
    View Code

    Mutex.cpp

     1 #include "Mutex.h"
     2 
     3 Mutex::Mutex()
     4 {
     5     pthread_mutex_init(&mutex,NULL);
     6 }
     7 
     8 void Mutex::Lock()
     9 {
    10     pthread_mutex_lock(&mutex);
    11 }
    12 
    13 void Mutex::Unlock()
    14 {
    15     pthread_mutex_unlock(&mutex);
    16 }
    View Code

    2)BaseTask:任务的抽象基类,任何具体的任务都要继承该类,并实现自己的void * run()函数,
    即线程运行的函数。在这里继承的类定义实现在main函数里面。抽象类不能被实例化,但可以声明为指针指向继承的子类。

    BaseTask.h

     1 #ifndef BASE_TASK_H
     2 #define BASE_TASK_H
     3 
     4 #include <iostream>
     5 using namespace std;
     6 
     7 class BaseTask
     8 {
     9     public:
    10         virtual void run() = 0;
    11 };
    12 
    13 #endif
    View Code

    3).MyTask:具体任务类,继承了BaseTask,实现了具体任务。

    MyTask.h

     1 #ifndef MYTASK_H
     2 #define MYTASK_H
     3 #include "BaseTask.h"
     4 
     5 class MyTask : public BaseTask
     6 {
     7 public:
     8     virtual void run(void);
     9 };
    10 
    11 #endif
    View Code

    MyTask.cpp

    1  #include "MyTask.h"
    2  
    3  void MyTask::run(void)
    4  {
    5      cout<<"Hello MyTask"<<endl;
    6  }
    View Code

    4).MyThread: 对POSIX线程的C++封装,实现了执行任务的基本接口

    MyThread.h

     1 #ifndef MYTHREAD_H
     2 #define MYTHREAD_H
     3 
     4 #include <iostream>
     5 #include <pthread.h>
     6 #include <semaphore.h>
     7 #include "BaseTask.h"
     8 
     9 
    10 // 前置定义
    11 class MyThreadPool;
    12 
    13 class MyThread
    14 {
    15 public:
    16     MyThread(MyThreadPool* mtp);
    17     void Set_Task(BaseTask* task);
    18     void Start_Task();
    19     /* 线程启动函数 必须写成静态成员函数 传入的参数为类对象自己*/
    20     static void* Start_Func(void* arg);
    21     /* 完成一个任务后 询问线程池管理器是否有未完成的任务*/
    22     bool Fetch_Task();
    23     /* 无更多任务 让自己进入线程池栈*/
    24     void Recycle();
    25 private:
    26     /* 用于挂起线程 */
    27     sem_t sem;
    28     pthread_t tid;
    29     BaseTask* task;
    30     MyThreadPool* mtp;
    31 };
    32 
    33 #endif
    View Code

    MyThread.cpp

     1 #include "MyThreadPool.h"
     2 #include "MyThread.h"
     3 
     4 
     5 MyThread::MyThread(MyThreadPool* mtp)
     6 {
     7     sem_init(&sem,0,0);
     8     this->mtp = mtp;
     9     pthread_create(&tid,NULL,Start_Func,(void*)this);
    10 }
    11 
    12 void MyThread::Set_Task(BaseTask* task)
    13 {
    14     this->task = task;
    15 }
    16 
    17 void MyThread::Start_Task()
    18 {
    19     sem_post(&sem);
    20 }
    21 
    22 void* MyThread::Start_Func(void* arg)
    23 {
    24     MyThread* mt = (MyThread*) arg;
    25     while(1)
    26     {
    27         sem_wait(&mt->sem);
    28         mt->task->run();
    29         if(mt->Fetch_Task())
    30             mt->Start_Task();
    31         else
    32             mt->Recycle();
    33     }
    34 }
    35 
    36 bool MyThread::Fetch_Task()
    37 {
    38     return mtp->FetchTask(this);
    39 }
    40 
    41 void MyThread::Recycle()
    42 {
    43     mtp->Recycle(this);
    44 }
    View Code

    MyThreadPool:线程池管理类,实现线程的创建管理和任务调度。

    MyThreadPool.h

     1 #ifndef MYTHREADPOOL_H
     2 #define MYTHREADPOOL_H
     3 
     4 #include <iostream>
     5 #include <stack>
     6 #include <queue>
     7 #include "Mutex.h"
     8 #include "BaseTask.h"
     9 
    10 class MyThread;
    11 
    12 class MyThreadPool
    13 {
    14 public:
    15     /* no为要开辟的线程数目*/
    16     MyThreadPool(int no);
    17     /* 添加任务*/
    18     void AddTask(BaseTask* task);
    19     /* 供线程类调用 让线程类对象询问线程池任务队列中是否仍有任务未完成*/
    20     bool FetchTask(MyThread* mt);
    21     /* 回收线程 */
    22     void Recycle(MyThread* mt);
    23 private:
    24     int no;
    25     Mutex smutex;
    26     Mutex qmutex;
    27 
    28     stack<MyThread*> sthread;
    29     queue<BaseTask*> qtask;
    30 };
    31 
    32 #endif
    View Code

    MyThreadPool.cpp

     1 #include "MyThreadPool.h"
     2 #include "MyThread.h"
     3 
     4 MyThreadPool::MyThreadPool(int no)
     5 {
     6     this->no = no;
     7 
     8     for(int i=0;i<no;++i)
     9     {
    10         sthread.push(new MyThread(this));
    11     }
    12 }
    13 
    14 void MyThreadPool::AddTask(BaseTask* task)
    15 {
    16     smutex.Lock();
    17     if (!sthread.empty())
    18     {
    19         MyThread* mt = sthread.top();
    20         sthread.pop();
    21         smutex.Unlock();
    22         mt->Set_Task(task);
    23         mt->Start_Task();
    24     }
    25     else
    26     {
    27         smutex.Unlock();    
    28         qmutex.Lock();
    29         qtask.push(task);
    30         qmutex.Unlock();
    31     }
    32 }
    33 
    34 bool MyThreadPool::FetchTask(MyThread* mt)
    35 {
    36     qmutex.Lock();
    37     if (!qtask.empty())
    38     {
    39         mt->Set_Task(qtask.front());
    40         qmutex.Unlock();
    41         return true;
    42     }
    43     else
    44     {
    45         qmutex.Unlock();
    46         return false;
    47     }
    48 }
    49 
    50 void MyThreadPool::Recycle(MyThread* mt)
    51 {
    52     smutex.Lock();
    53     sthread.push(mt);
    54     smutex.Unlock();
    55 }
    View Code

    main.cpp

     1 #include "MyThread.h"
     2 #include "MyThreadPool.h"
     3 #include "Mutex.h"
     4 #include "MyTask.h"
     5 
     6 int main()
     7 {
     8     MyThreadPool mtp(3);
     9     while(1)
    10     {
    11         BaseTask* task = new MyTask;
    12         mtp.AddTask(task);
    13         sleep(5);
    14         delete task;
    15         task = NULL;
    16     }
    17 }
    View Code

    makefile

     1 pro: main.cpp libtp.a MyTask.cpp
     2     g++ -lpthread  main.cpp -L. -ltp MyTask.cpp -o pro
     3 
     4 libtp.a: MyThreadPool.o MyThread.o Mutex.o
     5     ar cr libtp.a Mutex.o MyThreadPool.o MyThread.o
     6 
     7 MyThread.o:MyThread.cpp
     8     g++ -c -lpthread MyThread.cpp -o MyThread.o
     9 
    10 MyThreadPool.o:MyThreadPool.cpp
    11     g++ -c -lpthread MyThreadPool.cpp -o MyThreadPool.o
    12 
    13 Mutex.o:Mutex.cpp
    14     g++ -c -lpthread Mutex.cpp -o Mutex.o
    15 
    16 clean:
    17     rm libtp.a MyThreadPool.o Mutex.o MyThread.o pro
    View Code

    执行 make 成功后,执行./pro 即可

    编译: 我首先将线程类(MyThread.o) 互斥类(Mutex.o) 和 线程池类(MyThreadPool.o) 打包成一个静态库

    静态库留出两个接口 一个是线程池的初始化 另外一个是用户自己继承BaseTask的run函数后 调用线程池的类对象 AddTask接口去增加任务

    所以使用者只需自己指定要开辟的线程数(线程池的构造函数) 和 自定义 任务类对象即可使用这个线程池.

    ps:所有文件都放在同一个目录下

    Coding Life
  • 相关阅读:
    《大道至简》读后感
    PowerBuilder学习笔记之1开发环境
    PowerBuilder学习笔记之14用户自定义对象
    查询数据库大小的代码
    JAVA基础_修饰符
    SQLSERVER查询存储过程内容
    Asp.Net WebAPI中Filter过滤器的使用以及执行顺序
    运算符
    判断(if)语句
    变量的命名
  • 原文地址:https://www.cnblogs.com/lewiskyo/p/4607040.html
Copyright © 2011-2022 走看看