zoukankan      html  css  js  c++  java
  • Linux编程之自定义消息队列

    复制代码
    typedef struct Msg_Hdr_s  
    {  
        uint32 msg_type;  
        uint32 msg_len;  
        uint32 msg_src;  
        uint32 msg_dst;      
    }Msg_Hdr_t;  
      
    typedef struct Msg_s  
    {  
        Msg_Hdr_t hdr;  
        uint8 data[100];  
    } Msg_t;
    复制代码
    下面是我设计的消息格式内容的解释:
    • msg_type:标记消息类型,当消息接收者看到该msg_type后就知道他要干什么事了
    • msg_len:消息长度,待扩展,暂时没用到(以后会扩展为变长消息)
    • msg_src:消息的源地址,即消息的发起者
    • msg_dst:消息的目的地,即消息的接受者
    • data[100]:消息除去消息头外可以携带的信息量,定义为100字节
    由该消息数据结构可以知道,这个消息是定长的,当然也可以实现为变长消息,但现在暂不实现,今天先把定长消息实现了,以后再完善变长消息。
    二、构造循环队列
    队列可以由链表实现,也可以由数组实现,这里就使用数组实现的循环链表作为我们消息队列的队列模型。
    复制代码
    typedef struct Queue_s  
    {  
        int head;  
        int rear;  
        sem_t sem;  
        Msg_t data[QUEUE_SIZE];  
    }Queue_t;  
      
    int MsgQueueInit(Queue_t* Q)  
    {  
        if(!Q)  
        {  
            printf("Invalid Queue!
    ");  
            return -1;  
        }  
        Q->rear = 0;  
        Q->head = 0;  
        sem_init(&Q->sem, 0, 1);  
        return 0;      
    }  
      
    int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
    {  
        if(!Q)  
        {  
            printf("Invalid Queue!
    ");  
            return -1;  
        }  
        if(Q->rear == Q->head) //only one consumer,no need to lock head  
        {  
            printf("Empty Queue!
    ");  
            return -1;  
        }  
        memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
        Q->head = (Q->head+1)%QUEUE_SIZE;  
        return 0;         
      
    }  
      
    int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
    {  
        if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
        {  
            printf("Full Queue!
    ");  
            return -1;  
        }  
        sem_wait(&Q->sem);  
        memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
        Q->rear = (Q->rear+1)%QUEUE_SIZE;  
        sem_post(&Q->sem);  
        return 0;  
    } 
    复制代码
    循环队列的实现想必大家都比较熟悉,但这里需要提示的几点是:
    • 队列中应加入信号量或锁来保证进队时的互斥访问,因为多个消息可能同时进队,互相覆盖其队列节点
    • 这里的信号量仅用于进队而没用于出队,理由是消息处理者只有一个,不存在互斥的情形

    三、构造消息处理者

    复制代码
    if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
    {  
        printf("create handler thread fail!
    ");  
        return -1;          
    }  
      
    void msg_printer(Msg_t* msg)  
    {  
        if(!msg)  
        {  
            return;  
        }  
        printf("%s: I have recieved a message!
    ", __FUNCTION__);  
        printf("%s: msgtype:%d   msg_src:%d  dst:%d
    
    ",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
      
    }  
      
    void msg_handler()  
    {  
        sleep(5);  //let's wait 5s when starts  
        while(1)  
        {  
            Msg_t msg;  
            memset(&msg, 0 ,sizeof(Msg_t));  
            int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
            if(res != 0)  
            {  
                sleep(10);  
                continue;  
            }  
            msg_printer(&msg);  
            sleep(1);  
        }  
    }
    复制代码
    我在进程里create了一个线程作为消息处理者(handler)来处理消息队列的消息,甘进入该线程时先等个5秒钟来让生产者往队列里丢些消息,然后再开始消息处理。当队列没消息可取时,就休息十秒,再去取消息。
    这里的消息处理很简单,我只是简单地将受到的消息打印一下,证明受到的消息正是其他线程发给我的。当然,你也可以在这里扩展功能,根据受到的消息类型进一步决定该做什么事。比如:
    复制代码
    enum MSG_TYPE  
    {  
        GO_HOME,  
        GO_TO_BED,  
        GO_TO_LUNCH,  
        GO_TO_CINAMA,  
        GO_TO_SCHOOL,  
        GO_DATEING,  
        GO_TO_WORK,//6  
    };  
      
    void handler()  
    {  
        switch(msgtype)  
        {  
            case GO_HOME: go_home(); break;  
            case GO_TO_BED:  go_to_bed(); break;  
            .......  
        }  
    }
    复制代码

    这里的handler就是一个简单的状态机了,根据给定的消息类型(事件)去做特定的事,推动状态机的转动。

    四、构造消息生产者

    复制代码
    if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
    {  
        printf("create thread1 fail!
    ");  
        return -1;  
    }  
      
    if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
    {  
        printf("create thread2 fail!
    ");  
        return -1;  
    }      
      
    if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
    {  
        printf("create thread3 fail!
    ");  
        return -1;  
    }      
      
      
    void msg_sender1()  
    {  
        int i = 0;  
        while(1)  
        {  
            if(i > 10)  
            {  
                i = 0;  
            }  
            Msg_t msg;  
            msg.hdr.msg_type = i++;  
            msg.hdr.msg_src = THREAD1;  
            msg.hdr.msg_dst = HANDLER;  
            MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
            printf("%s: Thread1 send a message!
    ",__FUNCTION__);  
            sleep(1);  
        }  
    }  
      
    void msg_sender2()  
    {  
        int i = 0;  
        while(1)  
        {  
            if(i > 10)  
            {  
                i = 0;  
            }  
            Msg_t msg;  
            msg.hdr.msg_type = i++;  
            msg.hdr.msg_src = THREAD2;  
            msg.hdr.msg_dst = HANDLER;  
            MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
            printf("%s: Thread2 send a message!
    ",__FUNCTION__);  
            sleep(1);  
        }  
    }  
      
    void msg_sender3()  
    {  
        int i = 0;  
        while(1)  
        {  
            if(i > 10)  
            {  
                i = 0;  
            }  
            Msg_t msg;  
            msg.hdr.msg_type = i++;  
            msg.hdr.msg_src = THREAD3;  
            msg.hdr.msg_dst = HANDLER;  
            MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
            printf("%s: Thread3 send a message!
    ",__FUNCTION__);  
            sleep(1);  
        }  
    }
    复制代码

    这里我create了三个线程来模拟消息生产者,每个生产者每隔1秒往消息队列里写消息。

    五、跑起来看看

    先贴完整的代码:
    msg_queue.c:
    复制代码
      1 #include <stdio.h>  
      2 #include <pthread.h>  
      3 #include <semaphore.h>  
      4 #include <unistd.h>  
      5 #include <string.h>  
      6 #include "msg_def.h"  
      7   
      8 Queue_t MsgQueue;  
      9   
     10 int main(int argc, char* argv[])  
     11 {  
     12     int ret;  
     13     pthread_t thread1_id;  
     14     pthread_t thread2_id;  
     15     pthread_t thread3_id;  
     16     pthread_t handler_thread_id;  
     17   
     18     ret = MsgQueueInit((Queue_t*)&MsgQueue);  
     19     if(ret != 0)  
     20     {  
     21         return -1;  
     22     }  
     23   
     24     if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
     25     {  
     26         printf("create handler thread fail!
    ");  
     27         return -1;          
     28     }  
     29   
     30   
     31     if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
     32     {  
     33         printf("create thread1 fail!
    ");  
     34         return -1;  
     35     }  
     36   
     37     if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
     38     {  
     39         printf("create thread2 fail!
    ");  
     40         return -1;  
     41     }      
     42   
     43     if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
     44     {  
     45         printf("create thread3 fail!
    ");  
     46         return -1;  
     47     }      
     48   
     49   
     50     while(1)  
     51     {      
     52         sleep(1);  
     53     }  
     54   
     55     return 0;  
     56 }  
     57   
     58   
     59   
     60   
     61 int MsgQueueInit(Queue_t* Q)  
     62 {  
     63     if(!Q)  
     64     {  
     65         printf("Invalid Queue!
    ");  
     66         return -1;  
     67     }  
     68     Q->rear = 0;  
     69     Q->head = 0;  
     70     sem_init(&Q->sem, 0, 1);  
     71     return 0;      
     72 }  
     73   
     74 int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
     75 {  
     76     if(!Q)  
     77     {  
     78         printf("Invalid Queue!
    ");  
     79         return -1;  
     80     }  
     81     if(Q->rear == Q->head) //only one cosumer,no need to lock head  
     82     {  
     83         printf("Empty Queue!
    ");  
     84         return -1;  
     85     }  
     86     memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
     87     Q->head = (Q->head+1)%QUEUE_SIZE;  
     88     return 0;         
     89   
     90 }  
     91   
     92 int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
     93 {  
     94     if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
     95     {  
     96         printf("Full Queue!
    ");  
     97         return -1;  
     98     }  
     99     sem_wait(&Q->sem);  
    100     memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
    101     Q->rear = (Q->rear+1)%QUEUE_SIZE;  
    102     sem_post(&Q->sem);  
    103     return 0;  
    104 }  
    105   
    106 void msg_printer(Msg_t* msg)  
    107 {  
    108     if(!msg)  
    109     {  
    110         return;  
    111     }  
    112     printf("%s: I have recieved a message!
    ", __FUNCTION__);  
    113     printf("%s: msgtype:%d   msg_src:%d  dst:%d
    
    ",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
    114   
    115 }  
    116   
    117 int msg_send()  
    118 {  
    119   
    120     Msg_t msg;  
    121     msg.hdr.msg_type = GO_HOME;  
    122     msg.hdr.msg_src = THREAD1;  
    123     msg.hdr.msg_dst = HANDLER;  
    124     return MsgEnQueue((Queue_t*)&MsgQueue, &msg);      
    125   
    126 }  
    127   
    128 void msg_handler()  
    129 {  
    130     sleep(5);  //let's wait 5s when starts  
    131     while(1)  
    132     {  
    133         Msg_t msg;  
    134         memset(&msg, 0 ,sizeof(Msg_t));  
    135         int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
    136         if(res != 0)  
    137         {  
    138             sleep(10);  
    139             continue;  
    140         }  
    141         msg_printer(&msg);  
    142         sleep(1);  
    143     }  
    144 }  
    145   
    146   
    147 void msg_sender1()  
    148 {  
    149     int i = 0;  
    150     while(1)  
    151     {  
    152         if(i > 10)  
    153         {  
    154             i = 0;  
    155         }  
    156         Msg_t msg;  
    157         msg.hdr.msg_type = i++;  
    158         msg.hdr.msg_src = THREAD1;  
    159         msg.hdr.msg_dst = HANDLER;  
    160         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
    161         printf("%s: Thread1 send a message!
    ",__FUNCTION__);  
    162         sleep(1);  
    163     }  
    164 }  
    165   
    166 void msg_sender2()  
    167 {  
    168     int i = 0;  
    169     while(1)  
    170     {  
    171         if(i > 10)  
    172         {  
    173             i = 0;  
    174         }  
    175         Msg_t msg;  
    176         msg.hdr.msg_type = i++;  
    177         msg.hdr.msg_src = THREAD2;  
    178         msg.hdr.msg_dst = HANDLER;  
    179         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
    180         printf("%s: Thread2 send a message!
    ",__FUNCTION__);  
    181         sleep(1);  
    182     }  
    183 }  
    184   
    185 void msg_sender3()  
    186 {  
    187     int i = 0;  
    188     while(1)  
    189     {  
    190         if(i > 10)  
    191         {  
    192             i = 0;  
    193         }  
    194         Msg_t msg;  
    195         msg.hdr.msg_type = i++;  
    196         msg.hdr.msg_src = THREAD3;  
    197         msg.hdr.msg_dst = HANDLER;  
    198         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
    199         printf("%s: Thread3 send a message!
    ",__FUNCTION__);  
    200         sleep(1);  
    201     }  
    202 }
    复制代码

    msg_def.h:

    复制代码
     1 #include <stdio.h>  
     2 #include <pthread.h>  
     3 #include <semaphore.h>  
     4   
     5 typedef unsigned char uint8;  
     6 typedef unsigned short unit16;  
     7 typedef unsigned int uint32;  
     8   
     9 #define QUEUE_SIZE 1000  
    10   
    11 typedef struct Msg_Hdr_s  
    12 {  
    13     uint32 msg_type;  
    14     uint32 msg_len;  
    15     uint32 msg_src;  
    16     uint32 msg_dst;      
    17 }Msg_Hdr_t;  
    18   
    19 typedef struct Msg_s  
    20 {  
    21     Msg_Hdr_t hdr;  
    22     uint8 data[100];  
    23 } Msg_t;  
    24   
    25 typedef struct Queue_s  
    26 {  
    27     int head;  
    28     int rear;  
    29     sem_t sem;  
    30     Msg_t data[QUEUE_SIZE];  
    31 }Queue_t;  
    32   
    33 typedef struct Queue_s QueueNode;  
    34   
    35 enum MSG_TYPE  
    36 {  
    37     GO_HOME,  
    38     GO_TO_BED,  
    39     GO_TO_LUNCH,  
    40     GO_TO_CINAMA,  
    41     GO_TO_SCHOOL,  
    42     GO_DATEING,  
    43     GO_TO_WORK,//6  
    44 };  
    45   
    46 enum SRC_ADDR  
    47 {  
    48     THREAD1,  
    49     THREAD2,  
    50     THREAD3,  
    51     HANDLER,  
    52 };  
    53   
    54   
    55 int MsgQueueInit(Queue_t* Q);  
    56 int MsgDeQueue(Queue_t* Q, Msg_t* msg);  
    57 int MsgEnQueue(Queue_t* Q, Msg_t* msg);  
    58 void msg_handler();  
    59 void msg_sender1();  
    60 void msg_sender2();  
    61 void msg_sender3();  
    62 void msg_printer(Msg_t* msg);  
    63 int msg_send();
    复制代码
    看看跑起来的现象:
     
    Finish!
    现在这套进程内的消息队列的架构在实际工程中非常实用(当然实际工程的框架会复杂健壮得多),很多工程都需要这种基于事件推动的思想来保证每条请求都可以有条不絮地执行,所以这个框架也是有用武之地的,尤其配合状态机非常适合!
  • 相关阅读:
    【洛谷P3628】特别行动队
    【洛谷P3233】世界树
    【BZOJ1597】土地购买
    【洛谷P4068】数字配对
    【洛谷P3899】谈笑风生
    【BZOJ2726】任务安排
    【洛谷P6186】[NOI Online 提高组] 冒泡排序
    【洛谷P3369】【模板】普通平衡树
    【UOJ#8】Quine
    标准 插入flash
  • 原文地址:https://www.cnblogs.com/xieyulin/p/7060886.html
Copyright © 2011-2022 走看看