写了一个简易线程池,
原理简单介绍下,就是设置一个任务队列queue,用来放要执行的函数,还有一个线程数组vector,用来存放所有的线程。
线程创建以后就存放在相应的vector里,空闲的线程去queue里去取要执行的函数地址,在run函数中执行,假如一个线程的run函数执行好后,
发现队列没有任务可取,则阻塞该线程,通过conidtion_variable变量的wait()函数进行阻塞,等待新的任务被添加进来后,会有一个cond变量的notify_one()
函数来唤醒阻塞中的run函数。
现在放代码吧!
线程池头文件Thread_Pool.h
/******************************************** 线程池头文件 Author:十面埋伏但莫慌 Time:2020/05/03 *********************************************/ #pragma once #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include<thread> #include<queue> #include<mutex> #include<atomic> #include<vector> #include<condition_variable> typedef std::function<void()> Func;//定义线程执行函数类型,方便后面编码使用。 //任务类 template<typename T> class Task { public: Task() {} ~Task() {} int push(T func)//添加任务; { try { tasks.emplace(func); } catch (std::exception e) { throw e; return -1; } return 1; } int getTaskNum()//获得当前队列中的任务数; { return tasks.size(); } T pop()//取出待执行的任务; { T temp; if (tasks.empty()) return temp; else { temp = tasks.front(); tasks.pop(); return temp; } } private: std::queue<T> tasks;//任务队列 }; //线程池类 class Thread_Pool { public: Thread_Pool() :IsStart(false) {} ~Thread_Pool(); int addTasks(Func&& tasks);//添加任务; void start();//开启线程池; void stop();//关闭线程池; int getTaskNum();//获得当前队列中的任务数; private: void run();//线程工作函数; private: static const int maxThreadNum = 3;//最大线程数为3; std::mutex mx;//锁; std::condition_variable cond;//条件量; std::vector<std::thread*> threads;//线程向量; bool IsStart;//原子变量,判断线程池是否运行; Task<Func> tasks;//任务变量; }; #endif
线程池实现文件 Thread_Pool.cpp
/******************************************** 线程池CPP文件 Author:十面埋伏但莫慌 Time:2020/05/03 *********************************************/ #include"Thread_Pool.h" #include<iostream> int Thread_Pool::addTasks(Func&& func) { std::unique_lock<std::mutex> lock(mx); int ret = tasks.push(func); if (ret == 1) { std::cout << "添加任务成功" << std::endl; cond.notify_one(); } return ret; } void Thread_Pool::start() { if (!IsStart) { { std::unique_lock<std::mutex> lock(mx); IsStart = true; } threads.reserve(maxThreadNum); for (int i = 0; i < maxThreadNum; i++) { threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this))); } } } void Thread_Pool::run() { while (IsStart) { Func f; if (tasks.getTaskNum() == 0 && IsStart) { std::unique_lock<std::mutex> lock(mx); cond.wait(lock); } { std::unique_lock<std::mutex> lock(mx); f = tasks.pop(); } if (f) f(); } } int Thread_Pool::getTaskNum() { return tasks.getTaskNum(); } void Thread_Pool::stop() { { std::unique_lock<std::mutex> lock(mx); IsStart = false; } cond.notify_all(); for (auto T : threads) { std::cout << "线程 " << T->get_id() << " 已停止。" << std::endl; T->join(); if (T != nullptr) { delete T; T = nullptr; } } std::cout << "所有线程已停止。" << std::endl; } Thread_Pool::~Thread_Pool() { if (IsStart) { stop(); } }
测试用的main.cpp文件
#include<iostream> #include"Thread_Pool.h" using namespace std; void string_out_one() { int n = 500; while (n--) { cout << "One!" << endl; } } void string_out_two() { int n = 500; while (n--) cout << "Two!" << endl; } void string_out_three() { int n = 500; while(n--) cout << "Three!" << endl; } void string_out_four() { int n = 500; while (n--) cout << "Four!" << endl; } int main() { clock_t start, finish; double totaltime; start = clock(); { /*实验对比代码段1开始处*/ Thread_Pool Pool; try { Pool.start(); } catch (std::exception e) { throw e; cout << "线程池创建失败。" << endl; } Pool.addTasks(move(string_out_one)); Pool.addTasks(move(string_out_two)); Pool.addTasks(move(string_out_three)); Pool.addTasks(move(string_out_four)); /*实验对比代码段1结束处*/ /*实验对比代码段2开始处*/ //string_out_one(); //string_out_two(); //string_out_three(); //string_out_four(); /*实验对比代码段2结束处*/ finish = clock(); totaltime = (double)(finish - start) / CLOCKS_PER_SEC; cout << " 此程序的运行时间为" << totaltime << "秒!" << endl; getchar(); } getchar(); return 0; }
总结下这个线程池,主要是要注意锁的防止位置,放太多,就变成单线程,执行效率还不如单线程,放太少,可能会造成多线程之间的误读,放的位置不对,会造成死锁。。。是真的麻烦。
两个想改进的地方,
一、希望可以在添加任务时确定任务的类型,而不是在Thread_Pool类中就确定task的类型,并且能支持传入函数形参。
二、程序的优化做的不是很好吧,虽然不知道但是觉得肯定还有优化空间。
若有不足之处欢迎指出。