zoukankan      html  css  js  c++  java
  • 04C++11生产者消费者模式

    #pragma once
    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <ctime>
    #include <functional>
    #include <iomanip>
    #include <iostream>
    #include <list>
    #include <mutex>
    #include <string>
    #include <thread>
    
    // NOTE: these using-directives are kept because the implementation file
    // refers to std / std::chrono names without qualification.
    using namespace std;
    using namespace chrono;
    
    // Callback signature accepted by MyFactory::startConsume: a callable that
    // receives one value by const reference and returns nothing.
    using NofityProc = std::function<void(const int&)>;
    
    /// Small producer/consumer demo: one worker thread appends values to a
    /// shared list, another worker thread removes them and hands each value
    /// to a user callback.
    class MyFactory
    {
    public:
    	MyFactory();
    	virtual ~MyFactory();
    
    	// Owns threads and a mutex, so instances cannot be copied.
    	MyFactory(const MyFactory&) = delete;
    	MyFactory& operator=(const MyFactory&) = delete;
    
    	/// Launches the producer thread.
    	/// @return 0 on success, -1 if production was already started.
    	int startProduct();
    	/// Asks the producer thread to finish and joins it.
    	int stopProduct();
    
    	/// Launches the consumer thread.
    	/// @param proc callback invoked with every value taken from the list.
    	/// @return 0 on success, -1 if consumption was already started.
    	int startConsume(const NofityProc& proc);
    	/// Asks the consumer thread to finish and joins it.
    	int stopConsume();
    
    private:
    	std::thread m_producerThread;		// producer thread
    	std::thread m_consumerThread;		// consumer thread
    	// Run flags: written by the controlling thread and read inside the
    	// worker loops, hence atomic rather than plain bool.
    	std::atomic<bool> m_isIntialized;	// true while the producer should run
    	std::atomic<bool> m_isStarted;		// true while the consumer should run
    	std::mutex m_mtx;					// locked around accesses to m_dataList
    	std::condition_variable m_cv;		// notified by the producer after a push
    
    	std::list<int> m_dataList;			// values handed from producer to consumer
    };
    
    
    #include "my_factory.h"
    
    MyFactory::MyFactory():
    m_isIntialized(false),
    m_isStarted(false)
    {
    }
    
    // Stops and joins any worker thread that is still running.
    // Destroying a std::thread that is still joinable calls std::terminate,
    // so the destructor must not leave either worker un-joined.
    MyFactory::~MyFactory()
    {
    	// Stop the consumer first, while the producer may still be running
    	// and can wake it through the condition variable.
    	{
    		std::lock_guard<std::mutex> lck(m_mtx);
    		m_isStarted = false;
    	}
    	m_cv.notify_all();
    	if (m_consumerThread.joinable())
    	{
    		m_consumerThread.join();
    	}
    
    	// Then let the producer loop run out and wait for it.
    	m_isIntialized = false;
    	if (m_producerThread.joinable())
    	{
    		m_producerThread.join();
    	}
    }
    
    // Starts the producer thread.
    // About once per second the thread appends the current Unix time (whole
    // seconds) to m_dataList and notifies all waiters on m_cv.
    // Returns -1 if production is already running, 0 otherwise.
    int MyFactory::startProduct()
    {
    	if (m_isIntialized)
    	{
    		return -1;
    	}
    	{
    		// A consumer thread may already be using the list, so clear it
    		// under the same mutex the workers use.
    		std::lock_guard<std::mutex> lck(m_mtx);
    		m_dataList.clear();
    	}
    	m_isIntialized = true;
    
    	// Producer thread
    	m_producerThread = std::thread([this]() {
    		while (m_isIntialized)
    		{
    			std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    
    			// Build the value before taking the lock to keep the
    			// critical section as short as possible.
    			const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
    			const auto sec = std::chrono::duration_cast<std::chrono::seconds>(sinceEpoch);
    			{
    				std::lock_guard<std::mutex> lck(m_mtx);
    				// The list stores int; make the narrowing explicit.
    				m_dataList.push_back(static_cast<int>(sec.count()));
    			}
    			m_cv.notify_all();
    			std::cout << "product thread notify all..." << std::endl;
    		}
    	});
    
    	return 0;
    }
    
    // Stops the producer: clears its run flag and waits for the thread to exit.
    // The producer only re-checks the flag between iterations, so this call
    // blocks until the current iteration has finished.
    // Returns 0 after the thread has been joined, -1 if there is no producer
    // thread to join (never started, or already stopped).
    int MyFactory::stopProduct()
    {
    	m_isIntialized = false;
    	// join() on a non-joinable thread throws std::system_error.
    	if (!m_producerThread.joinable())
    	{
    		return -1;
    	}
    	m_producerThread.join();
    	return 0;
    }
    
    int MyFactory::startConsume(const NofityProc& proc)
    {
    	//消费者
    	if (m_isStarted)
    	{
    		return -1;
    	}
    	m_isStarted = true;
    
    	m_consumerThread = thread([this, &proc]{
    		while (m_isStarted)
    		{
    			std::unique_lock <std::mutex> lck(m_mtx);
    			while (m_dataList.empty())
    			{
    				m_cv.wait(lck);
    			}
    			//消费数据data
    			auto data = m_dataList.front();
    			m_dataList.pop_front();
    			lck.unlock();
    			//传出消费数据/通知消费结果
    			if (proc)
    			{
    				proc(data);
    			}
    		}
    	});
    
    	return 0;
    }
    
    int MyFactory::stopConsume()
    {
    	m_isStarted = false;
    	m_consumerThread.join();
    	return 0;
    }
    
    
    #pragma once
    #pragma execution_character_set("utf-8")
    
    #include <iostream>
    #include "my_factory.h"
    
    using namespace std;
    
    int main()
    {
    	cout << "Hello world!" << endl;
    
    
    	MyFactory mf;
    	auto ret = mf.startProduct();
    	if (0 != ret)
    	{
    		return -1;
    	}
    	std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    
    	NofityProc proc = [](const int& data){
    		cout << "data:" << data << endl;
    	};
    	ret = mf.startConsume(proc);
    	if (0 != ret)
    	{
    		return -1;
    	}
    
    	std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    	ret = mf.stopConsume();
    	if (0 != ret)
    	{
    		return -1;
    	}
    
    	std::this_thread::sleep_for(std::chrono::milliseconds(3000));
    	ret = mf.stopProduct();
    	if (0 != ret)
    	{
    		return -1;
    	}
    
    	cout << "return from main()" << endl;
    	std::this_thread::sleep_for(std::chrono::milliseconds(3000));
    	return 0;
    }
    

    image

  • 相关阅读:
    浅析 MySQL Replication(转)
    mysql优化案例
    create index 与 alter table add index 区别
    /etc/sysctl.conf参数解释(转)
    Linux内核 TCP/IP参数调优
    OneProxy常用参数说明
    转载:如何在面试中写出好的代码
    F面经:painting house
    Lintcode: Merge Sorted Array II
    Lintcode: Median
  • 原文地址:https://www.cnblogs.com/rock-cc/p/12724354.html
Copyright © 2011-2022 走看看