正所谓“工欲善其事,必先利其器”, 我们在实现通信设计任务的过程中需要一些基础工具来帮助我们搭建部分基础组件,这些基础工具包括消息队列,线程池,缓冲区抽象,事件循环和日志工具。接下来对这部分基础工具进行描述和实现。
1. 消息队列
1.1 linux消息队列应用
(1)成员函数和数据结构
struct msgbuf { long mtype; char mtext[1]; }; // 内核中用于描述消息队列的数据结构是 struct msqid_ds
// 生成唯一的键
key_t ftok(const char *pathname, int proj_id)
int msgget(key_t key, int msgflg)
msgflg是一个标志参数(创建队列时还需要在其低9位给出访问权限,例如 IPC_CREAT | 0666):
* IPC_CREAT 如果内核中不存在与key相等的消息队列,则创建一个消息队列;如果存在这样的消息队列,则返回该消息队列的标识符
* IPC_EXCL 和IPC_CREAT一起使用,如果对应键值的消息队列已经存在,则出错,返回-1
int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)
* 特别注意第四个参数msgflg,可以设置为0或IPC_NOWAIT:当为0时,消息队列已满的时候会阻塞;如果为IPC_NOWAIT,则不等待立即返回。常见错误码有EAGAIN(消息队列已满)、EIDRM(消息队列已被删除)、EACCES(无权访问消息队列)
ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtype, int msgflg)
* msgflg 操作标志位:IPC_NOWAIT(如果没有满足条件的消息则立即返回)、MSG_EXCEPT(返回队列中第一个类型不为msgtype的消息)、MSG_NOERROR(如果队列中满足条件的消息内容大于所请求的实际字节数,则把该消息截断,截断部分将被丢弃)
int msgctl(int msgid, int cmd, struct msqid_ds *buf)
* cmd有以下三种常用取值:
* IPC_STAT(获取消息队列对应的msqid_ds数据结构)
* IPC_SET(设置消息队列的属性)
* IPC_RMID(从内核中删除msgid标识的消息队列)
(2)示例
// System V message queue example: sender side.
#include "stdio.h"
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>
#include <string>
#include <string.h>
#include <iostream>
using namespace std;

const int32_t BUFFSIZE = 256;

// Layout expected by msgsnd/msgrcv: a long type tag followed by the payload.
struct msgBuff {
    long msgType;
    char buff[BUFFSIZE];
};

// Sends a single message of type 3 to the queue identified by "./messagekey".
// Returns 0 on success and 1 if any IPC call fails.
int main() {
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if (-1 == key) {
        cout << "ftok error" << endl;
        return 1;  // an invalid key must not be passed on to msgget
    }

    // The low 9 bits of msgflg are the access permissions of a newly created
    // queue. Without them the queue is created with mode 0 and every later
    // msgsnd/msgrcv by an unprivileged process fails with EACCES.
    int msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        cout << "msgget error" << endl;
        return 1;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    msg.msgType = 3;
    // Bounded copy; the buffer was zeroed above, so it stays NUL-terminated.
    strncpy(msg.buff, "message tset", sizeof(msg.buff) - 1);
    cout << msg.buff << endl;

    // msgsz is the payload size only; it excludes the leading long type tag.
    int32_t msgLen = sizeof(msg.buff);
    cout << "msgLen:" << msgLen << endl;

    if (msgsnd(msgid, &msg, msgLen, 0) == -1) {
        cout << strerror(errno) << endl;
        return 1;
    }
    return 0;
}
// System V message queue example: receiver side.
#include "stdio.h"
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>
#include <string>
#include <string.h>
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

const int32_t BUFFSIZE = 256;

// Layout expected by msgsnd/msgrcv: a long type tag followed by the payload.
struct msgBuff {
    long msgType;
    char buff[BUFFSIZE];
};

