1 使用boost asio网络库 可参照boost asio的文档示例 (异步 同步) 可参考 boost asio 一个聊天的基本框架 asio的网络通讯代码练手
2 使用智能指针 bind function 简化编程 提升效率 参考本博客其他文章 可参考虚函数与bind 实现设计模式的练习
c++11 stl 学习之 shared_ptr c++智能指针(1) c++智能指针(2)
3 设计模式 工厂模式创建通讯包 boost factory 可参考 设计模式 工厂模式 使用shared_ptr
4 避免粘包：先在包头标记包长度，再发送包体。可参考 网络传输 buf 封装 示例代码
5 线程池 io_service_pool调度算法可参考c++11 线程池学习笔记 (二) 线程池
6 消息队列 发送接收与处理解耦
1 #pragma once 2 3 #include <boost/bind.hpp> 4 #include <boost/function.hpp> 5 #include <boost/thread.hpp> 6 #include "job_queue.h" 7 #include <iostream> 8 9 using namespace std; 10 11 template<typename Queue> 12 class worker { 13 public: 14 typedef Queue queue_type; 15 typedef typename Queue::job_type job_type; 16 typedef boost::function<bool(job_type&)> func_type; 17 public: 18 template<typename Func> 19 worker(queue_type& q, Func func, int n = 1); 20 21 worker(queue_type& q, int n = 1); 22 23 template<typename Func> 24 void set_workerFunc(Func func) { 25 m_func = func; 26 } 27 public: 28 void start(); 29 void run(); 30 void stop(); 31 private: 32 void do_work(); 33 queue_type & m_queue; 34 func_type m_func; 35 int m_nThreadNum; 36 boost::thread_group m_threads; 37 }; 38 39 template<typename Queue> 40 template<typename Func> 41 worker<Queue>::worker(typename worker<Queue>::queue_type& q, 42 Func func, int n ) : m_queue(q), m_func(fuunc), m_nThreadNum(n) { 43 if (m_nThreadNum < 1) { 44 m_nThreadNum = 1; 45 } 46 } 47 48 template<typename Queue> 49 void worker<Queue>::do_work() { 50 while (true) { 51 job_type job = m_queue.pop(); 52 if (!m_func) { 53 break; 54 } 55 else { 56 m_func(job); 57 } 58 } 59 } 60 61 template<typename Queue> 62 void worker<Queue>::start() { 63 if (!m_func) { 64 return; 65 } 66 67 if (m_threads.size() > 0) { 68 return; 69 } 70 71 for (int i = 0; i < m_nThreadNum; ++i) { 72 m_nThreads.create_thread(boost::bind(&worker::do_work, this)); 73 } 74 } 75 76 template<typename Queue> 77 void worker<Queue>::run() { 78 start(); 79 m_threads.join_all(); 80 } 81 82 template<typename Queue> 83 void worker<Queue>::stop() { 84 m_func = nullptr; 85 m_queue.stop(); 86 }
1 #pragma once 2 3 #include <queue> 4 #include <boost/noncopyable.hpp> 5 #include <boost/utility/value_init.hpp> 6 #include <boost/thread.hpp> 7 #include <boost/concept_check.hpp> 8 9 template<typename Job> 10 class job_queue :boost::noncopyable { 11 public: 12 typedef Job Job_type; 13 typedef std::queue<Job_type> queue_type; 14 typedef boost::mutex mutex_type; 15 typedef typename mutex_type::scoped_lock lock_type; 16 typedef boost::condition_variable_any condition_type; 17 18 BOOST_CONCEPT_ASSERT((boost::SGIAssignable<Job_type>)); 19 BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<Job_type>)); 20 public: 21 job_queue(); 22 void push(const Job_type& x); 23 Job_type pop(); 24 void stop(); 25 private: 26 queue_type m_queue; 27 mutex_type m_mutex; 28 condition_type m_hasJob; 29 bool m_stop_flag; 30 }; 31 32 template<typename Job> 33 job_queue<Job>::job_queue() :m_stop_flag(false) { 34 35 } 36 37 template<typename Job> 38 void job_queue<Job>::push(const Job_type& x) { 39 lock_type lock(m_mutex); 40 m_queue.push_back(x); 41 m_hasJob.notify_one(); 42 } 43 44 template<typename Job> 45 typename job_queue<Job>::Job_type job_queue<Job>::pop() { 46 lock_type lock(m_mutex); 47 while (m_queue.empty() && !m_stop_flag) { 48 m_hasJob.wait(m_mutex); 49 } 50 51 if (m_stop_flag) { 52 return boost::initialized_value; 53 } 54 55 if (m_queue.empty()) { 56 return boost::initialized_value; 57 } 58 59 Job_type tmp = m_queue.front(); 60 m_queue.pop_front(); 61 return tmp; 62 } 63 64 65 template<typename job> 66 void job_queue<job>::stop() { 67 m_stop_flag = true; 68 m_hasJob.notify_all(); 69 }
一个简单的线程安全变量封装类：写入时独占加锁，读取时共享加锁
#pragma once
#include <mutex>
#include <shared_mutex>

