1. 轮询方式
#include <iostream>
#include <string>
#include <memory>
#include <vector>
#include <mutex>
#include <queue>
#include <thread>
#include <atomic>
// interface
/**
 * Interface for a unit of work that can be handed to a thread pool.
 *
 * Implementations override Run() with the work to perform.
 */
class Runnable{
public:
    // Virtual destructor so that destroying a derived task through a
    // Runnable pointer runs the derived destructor instead of being
    // undefined behaviour.
    virtual ~Runnable() = default;

    /// Executes the task.
    virtual void Run() = 0;
};
// thread pool
class FixedThreadPool{
public:
FixedThreadPool(int size) :threadPoolSize(size){}
~FixedThreadPool(){
if (running){
Stop();
}
}
void Start(){
running = true;
for (int i = 0; i < threadPoolSize; ++i){
std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
std::lock_guard<std::mutex> lk(txThread);
threads.push_back(t);
}
}
void Stop(){
bool flag = true;
while (flag){
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lk(txTasks);
flag = !tasks.empty();
}
running = false;
for (auto thread : threads){
thread->join();
}
}
void Submit(std::shared_ptr<Runnable> r){
std::lock_guard<std::mutex> lk(txTasks);
tasks.push(r);
}
private:
void Run(){
while (running){
std::shared_ptr<Runnable> task;
{
std::lock_guard<std::mutex> lk(txTasks);
if (tasks.empty()){
continue;
}
task = tasks.front();
tasks.pop();
}
task->Run();
}
}
private:
int threadPoolSize;
std::vector<std::shared_ptr<std::thread>> threads;
std::mutex txThread;
std::queue<std::shared_ptr<Runnable>> tasks;
std::mutex txTasks;
std::atomic_bool running;
};
// concrete runnable
class Analyzer:public Runnable{
public:
Analyzer() = default;
void Run(){
std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
}
private:
static std::atomic_int cnt;
};
std::atomic_int Analyzer::cnt;
// thread pool test
void ThreadPoolTest(){
std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(5);
pool->Start();
for (int i = 0; i < 100; ++i){
std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
pool->Submit(r);
}
// pool->Stop();
}
// main
// Entry point: runs the thread pool demo, then exits with status 0.
int main()
{
    ThreadPoolTest();

    return 0;
}
2. 条件变量方式
#include <iostream>
#include <string>
#include <memory>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <atomic>
// interface
/**
 * Interface for a unit of work that can be handed to a thread pool.
 *
 * Implementations override Run() with the work to perform.
 */
class Runnable{
public:
    // Virtual destructor so that destroying a derived task through a
    // Runnable pointer runs the derived destructor instead of being
    // undefined behaviour.
    virtual ~Runnable() = default;

    /// Executes the task.
    virtual void Run() = 0;
};
// thread pool
class FixedThreadPool{
public:
FixedThreadPool(int size) :threadPoolSize(size){}
~FixedThreadPool(){
if (running){
Stop();
}
}
void Start(){
running = true;
for (int i = 0; i < threadPoolSize; ++i){
std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
std::lock_guard<std::mutex> lk(txThread);
threads.push_back(t);
}
}
void Stop(){
bool flag = true;
while (flag){
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lk(txTasks);
flag = !tasks.empty();
}
running = false;
cond.notify_all();
for (auto thread : threads){
thread->join();
}
}
void Submit(std::shared_ptr<Runnable> r){
std::lock_guard<std::mutex> lk(txTasks);
tasks.push(r);
cond.notify_one();
}
private:
void Run(){
while (running){
std::shared_ptr<Runnable> task;
/*{
std::lock_guard<std::mutex> lk(txTasks);
if (tasks.empty()){
continue;
}
task = tasks.front();
tasks.pop();
}*/
{
std::unique_lock<std::mutex> lk(txTasks);
while (tasks.empty() && running){
cond.wait(lk);
}
if (!running){
break;
}
task = tasks.front();
tasks.pop();
}
task->Run();
}
}
private:
int threadPoolSize;
std::vector<std::shared_ptr<std::thread>> threads;
std::mutex txThread;
std::queue<std::shared_ptr<Runnable>> tasks;
std::mutex txTasks;
std::condition_variable cond;
std::atomic_bool running;
};
// concrete runnable
class Analyzer:public Runnable{
public:
Analyzer() = default;
void Run(){
std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
}
private:
static std::atomic_int cnt;
};
std::atomic_int Analyzer::cnt;
// thread pool test
void ThreadPoolTest(){
std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(10);
pool->Start();
for (int i = 0; i < 100; ++i){
std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
pool->Submit(r);
}
// pool->Stop();
}
// main
// Entry point: runs the thread pool demo, then exits with status 0.
int main()
{
    ThreadPoolTest();

    return 0;
}