zoukankan      html  css  js  c++  java
  • message queue的设计

    为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,

    效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相对复杂

    性,我不打算使用。

    我的队列设计是使用tls维护一个local list,每个线程执行push时,首先将元素放入属于本线程的local list中,此时是无需加锁的,然后检查队列中元素

    的总数,如果发现总数超过一个阀值,则将local list中的所有元素一次性提交到share list中,此时需要加锁,share list中的元素是对全局可见的。

    当读者执行pop操作时,首先从检查自己的local list中是否有元素,如果有就返回一个,如果没有则尝试从share list中将所有元素同步到自己的local list

    中.

    local list和message queue的结构如下:

    复制代码
    struct per_thread_struct
    {
        list_node   next;
        struct double_link_node block;
        struct link_list *local_q;
        condition_t cond;
    };
    
    struct mq
    {
        uint32_t           push_size;
        pthread_key_t      t_key;
        mutex_t            mtx;
        struct double_link blocks;
        struct link_list  *share_list;
        struct link_list  *local_lists;
    
    };
    复制代码

    对于push操作,提供了两个接口:

    void mq_push(mq_t,struct list_node*);
    void mq_push_now(mq_t,struct list_node*);

    mq_push将元素插入local list但只有当local list中的元素到达一定阀值时才会执行提交操作mq_sync_push.

    而mq_push_now将元素插入local list之后马上就会执行提交操作.

    然后还有一个问题,如果local list中的元素较长时间内都达不到阀值,会导致消息传递的延时,所以提供了mq_force_sync函数,此函数的作用是

    强制将执行一次提交操作,将local list中的所有元素提交到share list中去。producer线程可在其主循环内以固定的频率执行mq_force_sync,将一个

    时间循环内剩余未被提交的消息提交出去.

    下面贴下测试代码:

    复制代码
    #include <stdio.h>
    #include <stdlib.h>
    #include "KendyNet.h"
    #include "thread.h"
    #include "SocketWrapper.h"
    #include "atomic.h"
    #include "SysTime.h"
    #include "mq.h"
    
    list_node *node_list1[5];
    list_node *node_list2[5];
    mq_t mq1;
    
    void *Routine1(void *arg)
    {
        int j = 0;
        for( ; ; )
        {
            int i = 0;
            for(; i < 10000000; ++i)
            {
                mq_push(mq1,&node_list1[j][i]);
            }
            mq_force_sync(mq1);
            j = (j + 1)%5; 
            sleepms(100);
    
        }
    }
    
    void *Routine3(void *arg)
    {
        int j = 0;
        for( ; ; )
        {
            int i = 0;
            for(; i < 10000000; ++i)
            {
                mq_push(mq1,&node_list2[j][i]);
            }
            mq_force_sync(mq1);
            j = (j + 1)%5; 
            sleepms(100);
    
        }
    }
    
    void *Routine2(void *arg)
    {
        uint64_t count = 0;
        uint32_t tick = GetCurrentMs();
        for( ; ; )
        {
            list_node *n = mq_pop(mq1,50);
            if(n)
            {
                ++count;
            }
            uint32_t now = GetCurrentMs();
            if(now - tick > 1000)
            {
                printf("recv:%d\n",(count*1000)/(now-tick));
                tick = now;
                count = 0;
            }
        }
    }
    
    
    int main()
    {
        int i = 0;
        for( ; i < 5; ++i)
        {
            node_list1[i] = calloc(10000000,sizeof(list_node));
            node_list2[i] = calloc(10000000,sizeof(list_node));
        }
        mq1 = create_mq(4096);
        init_system_time(10);
        thread_t t1 = create_thread(0);
        start_run(t1,Routine1,NULL);
    
        thread_t t3 = create_thread(0);
        start_run(t3,Routine3,NULL);    
    
        thread_t t2 = create_thread(0);
        start_run(t2,Routine2,NULL);
    
        getchar();
    
        return 0;
    }
    复制代码

    因为主要是测试mq的效率,所以预先生成了1亿个消息,分为两个写者一个读者,两个写者循环不断的发消息,每发送1000W休眠一小会.

    读者仅仅是从mq中pop一个消息出来,然后更新统计值.在我的i3 2.93双核台式机上运行rhel 6虚拟机,每秒pop出来的消息数量大概在8000W上下。

    这个数据足已满足任何高性能的应用需求了.

    https://github.com/sniperHW/kendylib/blob/master/src/mq.c
     
  • 相关阅读:
    容器虚拟化之LXC(LinuX Container)
    Twemproxy+Keepalived+Redis
    Haproxy+Keepalived+Nginx
    Redis-sentinel集群
    Server2008r2 指定默认用户自动登录--注册表
    .Net Core应用框架Util介绍(二) 转
    .Net Core应用框架Util介绍(一)转
    一分钟教你知道乐观锁和悲观锁的区别
    设置DataTable行属性
    js中的内置对象
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/2730112.html
Copyright © 2011-2022 走看看