1 #include <thread>
2 #include <iostream>
3 #include <chrono>
4 #include <string>
5 #include <mutex>
6 #include <deque>
7 #include <condition_variable>
8 #include <vector>
9 using namespace std;
10
11 int remain = 5; //控制5个生产者是否结束,当为0时,消费者退出等待。
12
13 /**
14 * 生产者类
15 *
16 * */
17 class producer{
18 string name;
19 public:
20 thread* prothread;
21 producer(const string& Name ) : name(Name), prothread(nullptr){};
22 ~producer(){ delete prothread; cout<< name << "销毁" << endl; }
23 string getName(){ return name; }
24 void produce(deque<string>* task, mutex* m_mutex, condition_variable* m_cond);
25 void run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond);
26 };
27
28 void producer::produce(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){
29 using namespace std::chrono;
30 string s;
31 int i = 20;
32 while(i--){
33 s+="0";
34 this_thread::sleep_for(chrono::milliseconds(10));
35 m_mutex->lock();
36 task->push_back(s);
37 std::cout << name << " : " << s << std::endl;
38 m_mutex->unlock();
39 m_cond->notify_one();
40 }
41 m_mutex->lock();
42 remain--;
43 if(remain == 0) m_cond->notify_all();
44 m_mutex->unlock();
45 }
46
47 void producer::run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){
48 prothread = new thread(produce, this, task, m_mutex, m_cond);
49 }
50
51 /**
52 * 消费者类
53 *
54 * */
55 class consumer{
56 string name;
57 public:
58 thread* conthread;
59 consumer(const string& Name ) : name(Name), conthread(nullptr){};
60 ~consumer(){ delete conthread; cout<< name << "销毁" << endl; }
61 string getName(){ return name; }
62 void consume(deque<string>* task, mutex* m_mutex, condition_variable* m_cond);
63 void run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond);
64 };
65
66 void consumer::consume(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){
67 using namespace std::chrono;
68 while(1){
69 std::unique_lock<std::mutex> locker(*m_mutex);
70 while(task->size() == 0 && remain)
71 m_cond->wait(locker);
72 if(task->size() == 0) break;
73 this_thread::sleep_for(chrono::milliseconds(50));
74 std::cout << name << " : " << task->front() << std::endl;
75 task->pop_front();
76 locker.unlock();
77 }
78 }
79
80 void consumer::run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){
81 conthread = new thread(consume, this, task, m_mutex, m_cond);
82 }
83
84
85 int main() {
86 deque<string> task; //任务队列
87 mutex m_mutex; //互斥量
88 condition_variable m_cond; //条件变量
89 vector<producer*> producers;
90 vector<consumer*> consumers;
91 for(int i = 0; i < 5; i++)
92 {
93 string name = "生产者";
94 name += '0' + i;
95 producers.push_back(new producer(name));
96 producers[i]->run(&task, &m_mutex, &m_cond);
97 }
98 for(int i = 0; i < 10; i++){
99 string name = "消费者";
100 name += '0' + i;
101 consumers.push_back(new consumer(name));
102 consumers[i]->run(&task, &m_mutex, &m_cond);
103 }
104 for(int i = 0; i < 5; i++){
105 producers[i]->prothread->join();
106 }
107 for(int i = 0; i < 10; i++){
108 consumers[i]->conthread->join();
109 }
110 for(int i = 0; i < 5; i++){
111 delete producers[i];
112 }
113 for(int i = 0; i < 10; i++){
114 delete consumers[i];
115 }
116 getchar();
117 return 0;
118 }