zoukankan      html  css  js  c++  java
  • 05C++11生产者消费者模式2

    #pragma once
    #include <iostream>
    #include <chrono>
    #include <ctime>
    #include <iomanip>
    #include <string>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <list>
    #include <chrono>
    #include <memory>
    
    using namespace std;
    using namespace chrono;
    
    typedef std::function<void(const int&)> NofityProc;
    typedef std::function<void(const string&, const int&)> NofityProc2;
    
    class MyFactory
    {
    public:
    	MyFactory();
    	virtual ~MyFactory();
    	int startProduct();
    	int stopProduct();
    
    	int startConsumes(const NofityProc& proc, const int& threadCount = 4);
    	int stopConsumes();
    private:
    	int startConsume(const NofityProc& proc);
    
    	thread m_producerThread;							//生产者线程
    	list<shared_ptr<thread>>	m_consumeThreadList;	//消费者线程列表
    	bool m_isIntialized;								//初始化标识
    	bool m_isStartedtoConsume;							//消费者现线程标识
    	mutex m_mtx;										// 全局互斥锁.
    	condition_variable m_cv;							// 全局条件变量.
    
    	list<int> m_dataList;
    };
    
    
    
    #include "my_factory.h"
    
    MyFactory::MyFactory():
    m_isIntialized(false),
    m_isStartedtoConsume(false)
    {
    }
    
    MyFactory::~MyFactory()
    {
    }
    
    int MyFactory::startProduct()
    {
    	if (m_isIntialized)
    	{
    		return -1;
    	}
    	m_dataList.clear();
    	m_isIntialized = true;
    
    	//生产者线程
    	m_producerThread = thread([this](){
    		while (m_isIntialized)
    		{
    			std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    			std::unique_lock <std::mutex> lck(m_mtx);
    
    			//处理业务
    			auto d = system_clock::now().time_since_epoch();
    			auto sec = duration_cast<seconds>(d);
    			m_dataList.push_back((int)sec.count());
    			m_cv.notify_all();
    			cout << "product thread notify all..." << endl;
    		}
    	});
    
    	return 0;
    }
    
    int MyFactory::stopProduct()
    {
    	m_isIntialized = false;
    	m_producerThread.join();
    	return 0;
    }
    
    int MyFactory::startConsume(const NofityProc& proc)
    {
    	//消费者
    // 	if (m_isStarted)
    // 	{
    // 		return -1;
    // 	}
    	m_isStartedtoConsume = true;
    
    	m_consumeThreadList.push_back(make_shared<thread>([this, &proc]{
    		while (m_isStartedtoConsume)
    		{
    			std::unique_lock <std::mutex> lck(m_mtx);
    			while (m_dataList.empty())
    			{
    				m_cv.wait(lck);
    			}
    			//消费数据data
    			auto data = m_dataList.front();
    			m_dataList.pop_front();
    			lck.unlock();
    			//处理数据
    			cout << "consume thread " << this_thread::get_id() << " get data..." << endl;
    			//模拟保存数据库等耗时操作
    			std::this_thread::sleep_for(std::chrono::milliseconds(500));
    			
    			//通知消费结果
    			if (proc)
    			{
    				proc(data);
    			}
    		}
    	}));
    
    	return 0;
    }
    
    int MyFactory::startConsumes(const NofityProc& proc, const int& threadCount /*= 4*/)
    {
    	for (int i = 0; i < threadCount; ++i)
    	{
    		startConsume(proc);
    	}
    	return 0;
    }
    
    int MyFactory::stopConsumes()
    {
    	m_isStartedtoConsume = false;
    	for (auto& thread : m_consumeThreadList)
    	{
    		thread->join();
    	}
    	return 0;
    }
    
    
    #pragma once
    #pragma execution_character_set("utf-8")
    
    #include <iostream>
    #include "my_factory.h"
    
    using namespace std;
    
    int main()
    {
    	cout << "Hello world!" << endl;
    
    
    	MyFactory mf;
    	auto ret = mf.startProduct();
    	if (0 != ret)
    	{
    		return -1;
    	}
    	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    
    	NofityProc proc = [](const int& data){
    		cout << "data:" << data << endl;
    	};
    	ret = mf.startConsumes(proc);
    
    
    	std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    	ret = mf.stopConsumes();
    	if (0 != ret)
    	{
    		return -1;
    	}
    
    	std::this_thread::sleep_for(std::chrono::milliseconds(3000));
    	ret = mf.stopProduct();
    	if (0 != ret)
    	{
    		return -1;
    	}
    
    	cout << "return from main()" << endl;
    	std::this_thread::sleep_for(std::chrono::milliseconds(3000));
    	return 0;
    }
    

    image

  • 相关阅读:
    ORA-28040: No matching authentication protocol
    kettle增量抽取数据--高效抽取方式
    为什么MySQL的索引要使用B+树而不是其它树形结构?比如B树?
    echarts饼图指示线文字换行
    数据库概论
    Java学习笔记
    案例分析
    软件工程作业3
    软件工程作业2
    软件工程作业1
  • 原文地址:https://www.cnblogs.com/rock-cc/p/12725802.html
Copyright © 2011-2022 走看看