贴下来学习,转自http://www.cnblogs.com/zhiranok/archive/2012/05/13/cpp_multi_thread.html
#ifndef _TASK_QUEUE_IMPL_H_ #define _TASK_QUEUE_IMPL_H_ #include<pthread.h> #include <list> #include <stdexcept> using namespace std; #include "task_queue_i.h" #include "lock.h" namespace ff { class task_queue_t: public task_queue_i { public: task_queue_t(): m_flag(true), m_cond(m_mutex) { } ~task_queue_t() { } void close() { lock_guard_t lock(m_mutex); m_flag = false; m_cond.broadcast(); } void multi_produce(const task_list_t& task_) { lock_guard_t lock(m_mutex); bool need_sig = m_tasklist.empty(); for(task_list_t::const_iterator it = task_.begin(); it != task_.end(); ++it) { m_tasklist.push_back(*it); } if (need_sig) { m_cond.signal(); } } void produce(const task_t& task_) { lock_guard_t lock(m_mutex); bool need_sig = m_tasklist.empty(); m_tasklist.push_back(task_); if (need_sig) { m_cond.signal(); } } int consume(task_t& task_) { lock_guard_t lock(m_mutex); while (m_tasklist.empty()) { if (false == m_flag) { return -1; } m_cond.wait(); } task_ = m_tasklist.front(); m_tasklist.pop_front(); return 0; } int run() { task_t t; while (0 == consume(t)) { t.run(); } return 0; } int consume_all(task_list_t& tasks_) { lock_guard_t lock(m_mutex); while (m_tasklist.empty()) { if (false == m_flag) { return -1; } m_cond.wait(); } tasks_ = m_tasklist; m_tasklist.clear(); return 0; } int batch_run() { task_list_t tasks; int ret = consume_all(tasks); while (0 == ret) { for (task_list_t::iterator it = tasks.begin(); it != tasks.end(); ++it) { (*it).run(); } tasks.clear(); ret = consume_all(tasks); } return 0; } private: volatile bool m_flag; task_list_t m_tasklist; mutex_t m_mutex; condition_var_t m_cond; }; class task_queue_pool_t { typedef task_queue_i::task_list_t task_list_t; typedef vector<task_queue_t*> task_queue_vt_t; static void task_func(void* pd_) { task_queue_pool_t* t = (task_queue_pool_t*)pd_; t->run(); } public: static task_t gen_task(task_queue_pool_t* p) { return task_t(&task_func, p); } public: task_queue_pool_t(int n): m_index(0) { for (int i = 0; i < n; ++i) { task_queue_t* p = new task_queue_t(); m_tqs.push_back(p); } } void run() { task_queue_t* p = NULL; { lock_guard_t lock(m_mutex); if (m_index >= (int)m_tqs.size()) { throw runtime_error("too more thread running!!"); } p = m_tqs[m_index++]; } p->batch_run(); } ~task_queue_pool_t() { task_queue_vt_t::iterator it = m_tqs.begin(); for (; it != m_tqs.end(); ++it) { delete (*it); } m_tqs.clear(); } void close() { task_queue_vt_t::iterator it = m_tqs.begin(); for (; it != m_tqs.end(); ++it) { (*it)->close(); } } size_t size() const { return m_tqs.size(); } task_queue_i* alloc(long id_) { return m_tqs[id_ % m_tqs.size()]; } task_queue_i* rand_alloc() { static unsigned long id_ = 0; return m_tqs[++id_ % m_tqs.size()]; } private: mutex_t m_mutex; task_queue_vt_t m_tqs; int m_index; }; } #endif