// Receives messages of any type from the queue identified by "./messagekey"
// and prints them, pausing two seconds between messages.
// Returns 1 if an IPC call fails; otherwise it runs until killed.
int main() {
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if (-1 == key) {
        cout << "ftok error" << endl;
        return 1;  // an invalid key must not be passed on to msgget
    }

    // The low 9 bits of msgflg are the access permissions of a newly created
    // queue. Without them the queue is created with mode 0 and msgrcv fails
    // with EACCES for an unprivileged process.
    int msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        cout << "msgget error" << endl;
        return 1;
    }

    msgBuff msg;
    // msgsz is the payload capacity only; it excludes the leading long type tag.
    int32_t msgLen = sizeof(msg.buff);

    while (1) {
        // Clear the buffer so a shorter message never shows leftovers of a longer one.
        memset(&msg, 0, sizeof(msgBuff));
        // msgtype 0: take the first message in the queue regardless of its type.
        if (msgrcv(msgid, &msg, msgLen, 0, 0) == -1) {
            if (errno == EINTR) {
                continue;  // interrupted by a signal: simply retry
            }
            cout << strerror(errno) << endl;
            return 1;  // e.g. EIDRM/EACCES: retrying would spin forever
        }
        cout << msg.msgType << "--" << msg.buff << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    }
    return 0;
}
1.2 自定义消息队列实现
(1)类图
(2)代码实现
#pragma once
#include <stdint.h>

// Base class for messages; carries only an integer type tag.
class Message {
public:
    // Reserved message type values.
    struct Type {
        enum { Stop = 0 };
    };

    // A default-constructed message gets a well-defined type (0, i.e.
    // Type::Stop) instead of an indeterminate value; callers are expected to
    // set the real type with SetType() or use the other constructor.
    Message() : _type(Type::Stop) {
    }

    // Intentionally not explicit so existing implicit int -> Message
    // conversions keep compiling.
    Message(int32_t type) : _type(type) {
    }

    virtual ~Message() {
    }

    // Returns the type tag of this message.
    int32_t GetType() const {
        return _type;
    }

    // Replaces the type tag of this message.
    void SetType(int32_t type) {
        _type = type;
    }

private:
    int32_t _type;
};
/*
 * Thread-safe message queue.
 */
#pragma once
#include <stdint.h>
#include <mutex>
#include <memory>
#include <queue>
#include <condition_variable>
#include <iostream>
using namespace std;  // kept for files that include this header and rely on it

// FIFO queue guarded by a mutex; consumers can block until a message arrives.
// MSG must be copyable (Push) and assignable (Pop).
template <class MSG>
class MessageQueue {
public:
    MessageQueue() : _mutex(), _condition(), _queue() {}
    MessageQueue(const MessageQueue&) = delete;
    const MessageQueue& operator=(const MessageQueue&) = delete;
    virtual ~MessageQueue() {}