/// Holds one value of type T behind a std::shared_mutex: writers take the
/// mutex exclusively, readers take it shared.
///
/// @tparam T  Stored type; must be copy-constructible and copy-assignable.
template<typename T>
class safe_atom {
public:
    typedef std::shared_mutex mutex_t;

    /// Stores the initial value v.
    safe_atom(T v);

    /// Replaces the stored value with v under an exclusive lock.
    void setValue(const T& v);

    /// Copies the stored value into v under a shared lock.
    void getValue(T& v) const;

private:
    mutable mutex_t m_mu;  // mutable so const readers can lock it
    T value;
};

template<typename T>
safe_atom<T>::safe_atom(T v) :
    value(v) {}

template<typename T>
void safe_atom<T>::setValue(const T& v) {
    std::unique_lock<mutex_t> lck(m_mu);
    value = v;
}

template<typename T>
void safe_atom<T>::getValue(T& v) const {
    std::shared_lock<mutex_t> lck(m_mu);
    v = value;
}
7 线程调度
8 序列化(目前基本不用自己实现 可以考虑使用 protobuf与json 或者XML)
可参考google protobuf VC下的使用笔记 rapidjson 的封装学习 rapidjson 的练习
boost 序列化 例子
#pragma once

// Identifiers for the packet kinds carried on the wire.
enum empakcet_type {
    PACKET_TEST = 0x0800
};

// Short name used by the packet classes.
typedef empakcet_type packet_type;
1 #pragma once 2 3 #include <iostream> 4 #include <strstream> 5 #include <sstream> 6 7 #include <boost/serialization/serialization.hpp> 8 #include <boost/archive/text_iarchive.hpp> 9 #include <boost/archive/text_oarchive.hpp> 10 #include <boost/serialization/vector.hpp> 11 #include <boost/serialization/export.hpp> 12 #include <boost/iostreams/stream.hpp> 13 14 #include "packet_type.h" 15 16 using namespace boost::archive; 17 18 #define _SERIALIZATION_PACKET() 19 public: 20 virtual void serial(text_oarchive& oa) { 21 oa << *this; 22 } 23 virtual void unserial(text_iarchive& ia) { 24 ia >> *this; 25 } 26 int get_packet_type() { 27 return static_cast<int>(m_packet_type); 28 } 29 protected: 30 friend boost::serialization::access; 31 32 33 34 35 class serial_packet { 36 _SERIALIZATION_PACKET() 37 protected: 38 39 template<typename Archive> 40 void serialize(Archive& ar, const unsigned int version) { 41 ar& m_packet_type; 42 } 43 44 45 serial_packet(packet_type pkt_type) :m_packet_type(pkt_type) { 46 47 } 48 packet_type m_packet_type; 49 };
1 #pragma once 2 3 #include "serial_packet.h" 4 5 6 class test_packet :public serial_packet { 7 _SERIALIZATION_PACKET() 8 public: 9 test_packet() :serial_packet(PACKET_TEST) {} 10 private: 11 template<typename Archive> 12 void serialize(Archive& ar, const unsigned int version) { 13 ar& m_nTestNo & m_strName & m_strPwd & m_vStr; 14 } 15 public: 16 int m_nTestNo; 17 std::string m_strName; 18 std::string m_strPwd; 19 std::vector<std::string> m_vStr; 20 }; 21 22 void print(const test_packet& packet) { 23 std::cout << packet.m_strName << std::endl; 24 std::cout << packet.m_strPwd << std::endl; 25 std::cout << packet.m_nTestNo << std::endl; 26 27 for (int i = 0; i < (int)packet.m_vStr.size(); ++i) { 28 std::cout << packet.m_vStr[i] << std::endl; 29 } 30 } 31 32 void test_mypacket() { 33 test_packet set; 34 set.m_strName = "hello"; 35 set.m_strPwd = "world!"; 36 set.m_nTestNo = 9999; 37 38 set.m_vStr.push_back("a"); 39 set.m_vStr.push_back("b"); 40 set.m_vStr.push_back("c"); 41 set.m_vStr.push_back("c"); 42 set.m_vStr.push_back("d"); 43 const int SEND_BUFFER = 100; 44 char buf[SEND_BUFFER]; 45 memset(buf, 0, SEND_BUFFER); 46 47 std::ostrstream sa(buf, SEND_BUFFER); 48 boost::archive::text_oarchive oa(sa); 49 set.serial(oa); 50 51 test_packet get; 52 std::stringstream ss(buf); 53 text_iarchive ia(ss); 54 55 get.unserial(ia); 56 print(get); 57 }
#include "test_packet.h"
// Program entry point: runs the packet serialization round-trip demo.
int main() {
    test_mypacket();
    return 0;
}
//===============================================================
9 计时器 可参考 boost timer代码学习笔记