zoukankan      html  css  js  c++  java
  • 消息队列的实现

    消息队列的实现转换为经典的生产者——消费者模型。

    网上很多实现只能在生产者:消费 = 1:1的情况下运行。

    本实现参考《Unix 网络编程——卷二 进程间通信》,可以在任意配比下运行。

    还参考了《一种同步消息队列模型(C++)》,但由于 STL中的deque不必担心内存分配问题,所以实现只有互斥相关的操作。

    来看基础消息的定义,作为基类——有其它消息类型可以模拟C++中的继承机制。

    typedef struct ares_msg_block
    {
    pthread_t pid;
    void *msg;
    }msg_block_t;

    共享内存区域,即循环消息队列的结构定义

    typedef struct ares_msg_box
    {
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
    pthread_mutex_t mutex;
    //消息的循环队列
    size_t first;
    size_t last;
    size_t size;
    size_t nready; ///多少个消息,判断满与空
    msg_block_t * msg_array[0]; ///柔性数组
    }msg_box_t;


    消息邮箱的初始化。

    View Code
    ///初始化
    static void msgbox_init(msg_box_t *mbox,size_t size)
    {
    assert(mbox);
    pthread_mutex_init(&(mbox->mutex),NULL);
    pthread_cond_init(&(mbox->not_full),NULL);
    pthread_cond_init(&(mbox->not_empty),NULL);
    mbox->size = size;
    mbox->nready= 0;
    mbox->first = 0;
    mbox->last = 0;
    memset(mbox->msg_array,sizeof(void *)*size,0);
    }
    msg_box_t * msgbox_new(size_t size)
    {
    msg_box_t *mbox = (msg_box_t *)malloc(sizeof(msg_box_t)+sizeof(void *)*size);
    if(mbox)
    msgbox_init(mbox,size);
    return mbox;
    }


    消息邮箱的释放。

    View Code
    void msgbox_free(msg_box_t *mbox)
    {
    ///等待所有的信号量结束
    pthread_mutex_lock(&(mbox->mutex));

    pthread_cond_destroy(&(mbox->not_full));
    pthread_cond_destroy(&(mbox->not_empty));
    pthread_mutex_unlock(&(mbox->mutex));
    pthread_mutex_destroy(&(mbox->mutex));
    free(mbox);
    }



    发送消息

    View Code
    /**
    @note 等待一个事件的到来
    */
    void msgbox_post(msg_box_t *mbox,msg_block_t *msg)
    {
    int wakeup = 0;
    pthread_mutex_lock(&(mbox->mutex));
    ///等待到有存放的空间
    while(mbox->nready == mbox->size )
    pthread_cond_wait(&(mbox->not_full),&(mbox->mutex));

    mbox->msg_array[mbox->last] = msg;
    mbox->last = (mbox->last+1)%mbox->size;
    if(0==mbox->nready)
    wakeup = 1;
    mbox->nready++;
    pthread_mutex_unlock(&(mbox->mutex));
    if(wakeup)
    pthread_cond_signal(&(mbox->not_empty));
    }

    取消息

    View Code
    /**
    @note 等待一个事件的到来
    */
    void msgbox_fetch(msg_box_t *mbox,msg_block_t **pmsg)
    {
    int wakeup = 0;
    pthread_mutex_lock(&(mbox->mutex));
    while(0==mbox->nready)
    pthread_cond_wait(&(mbox->not_empty),&(mbox->mutex));

    *pmsg = mbox->msg_array[mbox->first];
    mbox->first = (mbox->first+1)%mbox->size;


    if(mbox->nready == mbox->size)
    wakeup = 1;
    --(mbox->nready);
    pthread_mutex_unlock(&(mbox->mutex));

    if(wakeup)
    pthread_cond_signal(&(mbox->not_full));
    }







  • 相关阅读:
    编程经验
    GIS业务逻辑
    算法逻辑
    js中arguments的作用
    C#基础及记忆概念
    POI创建Excle
    数据库元数据分析Demo
    c3p0写连接池 Demo
    dbutils的使用Demo
    dbcp写连接池 Demo
  • 原文地址:https://www.cnblogs.com/westfly/p/2398630.html
Copyright © 2011-2022 走看看