    // Appends a copy of msg and wakes one waiting consumer.
    void Push(const MSG& msg) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(msg);
        std::cout << "add msg" << std::endl;
        _condition.notify_one();
    }

    // Removes the oldest message into msg.
    // isBlocked == true : waits until a message is available, returns true.
    // isBlocked == false: returns false immediately when the queue is empty.
    bool Pop(MSG& msg, bool isBlocked = true) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (isBlocked) {
            // Loop rather than a single wait: guards against spurious wakeups.
            while (_queue.empty()) {
                std::cout << "block state, MessageQueue is empty,please wait..." << std::endl;
                _condition.wait(lock);
            }
        } else if (_queue.empty()) {
            // Inspect the container directly: _mutex is already held here and
            // std::mutex is not recursive, so calling IsEmpty() (which locks
            // _mutex again) would deadlock.
            return false;
        }
        msg = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    // Returns true when no message is queued at the time of the call.
    bool IsEmpty() {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    // Returns the number of queued messages at the time of the call.
    int32_t Size() {
        std::lock_guard<std::mutex> lock(_mutex);
        return static_cast<int32_t>(_queue.size());
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition;
    std::queue<MSG> _queue;
};
注意unique_lock和lock_guard的区别:lock_guard只能在析构时释放锁,而unique_lock可以随时释放和重新获取锁;std::condition_variable::wait要求传入unique_lock,因为它在等待期间会释放锁,被唤醒后再重新加锁。其次要防止死锁:std::mutex不可重入,不要在已经持有某把锁的情况下再去获取同一把锁(例如在已加锁的成员函数中调用另一个同样加锁的成员函数)。
2. 线程池
不多说,直接上代码
#pragma once #include <thread> #include <functional> #include <stdint.h> #include <vector> #include <iostream> using namespace std; #include "MessageQueue.h" #define MIN_THREADS 3 template <typename Type> class ThreadPool{ ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; public: ThreadPool(int32_t threads, std::function<void(Type &record)> handler); virtual ~ThreadPool(); void Submit(Type record); private: bool _shutdown; int32_t _threads; std::function<void(Type &record)> _handler; std::vector<std::thread> _workers; MessageQueue<Type> _tasks; }; template <typename Type> ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler) :_shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks() { if(_threads < MIN_THREADS) { _threads = MIN_THREADS; } for(int32_t i = 0; i < _threads; i++) { std::thread workThread([this]{ while(!_shutdown) { Type record; bool ret = _tasks.Pop(record, true); _handler(record); } }); workThread.detach(); _workers.emplace_back(std::move(workThread)); } } template <typename Type> ThreadPool<Type>::~ThreadPool() { for(std::thread &worker : _workers) { worker.join(); } } template <typename Type> void ThreadPool<Type>::Submit(Type record) { _tasks.Push(record); }
3. 事件循环
以日志工具为例,为了实现高性能的日志工具,我们必须确保日志I/O全部处于一个独立线程,而不会影响后续的操作,因此,实际上日志记录就是其它线程向日志线程发送日志消息,这样一来,事件循环模型就变得非常必要。
3.1 类图设计
3.2 关键代码实现
(1)ByteArray
#pragma once
#include <cstdint>
#include <vector>
#include <string>
#include <cstring>

// Growable byte buffer built on std::vector<char>, with helpers for string
// conversion and concatenation.
class ByteArray : public std::vector<char> {
public:
    ByteArray() = default;

    // Creates a buffer of `size` zero bytes.
    ByteArray(int32_t size) : std::vector<char>(size) {
    }

    // Copies `size` bytes starting at `buffer`.
    ByteArray(const char *buffer, int32_t size) : std::vector<char>(buffer, buffer + size) {
    }

    // Copies the characters of `str` (without a trailing NUL).
    // Range construction avoids calling memcpy with the null data() pointer an
    // empty vector may have.
    ByteArray(const std::string &str) : std::vector<char>(str.begin(), str.end()) {
    }

    // Returns the buffer contents as a std::string.
    std::string ToStdString() const {
        return std::string(this->cbegin(), this->cend());
    }

    // Appends the bytes of buffer2 to this buffer and returns *this.
    // Safe when buffer2 is this very object (the content is doubled).
    ByteArray &Concat(const ByteArray &buffer2) {
        // Capture both sizes BEFORE resizing: if buffer2 aliases *this, its
        // size and data pointer change with the resize.
        const size_t oldSize = size();
        const size_t appendSize = buffer2.size();
        if (appendSize == 0) {
            return *this;
        }
        const bool selfAppend = (&buffer2 == this);
        resize(oldSize + appendSize);
        // After the resize the first oldSize bytes of *this are still the
        // original content, so for self-append they are the source.
        const char *source = selfAppend ? this->data() : buffer2.data();
        memcpy(this->data() + oldSize, source, appendSize);
        return *this;
    }

    // Returns a new buffer holding this buffer followed by buffer2.
    ByteArray operator+(const ByteArray &buffer2) const {
        ByteArray joined(*this);
        joined.Concat(buffer2);
        return joined;
    }
};
(2)IStream
#pragma once #include "ByteArray.h" #include <functional> class IStream { public: typedef std::function<void(const char* buf, int64_t size)> DataIndicationHandler; virtual int32_t Receive(char* buffer, int32_t bufferSize, int32_t& readSize) = 0; virtual int32_t Send(const ByteArray& byteArray) = 0; virtual void OnDataIndication(DataIndicationHandler handler) = 0; virtual DataIndicationHandler GetDataIndication() = 0; };
(3) BaseEvent
#pragma once #include "ByteArray.h" #include "IStream.h" class BaseEvent { public: BaseEvent() { } BaseEvent(const std::string &type, const ByteArray &data, IStream *stream) : _type(type), _data(data), _stream(stream) { } void SetData(const ByteArray &data) { _data = data; } const ByteArray &GetData() const { return _data; } void SetType(const std::string &type) { _type = type; } const std::string &GetType() const { return _type; } void SetStream(IStream *stream) { _stream = stream; } IStream *GetStream() const { return _stream; } private: std::string _type; ByteArray _data; IStream* _stream; };
(4)EventQueue
#pragma once #include "BaseEvent.h" #include <memory> #include <mutex> #include <condition_variable> #include <chrono> class EventQueue { public: EventQueue(int timeout = 0) : _timeout(timeout) { } void PostEvent(BaseEvent *event) { std::unique_lock <std::mutex> locker(_mutex); _events.push_back(std::shared_ptr<BaseEvent>(event)); } std::shared_ptr <BaseEvent> GetEvent() { std::unique_lock <std::mutex> locker(_mutex); if (_events.empty()) { if (_timeout == 0) { return nullptr; } _waitCondition.wait_for(locker, std::chrono::milliseconds(_timeout)); } if (!_events.empty()) { std::shared_ptr <BaseEvent> event = _events.front(); _events.erase(_events.begin()); return event; } return nullptr; } private: std::vector <std::shared_ptr<BaseEvent>> _events; std::mutex _mutex; std::condition_variable _waitCondition; // ms int _timeout; };
(5)Loop
#pragma once

// Base class for loops: Start() runs the loop body supplied by a subclass.
class Loop {
public:
    // Runs the loop on the calling thread by invoking _Run().
    void Start() {
        _Run();
    }

    virtual ~Loop() {}

private:
    // Loop body; implemented by subclasses.
    virtual void _Run() = 0;
};  // the terminating ';' was missing, which made the header fail to compile
(6)EventQueueLoop
#pragma once #include "Loop.h" #include "EventQueue.h" #include <memory> class EventQueueLoop : public Loop { public: EventQueueLoop(EventQueue *queue); protected: virtual void _Run(); virtual void OnEvent(std::shared_ptr <BaseEvent> event) = 0; private: EventQueue *_queue; };
#include "EventQueueLoop.h"
#include <thread>

// Stores the queue pointer; the queue is not owned by the loop.
EventQueueLoop::EventQueueLoop(EventQueue *queue) : _queue(queue) {
}

// Repeatedly takes events from the queue and dispatches them to OnEvent().
// Returns immediately if no queue was supplied; otherwise it never returns.
void EventQueueLoop::_Run() {
    if (_queue == nullptr) {
        return;  // nothing to poll; avoids dereferencing a null pointer
    }
    while (true) {
        std::shared_ptr<BaseEvent> event = _queue->GetEvent();
        if (!event) {
            // GetEvent() can return immediately when the queue is empty, so
            // give other threads a chance to run instead of spinning flat out.
            std::this_thread::yield();
            continue;
        }
        OnEvent(event);
    }
}
4. 基于消息队列的日志实现
4.1 日志优先级
// Text labels written into log lines, indexed by the numeric priority value.
const std::string PRIORITY_STRING[] = {
    "DEBUG",
    "CONFIG",
    "INFO",
    "WARNING",
    "ERROR",
};
4.2 日志格式
// Builds one log line of the form "<timestamp> [<PRIORITY>] <text>" and pushes
// it onto the logger's queue. Entries below the current priority are dropped.
void Logger::WriteLog(Priority priority, const std::string &log) {
    const bool belowThreshold = priority < _priority;
    if (belowThreshold) {
        return;
    }

    std::stringstream line;
    line << HurricaneUtils::GetCurrentTimeStamp();
    line << " [" << PRIORITY_STRING[priority] << "] ";
    line << log;

    _queue.Push(line.str());
}
// Returns the current local wall-clock time formatted as "HH:MM:SS.mmm".
// If the local time cannot be determined, only ".mmm" is returned.
std::string GetCurrentTimeStamp() {
    // Current time and its millisecond fraction.
    const auto currentTime = std::chrono::system_clock::now();
    const auto milliseconds =
        std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;
    const std::time_t currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // Convert to broken-down local time. std::localtime() returns a pointer to
    // shared static storage, so the reentrant variants are used where they
    // exist; loggers are typically called from several threads.
    std::tm localTime = std::tm();
    bool converted = false;
#if defined(_MSC_VER)
    converted = (localtime_s(&localTime, &currentTimePoint) == 0);
#elif defined(WIN32) || defined(_WIN32) || defined(__WIN32__)
    // e.g. MinGW: no reentrant variant is guaranteed; copy the result at once.
    if (const std::tm *shared = std::localtime(&currentTimePoint)) {
        localTime = *shared;
        converted = true;
    }
#else
    converted = (localtime_r(&currentTimePoint, &localTime) != nullptr);
#endif

    // Output the time stamp.
    std::ostringstream stream;
    if (converted) {
        char buffer[80];
        // "%H:%M:%S" is the portable spelling of "%T".
        if (0 != std::strftime(buffer, sizeof(buffer), "%H:%M:%S", &localTime)) {
            stream << buffer;
        }
    }
    stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();
    return stream.str();
}
4.3 代码实现
/** * licensed to the apache software foundation (asf) under one * or more contributor license agreements. see the notice file * distributed with this work for additional information * regarding copyright ownership. the asf licenses this file * to you under the apache license, version 2.0 (the * "license"); you may not use this file except in compliance * with the license. you may obtain a copy of the license at * * http://www.apache.org/licenses/license-2.0 * * unless required by applicable law or agreed to in writing, software * distributed under the license is distributed on an "as is" basis, * without warranties or conditions of any kind, either express or implied. * see the license for the specific language governing permissions and * limitations under the license. */ #pragma once #include "MessageQueue.h" #include <memory> #include <thread> #include <queue> #include <string> #include <fstream> enum Priority { DEBUG, STATE, INFO, WARNING, FAULT }; class Logger { Logger &operator=(const Logger &) = delete; Logger(const Logger &other) = delete; public: static Logger *Get(); void SetPriority(Priority priority); Priority GetPriority(); void WriteLog(Priority priority, const std::string &log); private: Logger(Priority priority); virtual ~Logger(); void _InitializeFileStream(); void _WriteThread(); std::string GetCurrentTimeStamp(); private: MessageQueue <std::string> _queue; std::ofstream *_fileStream; Priority _priority; bool _shutdown; }; #define TRACE_DEBUG(LOG_CONTENT) Logger::Get()->WriteLog(DEBUG, LOG_CONTENT); #define TRACE_STATE(LOG_CONTENT) Logger::Get()->WriteLog(STATE, LOG_CONTENT); #define TRACE_INFO(LOG_CONTENT) Logger::Get()->WriteLog(INFO, LOG_CONTENT); #define TRACE_WARNING(LOG_CONTENT) Logger::Get()->WriteLog(WARNING, LOG_CONTENT); #define TRACE_ERROR(LOG_CONTENT) Logger::Get()->WriteLog(FAULT, LOG_CONTENT);
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "logger.h"
#include <chrono>
#include <ctime>
#include <iostream>
#include <sstream>
#include <iomanip>

// Text labels written into log lines, indexed by Priority value.
const std::string PRIORITY_STRING[] = {
    "DEBUG",
    "CONFIG",
    "INFO",
    "WARNING",
    "ERROR"
};

namespace {

// Handle of the writer thread. Logger has exactly one instance (private
// constructor, single static object in Get()), so one file-local handle is
// enough. A function-local static is used so the handle is constructed on
// first use, i.e. before the Logger finishes constructing, and is therefore
// destroyed after it.
std::thread &WriterThread() {
    static std::thread writer;
    return writer;
}

}  // namespace

// Returns the single logger instance, created on first use with priority DEBUG.
Logger *Logger::Get() {
    static Logger logger(DEBUG);
    return &logger;
}

// Opens the log file and starts the writer thread.
Logger::Logger(Priority priority) : _queue(), _fileStream(nullptr), _shutdown(false) {
    _priority = priority;
    _InitializeFileStream();
    // The thread is kept joinable (not detached) so the destructor can wait
    // for it before the file stream is released.
    WriterThread() = std::thread(&Logger::_WriteThread, this);
}

// Stops the writer thread after it has written everything queued so far, then
// closes the log file.
Logger::~Logger() {
    _shutdown = true;
    // An empty string is the stop marker for the writer thread. WriteLog()
    // never queues an empty line because every line has a "[PRIORITY]" part.
    _queue.Push(std::string());
    std::thread &writer = WriterThread();
    if (writer.joinable()) {
        writer.join();
    }
    if (nullptr != _fileStream) {
        _fileStream->close();
        delete _fileStream;
        _fileStream = nullptr;
    }
}

// Returns the current local wall-clock time formatted as "HH:MM:SS.mmm".
// If the local time cannot be determined, only ".mmm" is returned.
std::string Logger::GetCurrentTimeStamp() {
    // Current time and its millisecond fraction.
    const auto currentTime = std::chrono::system_clock::now();
    const auto milliseconds =
        std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;
    const std::time_t currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // WriteLog() may be called from several threads at once, and
    // std::localtime() returns a pointer to shared static storage, so the
    // reentrant variants are used where they exist.
    std::tm localTime = std::tm();
    bool converted = false;
#if defined(_MSC_VER)
    converted = (localtime_s(&localTime, &currentTimePoint) == 0);
#elif defined(WIN32) || defined(_WIN32) || defined(__WIN32__)
    if (const std::tm *shared = std::localtime(&currentTimePoint)) {
        localTime = *shared;
        converted = true;
    }
#else
    converted = (localtime_r(&currentTimePoint, &localTime) != nullptr);
#endif

    // Output the time stamp.
    std::ostringstream stream;
    if (converted) {
        char buffer[80];
        // "%H:%M:%S" is the portable spelling of "%T".
        if (0 != std::strftime(buffer, sizeof(buffer), "%H:%M:%S", &localTime)) {
            stream << buffer;
        }
    }
    stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();
    return stream.str();
}

// Sets the minimum priority that will be logged.
void Logger::SetPriority(Priority priority) {
    _priority = priority;
}

// Returns the minimum priority that will be logged.
Priority Logger::GetPriority() {
    return _priority;
}

// Opens "./logger.log" for writing, truncating any existing content. On
// failure an error is printed to stderr and _fileStream is left as nullptr,
// in which case log lines are taken from the queue but not written.
void Logger::_InitializeFileStream() {
    // Prepare fileName
    std::string fileName = "./logger.log";

    // Initialize file stream
    _fileStream = new std::ofstream();
    std::ios_base::openmode mode = std::ios_base::out;
    mode |= std::ios_base::trunc;
    _fileStream->open(fileName, mode);

    // Error handling
    if (!_fileStream->is_open()) {
        // Print error information
        std::ostringstream ss_error;
        ss_error << "FATAL ERROR: could not Open log file: [" << fileName << "]";
        ss_error << " std::ios_base state = " << _fileStream->rdstate();
        std::cerr << ss_error.str().c_str() << std::endl << std::flush;

        // Cleanup
        _fileStream->close();
        delete _fileStream;
        _fileStream = nullptr;
    }
}

// Builds one log line of the form "<timestamp> [<PRIORITY>] <text>" and queues
// it for the writer thread. Entries below the current priority are dropped.
void Logger::WriteLog(Priority priority, const std::string &log) {
    if (priority < _priority)
        return;

    std::stringstream stream;
    stream << GetCurrentTimeStamp()
           << " [" << PRIORITY_STRING[priority] << "] "
           << log;
    _queue.Push(stream.str());
}

// Writer thread body: takes lines from the queue and appends them to the log
// file until the empty stop marker pushed by the destructor is received.
void Logger::_WriteThread() {
    for (;;) {
        std::string log;
        _queue.Pop(log, true);
        if (log.empty()) {
            break;  // stop marker: everything queued before it has been written
        }
        //std::cout << log << std::endl;
        if (_fileStream) {
            // std::endl flushes after each line, as the original code did.
            *_fileStream << log << std::endl;
        }
    }
}
测试:
#include "stdio.h" #include <thread> #include <chrono> #include "../logger.h" int main() { TRACE_DEBUG("Logger Debug test"); while(1) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } return 0; }