#ifndef MSG_CONSTANT_H
#define MSG_CONSTANT_H
#define MAX_MSG_PARA_SIZE 2048 /*消息参数大小*/
#define MAX_QUEUE_LENGTH 3000 /*队列限制长度*/
#define EVENTMSG_QUIT 100 /*退出*/
#define RW_SUCCESS 0
#define EMPTY_QUE 1
#define RW_ERROR_QUE -1
#define FULL_QUE -2
#define LOCK_ERROR -3
#define UNLOCK_ERROR -4
#define REALESE_LOCK_ERROR -5
#define UN_INI -6
#define PER_BUFF_LENGTH 1000 /*消息输入缓冲区大小(条)*/
/*!
*
* 定义消息映射段内项目的宏 参考MFC 消息映射机制
*/
/*消息处理函数调用时,加上this,必须是消息机的成员函数*/
#define MSG_MAP_FUN(MSG_CODE,MSG_ACTION_FUN) case MSG_CODE:this->MSG_ACTION_FUN(msg);break;
/*实现消息转发到其他消息处理对象指针,用于某些消息需要长时间处理的情况,接受对象必须为msg_machine的子类*/
#define MSG_MAP_OBJ(MSG_CODE,MSG_ACTION_OBJ) case MSG_CODE:SendMessage(msg,MSG_ACTION_OBJ);break;
/*!消息映射宏
*
* BEGIN_MSG_MAP为定义消息映射段开始的宏
* 消息映射段必须出现在类体内,不是实现部分
*形式如下
* BEGIN_MSG_MAP
* MSG_MAP_FUN(message code ,member function that must return void and have a Msg type parameter)
* MSG_MAP_OBJ(message code ,message machine class instance)
* END_MSG_MAP
*/
#define BEGIN_MSG_MAP virtual void Dispatch(struct Msg msg) \
{ \
\
unsigned int msg_code; \
msg_code=msg.msg_code; \
switch(msg_code) \
{
/*!
*
* 定义消息映射段结束的宏
*/
#define END_MSG_MAP(base) default: \
base::Dispatch(msg); \
break; \
}\
}
#endif
- #ifndef MESSAGE_STRUCT_H
- #define MESSAGE_STRUCT_H
- #ifdef _cplusplus
- extern "C"
- {
- #include <pthread.h>
- #include <semaphore.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <ctype.h>
- }
- #endif //_cplusplus
- #include "msg_macro.h"
- typedef unsigned char u_char;
- struct Msg
- {
- unsigned int msg_code; // Message Code
- u_char msg_para[MAX_MSG_PARA_SIZE]; // Message Parameter
- };
- class Msg_queue
- {
- public:
- Msg_queue(){ initialized=false; };
- ~Msg_queue();
- int Initialize();
- int GetHead(struct Msg *p_msg);
- int AddTail(struct Msg *p_msg);
- private:
- pthread_mutex_t mutex_wr;
- bool initialized;
- int msg_count;
- int head;
- int tail;
- struct Msg msg_array[MAX_QUEUE_LENGTH];
- };
- #endif // MESSAGE_STRUCT_H
- #include "msg.h"
- extern "C"
- {
- #include <error.h>
- #include <errno.h>
- }
- int Msg_queue::Initialize()
- {
- int error;
- if(initialized)return 0;
- error=pthread_mutex_init(&mutex_wr,NULL);
- switch(error){
- case EAGAIN: initialized=false; return -1;
- case ENOMEM: initialized=false; return -2;
- case EPERM: initialized=false; return -3;
- }
- initialized=true;
- msg_count=0;
- head=0;
- tail=0;
- memset(msg_array,0,sizeof(msg_array));
- return 0;
- }
- int Msg_queue::GetHead(struct Msg *p_msg)
- {
- int result;
- int error;
- if(!initialized)return UN_INI;
- if(p_msg==NULL) return RW_ERROR_QUE;
- if(error=pthread_mutex_lock(&mutex_wr))
- return -3;//取锁失败
- if(msg_count>0)
- {
- memcpy((void*)p_msg,(void*)&msg_array[head],sizeof(struct Msg));
- memset((void*)&msg_array[head],0,sizeof(struct Msg));
- head++;
- if(head==MAX_QUEUE_LENGTH)head=0;
- msg_count--;
- result= RW_SUCCESS;
- }
- else
- {
- p_msg->msg_code=EMPTY_QUE;
- result=EMPTY_QUE;
- }
- if(error=pthread_mutex_unlock(&mutex_wr))
- return -4;//解锁失败
- return result;
- }
- int Msg_queue::AddTail(struct Msg *p_msg)
- {
- int ilock;
- int result;
- int error;
- if(p_msg==NULL)return RW_ERROR_QUE;
- if(!initialized)return UN_INI;
- if(error=pthread_mutex_lock(&mutex_wr))
- return -3;
- if(msg_count<MAX_QUEUE_LENGTH)
- {
- memcpy((void*)&msg_array[tail],(void*)p_msg,sizeof(struct Msg));
- tail++;
- if(tail==MAX_QUEUE_LENGTH)
- tail=0;
- msg_count++;
- result=RW_SUCCESS;
- }
- else
- result=FULL_QUE;
- if(error=pthread_mutex_unlock(&mutex_wr))
- result=-4;
- return result;
- }
- Msg_queue::~Msg_queue()
- {
- pthread_mutex_destroy(&mutex_wr);
- }
- #ifndef MSGMACHINE_H
- #define MSGMACHINE_H
- #include "msg.h"
- #include "msg_macro.h"
- #include "msg_input_buf.h"
- #ifdef _cplusplus
- extern "C"
- {
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <ctype.h>
- #include <signal.h>
- #include <unistd.h>
- }
- #endif //_cplusplus
- //-----------------------------------------------------------------------------
- class MsgMachine
- {
- public:
- bool cmd_run;
- MsgMachine();
- virtual ~MsgMachine();
- void Msg_Deal_Loop();
- /*从各一缓冲区读消息到消息队列,异步检查某个缓冲区是否可读*/
- int ReadInputBuf(InputBuff *pbuff) ;
- /*!
- *启动消息循环处理线程
- *字类如果改写Execute,必须调用父类的Execute才能启动消息处理循环
- */
- virtual void Dispatch(struct Msg msg)
- {
- unsigned int msg_code;
- msg_code=msg.msg_code;
- switch(msg_code)
- {
- default:break;
- }
- }
- private:
- bool DetectorActive;
- Msg_queue *pMsg_que;
- };
- #endif // MSGMACHINE_H
- #include "msg_machine.h"
- MsgMachine::~MsgMachine()
- {
- delete pMsg_que;
- }
- MsgMachine::MsgMachine()
- {
- pMsg_que=new Msg_queue;
- if(pMsg_que->Initialize()==-1)
- {
- delete pMsg_que;
- //printf("Initialize message queue error!\n");
- //u_log("Initialize message queue error!\n");
- exit(-1);
- }
- DetectorActive=true;
- }
- void MsgMachine::Msg_Deal_Loop()
- {
- struct Msg msg;
- while(cmd_run)
- {
- if(pMsg_que->GetHead(&msg)==0)
- {
- if(msg.msg_code==EVENTMSG_QUIT)
- break;
- Dispatch(msg);
- }
- else
- continue;
- }
- }
- int MsgMachine::ReadInputBuf(InputBuff *pbuff)
- {
- struct Msg msg;
- if(pbuff->Read(&msg)==0)
- {
- if(pMsg_que->AddTail(&msg)==RW_SUCCESS)
- {
- return 0;
- }
- else
- return -1;
- }
- else
- {
- return -1;
- }
- }
以上是偶们的某个项目(Redhat Linux下C++开发实现与某Portal Server的基于UDP消息的接口
的daemon,参考MFC的消息映射机制,目前已经实现,上述代码给大家参考,请多多指教