参考周伟明前辈的《多任务下的数据结构与算法》实现的自定义的消息队列。
0x01 消息队列机制
消息队列是 一 种 可 以 有 多 个 任 务 同 时 问 队 列 里 发 送 和 接收 数 据 的 队 列 。 要 实 现 多个 任 务 同 时 向 队 列 里 收 发 数 据 , 那 么 必 须 在 收 发 操 作 上 加 上 俣 护 。 另 外 还 要 分 别 分析 队 列 为 空 和 队 列 为 满 的 情 况 。
(1) . 队 列 为 空 的 情 况
此 时 向 队 列 发 送 数 据 是 没 有 问题的 , 如 果 此 时 要 从 队 列 接 收 数 据 , 由 于 队 列 中 暂无 数 据 , 接 收 任 务 必 须 等 数 据 发 送 到 队 列 中 去 , 才 能 从 队 列 中 接 收 到 数 据 。
(2), 队 列 为 满 的 情 况
此 时 要 从 队 列 中 接 收 数 据 是 没 有 问题 的 , 如 吴 此 时 向 队 列 发 送 数 据 , 由 于 队列为满 , 发 送 任 务 必 须 等 待 其 他 任 务 从 队 列 里 将 数 据 接 收 出 去 , 才 能 将 数 据 发 送 到 队 列 中 。当 然 也 可 以 不 限 制 队 列 的 大 小 , 也 就 不 需 要 考 虑 队 列 满 的 情 况 了 , 不 过 要 保 证 发 送 到
队 列 里 的 数 据 数 量 的 上 界 值 不 会 超 过 系 统 的 负 荷 , 特 别 是 网 络 软 件 , 如 果 接 收 网 络 数据 放 人 消 息 队 列 中 , 而 不 设 置 队 列 大 小 限 就 很 容 易 受 到 攻 击 。
消息队列实现框架:
(1).自定义构造消息,队列相关数据结构
(2).一个线程负责依次从消息队列中取出消息,并处理该消息
(3).多个线程产生事件,并将消息放进消息队列,等待处理
0x02 消息队列构造(代码)
1.自定义消息队列结构体
(1)消 息 队 列 中 , 需 要 一 个 计 数 器 来 记 录 队 列 中 元 素 的 个 数 ,而且当计数为0时,要将接收 操 作 阻 塞 , 由 于 Semaphore 信 号 量 刚 好 有 这 个 特 性 ,因此可以使用信号量来实现计数功能。
(2)使用双向链表构造队列
typedef struct _MSG_QUEUE_{
PTASK_NODE TaskNode; /* 多任务链表 */ HANDLE SemaphoreHandleHandle; /* 处理队列为空情况的计数信号量*/ UINT MaxLength; /* 队列的最大长度 */ }MSG_QUEUE,*PMSG_QUEUE; typedef struct _TASK_NODE_ { PDOUBLE_LIST DoubleList; /* 双向链表指针 */ PEXIT_TASK ExitTask; /* 多任务退出结构指针 */ }TASK_NODE,*PTASK_NODE; typedef struct _EXIT_TASK_ { HANDLE MutexHandle; /* 操作锁 */ UINT ExitFlag; /* 退出标志 */ }EXIT_TASK,*PEXIT_TASK; typedef struct _DOUBLE_LIST_ { PDOUBLE_NODE HeadNode; /* 第1个节点的指针 */ PDOUBLE_NODE TailNode; /* 最后1个节点的指针 */ UINT NodeCount; /* 保存链表节点的个数 */ }DOUBLE_LIST, *PDOUBLE_LIST; typedef struct _DOUBLE_NODE_ { struct _DOUBLE_NODE_ *Flink; /* 下一个节点指针 */ struct _DOUBLE_NODE_ *Blink; /* 前一个节点指针 */ void *BufferData; /* 数据指针 */ }DOUBLE_NODE,*PDOUBLE_NODE;
(2)发送消息,接受消息操作
1>发送消息只需将数据加入到链表当中,计数加一。
进行插入数据操作时,使用Mutex互斥体来实现多线程的安全性,具体插入数据操作:malloc新建一个节点,将数据插入到新节点的BufferData成员上,再将新节点的指向下一个节点的指针赋为空NULL,指向上一个节点的指针去指向尾节点(每次插入到消息队列的尾部),
NewNode->Flink = NULL; NewNode->Blink = DoubleList->TailNode;
最后判断尾节点是否为空,如果为空表示原来链表中没有节点,此时的新节点既是头节点也是尾节点,应该将双向链表的头节点指针和尾节点指针都指向当前的新节点。
DoubleList->HeadNode = NewNode; DoubleList->TailNode = NewNode;
如果不为空表示原来链表中有节点,则将尾节点下一节点指针指向新加入的 节点,并且尾节点指针也应该指向新节点
DoubleList->TailNode->Flink = NewNode; DoubleList->TailNode = NewNode;
BOOL MsgQueue_Entern(PMSG_QUEUE MsgQueue, void* BufferData) { BOOL IsOk; IsOk = TaskList_Entern(MsgQueue->TaskNode, BufferData); //由于是Queue所以采用 Tail插入 //限制入队情况 SeReleaseSemaphore(MsgQueue->SemaphoreHandleHandle, 1); /* 将计数加1 */ return IsOk; } BOOL TaskList_Entern(PTASK_NODE TaskNode, void* BufferData) { BOOL IsOk; SeWaitForSingleObject(TaskNode->ExitTask->MutexHandle); IsOk = DoubleList_Entern(TaskNode->DoubleList, BufferData); SeReleaseMutex(TaskNode->ExitTask->MutexHandle); return IsOk; } BOOL DoubleList_Entern(PDOUBLE_LIST DoubleList, void* BufferData) { PDOUBLE_NODE NewNode = NULL; /* 参数校验 */ if ( DoubleList == NULL || BufferData == NULL ) { return FALSE; } /* 新建一个节点 */ NewNode = (PDOUBLE_NODE)malloc( sizeof(DOUBLE_NODE)); if ( NewNode == NULL ) { return FALSE; } NewNode->BufferData = BufferData; /* 将节点数据指针指向传进来的数据 */ NewNode->Flink = NULL; /* 将节点的下一节点赋为空指针NULL */ NewNode->Blink = DoubleList->TailNode; /* * 判断是否尾节点指针为空,如果为空表示原来链表中没有节点, * 此时应该将尾节点指向新加入的节点, 并且头节点指针也应该指向新节点 */ if ( DoubleList->TailNode == NULL ) { DoubleList->HeadNode = NewNode; } else { /* * 如果尾节点指针不为空,此时应该将尾节点下一节点指针指向新加入的 * 节点,并且尾节点指针也应该指向新节点 */ DoubleList->TailNode->Flink = NewNode; } DoubleList->TailNode = NewNode; /* 将链表节点数据加1 */ DoubleList->NodeCount++; return TRUE; }
2>接 收 消息 需 要 将 计 数 减 I, 并 且 当 计 数 为 0 时 要 阻 塞 接 收 操 作 ,直 到 有 数 据 到 来 才从队列(链表)中将数据取出来。
删除节点前,先将信号量计数减1,信号计数为0则会阻塞住删除结点操作。
然后将要弹出数据的节点指针指向链表头节点,弹出数据指针指向头节点的数据:
PopNode = DoubleList->HeadNode; BufferData = PopNode->BufferData;
删除节点时,将头节点指针指向头节点的下一节点(第二个节点成为新的头节点),如果下一个节点存在的话(当前的新头节点不为空的话),将新头节点的Blink指针置为NULL。
DoubleList->HeadNode->Blink = NULL;
最后链表节点数量减1,并判断链表的节点数量是否为0,如果链表的节点数量已经为0则表示原来只有一个节点,弹出头节点后,链表已经为空,没有节点在里面,此时应该将尾节点指针赋空
DoubleList->TailNode = NULL;
void* MsgQueue_Leave(PMSG_QUEUE MsgQueue) { SeWaitForSingleObject(MsgQueue->SemaphoreHandleHandle); /* 将计数减1,计数为0则会阻塞住 */ return TaskList_Leave(MsgQueue->TaskNode); } void * TaskList_Leave(PTASK_NODE TaskNode) { void * BufferData; SeWaitForSingleObject(TaskNode->ExitTask->MutexHandle); BufferData = DoubleList_Leave(TaskNode->DoubleList); SeReleaseMutex(TaskNode->ExitTask->MutexHandle); return BufferData; } void* DoubleList_Leave(PDOUBLE_LIST DoubleList) { PDOUBLE_NODE PopNode; /* 用来指向要弹出数据的节点的指针 */ void* BufferData; /* 用来指向要弹出的数据的指针 */ /* 参数校验 */ if ( DoubleList == NULL || DoubleList->HeadNode == NULL ) { return NULL; } /* 将要弹出数据的节点指针指向链表头节点,弹出数据指针指向头节点的数据 */ PopNode = DoubleList->HeadNode; BufferData = PopNode->BufferData; /* 将头节点指针指向头节点的下一节点 */ DoubleList->HeadNode = DoubleList->HeadNode->Flink; if ( DoubleList->HeadNode != NULL ) { DoubleList->HeadNode->Blink = NULL; } /* 将链表节点数量减1 */ DoubleList->NodeCount--; /* 如果链表的节点数量已经为0则表示原来只有一个节点,弹出头节点后, * 此时链表已经为空,没有节点在里面,此时应该将尾节点指针赋空 * 当前节点指针由于前面已经处理过了,如果只有一个节点的话肯定为空 * 所以这里不需要处理当前节点指针 */ if ( DoubleList->NodeCount == 0 ) { DoubleList->TailNode = NULL; } /* 释放弹出的节点, 注意这里并没有释放节点数据指针 */ free( PopNode ); return BufferData; /* 返回头节点的数据指针 */ }
3.消息队列的创造与释放
1>消息队列的创造
消 息 队 列 创 建 编 比 较 简 单 , 只 是 申 请 一 个 MSG_QUEUE 结 构 体 , 并 初 化 结 构体 成 员 , 然 后 返 创 建 时 , 信号量的 初 始 计 数 要 设 为 0 , 因 为 这 时 队 列 是 空。
1 PMSG_QUEUE MsgQueue_Create(INT MaxLength) 2 { 3 PMSG_QUEUE MsgQueue = NULL; 4 5 MsgQueue = (PMSG_QUEUE)malloc(sizeof(MSG_QUEUE)); 6 if (MsgQueue != NULL) 7 { 8 MsgQueue->SemaphoreHandleHandle = SeCreateSemaphore(0, MaxLength); 9 if (MsgQueue->SemaphoreHandleHandle != NULL) 10 { 11 MsgQueue->MaxLength = MaxLength; 12 MsgQueue->TaskNode = TaskList_Create(); 13 if (MsgQueue->TaskNode == NULL) 14 { 15 SeCloseHandle(MsgQueue->SemaphoreHandleHandle); 16 free(MsgQueue); 17 MsgQueue = NULL; 18 } 19 } 20 else 21 { 22 free(MsgQueue); 23 MsgQueue = NULL; 24 } 25 } 26 return MsgQueue; 27 } 28 PTASK_NODE TaskList_Create(void) 29 { 30 PTASK_NODE TaskNode = NULL; 31 TaskNode = (PTASK_NODE)malloc(sizeof(TASK_NODE)); 32 if (TaskNode != NULL) 33 { 34 TaskNode->DoubleList = DoubleList_Create(); 35 if (TaskNode->DoubleList != NULL) 36 { 37 TaskNode->ExitTask = ExitTask_Create(); 38 if (TaskNode->ExitTask == NULL) 39 { 40 free(TaskNode->DoubleList); 41 free(TaskNode); 42 TaskNode = NULL; 43 } 44 } 45 else 46 { 47 free(TaskNode); 48 TaskNode = NULL; 49 } 50 } 51 return TaskNode; 52 } 53 PDOUBLE_LIST DoubleList_Create(void) 54 { 55 PDOUBLE_LIST DoubleList = NULL; 56 57 /* 分配内存操作 */ 58 DoubleList = (PDOUBLE_LIST )malloc(sizeof(DOUBLE_LIST)); 59 if (DoubleList != NULL) 60 { 61 /* 初始化链表结构体各指针成员为空,链表节点个数为0 */ 62 DoubleList->HeadNode = NULL; 63 DoubleList->TailNode = NULL; 64 DoubleList->NodeCount = 0; 65 } 66 67 return DoubleList; 68 } 69 PEXIT_TASK ExitTask_Create() 70 { 71 PEXIT_TASK ExitTask = NULL; 72 73 ExitTask = (PEXIT_TASK)malloc(sizeof(EXIT_TASK)); 74 75 if (ExitTask != NULL) 76 { 77 ExitTask->MutexHandle = SeCreateMutex(); 78 if (ExitTask->MutexHandle == NULL) 79 { 80 free(ExitTask); 81 return NULL; 82 } 83 84 ExitTask->ExitFlag = TASK_NO_EXIT; 85 } 86 return ExitTask; 87 }
2>消息队列的释放
消 息 队列 的 释 放 操 作 要 比 创 建 操 作 复 杂 一 点 , 主 要 是 要 考 虑 退 出 的 资 源 释 放 问 题 。释 放 整 个 消 息 队 列 前 , 需 要 通 知 所 有 阻 塞 在 消 息 队 列 接 收 操 作 上 的 任 务 退 出 , 因 此 要使 用 ReleaseSemaphore将 计 数 值 设 最 大 。
1 void MsgQueue_Destroy(PMSG_QUEUE MsgQueue, LPFN_DESTROYFUNCTION DestroyFunction) 2 { 3 if (MsgQueue != NULL) 4 { 5 INT MaxLength = 0;; 6 if (MsgQueue->MaxLength > DEFAULT_MSGQUEUE_LENGTH) 7 { 8 MaxLength = MsgQueue->MaxLength; 9 } 10 else 11 { 12 MaxLength = DEFAULT_MSGQUEUE_LENGTH; 13 } 14 SeReleaseSemaphore(MsgQueue->SemaphoreHandleHandle, MaxLength); /* 让所有阻塞的接收操作可以继续*/ 15 16 TaskList_Destroy(MsgQueue->TaskNode, DestroyFunction); 17 SeCloseHandle(MsgQueue->SemaphoreHandleHandle); 18 free(MsgQueue); 19 } 20 } 21 void TaskList_Destroy(PTASK_NODE TaskNode, LPFN_DESTROYFUNCTION DestroyFunction) 22 { 23 24 if (TaskNode == NULL) 25 { 26 return; 27 } 28 29 ExitTask_Destroy(TaskNode->ExitTask); 30 31 DoubleList_Destroy(TaskNode->DoubleList, DestroyFunction); 32 33 free(TaskNode); 34 35 } 36 void DoubleList_Destroy(PDOUBLE_LIST DoubleList, LPFN_DESTROYFUNCTION DestroyFunction) 37 { 38 PDOUBLE_NODE TravleNode = NULL; 39 40 if (DoubleList) 41 { 42 /* 从头节点开始,一个接一个释放链表节点及节点数据 */ 43 TravleNode = DoubleList->HeadNode; 44 while (TravleNode != NULL) 45 { 46 PDOUBLE_NODE DeleteNode = NULL; 47 48 DeleteNode = TravleNode; 49 TravleNode = TravleNode->Flink; 50 51 if (DestroyFunction != NULL && DeleteNode->BufferData != NULL) 52 { 53 /* 释放数据 */ 54 (*DestroyFunction)(DeleteNode->BufferData); 55 } 56 free(DeleteNode); /* 释放节点 */ 57 } 58 /* 释放链表结构体 */ 59 free(DoubleList); 60 } 61 } 62 void ExitTask_Destroy(PEXIT_TASK ExitTask) 63 { 64 SeWaitForSingleObject(ExitTask->MutexHandle); 65 66 ExitTask->ExitFlag = TASK_EXIT; 67 68 69 70 SeReleaseMutex(ExitTask->MutexHandle); 71 72 /* 关闭操作的锁和退出事件 */ 73 SeCloseHandle(ExitTask->MutexHandle); 74 75 free(ExitTask); 76 }