zoukankan      html  css  js  c++  java
  • Boost无锁队列

     

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/okiwilldoit/article/details/50970408

    在开发接收转发agent时,采用了多线程的生产者-消费者模式,用了加互斥锁的方式来实现线程同步。互斥锁会阻塞线程,所以压测时,效率并不高。所以想起用无锁队列来实现,性能确实提升了。

    首先介绍下lock-free和wait-free的区别:

    阻塞算法可能会出现整个系统都挂起的情况(占有锁的线程被中断,无法释放所,那么所有试图争用这个锁的线程会被挂起),系统中的所有线程全部饿死。

    无锁算法可以保证系统中至少有一个线程处于工作状态,但是还是可能有线程永远抢不到资源而被饿死。

    无等待算法保证系统中的所有线程都能处于工作状态,没有线程会被饿死,只要时间够,所有线程都能结束。相比于无锁算法,无等待算法有更强的保证。

    一. 用互斥锁实现单生产者-单消费者

    #include <string>
    #include <sstream>
    #include <list>
    #include <pthread.h>
    #include <iostream>
    #include <time.h>
    
    using namespace std;
    
    int producer_count = 0;
    int consumer_count = 0;
    
    list<string> product;
    list<string> consumer_list;
    pthread_mutex_t mutex;
    
    const int iterations = 10000;
    
    //是否生产完毕标志
    bool done = false;
    
    void* producer(void* args)
    {
        for (int i = 0; i != iterations; ++i) {
            pthread_mutex_lock(&mutex);
            int value = ++producer_count;
            stringstream ss;
            ss<<value;
            product.push_back(ss.str());
            //cout<<"list push:"<<ss.str()<<endl;
            pthread_mutex_unlock(&mutex);
        }
        return 0;
    }
    
    
    
    //消费函数
    void* consumer(void* args)
    {
        //当没有生产完毕,则边消费边生产
        while (!done) {
            pthread_mutex_lock(&mutex);
            if(!product.empty()){
                consumer_list.splice(consumer_list.end(), product);
                pthread_mutex_unlock(&mutex);
                while(!consumer_list.empty()){
                    string value = consumer_list.front();
                    consumer_list.pop_front();
                    //cout<<"list pop:"<<value<<endl;
                    ++consumer_count;
                }           
            }else{
                pthread_mutex_unlock(&mutex);
            }
        }
        //如果生产完毕,则消费
        while(!consumer_list.empty()){
            string value = consumer_list.front();
            consumer_list.pop_front();
            //cout<<"list pop:"<<value<<endl;
            ++consumer_count;
        }
        return 0;
    }
    
    int main(int argc, char* argv[])
    {
        struct timespec time_start={0, 0},time_end={0, 0};
        clock_gettime(CLOCK_REALTIME, &time_start);
    
        pthread_t producer_tid;
        pthread_t consumer_tid;
    
        pthread_mutex_init (&mutex,NULL);
        pthread_create(&producer_tid, NULL, producer, NULL);
        pthread_create(&consumer_tid, NULL, consumer, NULL);
    
        //等待生产者生产完毕
        pthread_join(producer_tid, NULL);
        //可以消费标志
        done = true;     //主线程不等生产线程完毕就设置done标记
        cout << "producer done" << endl;    //输出以观察主线程和各子线程的执行顺序
    
        //等待消费者结束
        pthread_join(consumer_tid, NULL);
        clock_gettime(CLOCK_REALTIME, &time_end);
    
        long cost = (time_end.tv_sec-time_start.tv_sec)/1000000 + (time_end.tv_nsec-time_start.tv_nsec)/1000;
    
        cout<<"===========cost time:"<<cost<<"us==========="<<endl;
    
        cout << "produced " << producer_count << " objects." << endl;
        cout << "consumed " << consumer_count << " objects." << endl;
    }

    生产消费10000个string类型的数据,耗时:58185us

    二. Boost库的无锁队列

    boost.lockfree实现了三种无锁数据结构:
    boost::lockfree::queue
    alock-free multi-produced/multi-consumer queue
    一个无锁的多生产者/多消费者队列,注意,这个queue不支持string类型,支持的数据类型要求:
    - T must have a copy constructor
    - T must have a trivial assignment operator
    - T must have a trivial destructor

    boost::lockfree::stack
    alock-free multi-produced/multi-consumer stack
    一个无锁的多生产者/多消费者栈,支持的数据类型要求:
    - T must have a copy constructor

    boost::lockfree::spsc_queue
    await-free single-producer/single-consumer queue (commonly known as ringbuffer)
    一个无等待的单生产者/单消费者队列(通常被称为环形缓冲区),支持的数据类型要求:
    - T must have a default constructor
    - T must be copyable

    详细资料可以看官方文档:http://www.boost.org/doc/libs/1_55_0/doc/html/lockfree.html

    三. Queue示例

    这里实现的还是单生产者-单消费者。

    #include <pthread.h>
    #include <boost/lockfree/queue.hpp>
    #include <iostream>
    #include <time.h>
    #include <boost/atomic.hpp>
    
    using namespace std;
    
    //生产数量
    boost::atomic_int producer_count(0);
    //消费数量
    boost::atomic_int consumer_count(0);
    //队列
    boost::lockfree::queue<int> queue(512);
    
    
    //迭代次数
    const int iterations = 10000;
    
    //生产函数
    void* producer(void* args)
    {
        for (int i = 0; i != iterations; ++i) {
            int value = ++producer_count;
            //原子计数————多线程不存在计数不上的情况       
            //若没有进入队列,则重复推送
            while(!queue.push(value));
            //cout<<"queue push:"<<value<<endl;
        }
        return 0;
    }
    
    //是否生产完毕标志
    boost::atomic<bool> done (false);
    
    //消费函数
    void* consumer(void* args)
    {
        int value;
        //当没有生产完毕,则边消费边生产
        while (!done) {
            //只要能弹出元素,就消费
            while (queue.pop(value)) {
                //cout<<"queue pop:"<<value<<endl;
                ++consumer_count;
            }
        }
        //如果生产完毕,则消费
        while (queue.pop(value)){
            //cout<<"queue pop:"<<value<<endl;
            ++consumer_count;
        }
        return 0;
    }
    
    int main(int argc, char* argv[])
    {
        cout << "boost::lockfree::queue is ";
        if (!queue.is_lock_free())
            cout << "not ";
        cout << "lockfree" << endl;
    
        struct timespec time_start={0, 0},time_end={0, 0};
        clock_gettime(CLOCK_REALTIME, &time_start);
    
        pthread_t producer_tid;
        pthread_t consumer_tid;
    
        pthread_create(&producer_tid, NULL, producer, NULL);
        pthread_create(&consumer_tid, NULL, consumer, NULL);
    
        //等待生产者生产完毕
        pthread_join(producer_tid, NULL);
        //可以消费标志
        done = true;     //主线程不等生产线程完毕就设置done标记
        cout << "producer done" << endl;    //输出以观察主线程和各子线程的执行顺序
    
        //等待消费者结束
        pthread_join(consumer_tid, NULL);
        clock_gettime(CLOCK_REALTIME, &time_end);
    
        long cost = (time_end.tv_sec-time_start.tv_sec)/1000000 + (time_end.tv_nsec-time_start.tv_nsec)/1000;
    
        cout<<"===========cost time:"<<cost<<"us==========="<<endl;
    
        //输出生产和消费数量
        cout << "produced " << producer_count << " objects." << endl;
        cout << "consumed " << consumer_count << " objects." << endl;
    
        return 0;
    }

    生产消费10000个int类型的数据,耗时:3963us
    stack与queue类似,只不过是先进后出。

    四. Waitfree Single-Producer/Single-Consumer Queue无等待单生产者/单消费者队列

    #include <pthread.h>
    #include <iostream>
    #include <time.h>
    #include <boost/lockfree/spsc_queue.hpp>
    #include <boost/atomic.hpp>
    
    using namespace std;
    
    int producer_count = 0;
    boost::atomic_int consumer_count (0);
    
    boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024> > spsc_queue;
    
    const int iterations = 10000;
    
    void* producer(void* args)
    {
        for (int i = 0; i != iterations; ++i) {
            int value = ++producer_count;
            while(!spsc_queue.push(value));
            //cout<<"queue push:"<<value<<endl;
        }
        return 0;
    }
    
    //是否生产完毕标志
    boost::atomic<bool> done (false);
    
    //消费函数
    void* consumer(void* args)
    {
        int value;
        //当没有生产完毕,则边消费边生产
        while (!done) {
            //只要能弹出元素,就消费
            while (spsc_queue.pop(value)) {
                //cout<<"queue pop:"<<value<<endl;
                ++consumer_count;
            }
        }
        //如果生产完毕,则消费
        while (spsc_queue.pop(value)){
            //cout<<"queue pop:"<<value<<endl;
            ++consumer_count;
        }
        return 0;
    }
    
    int main(int argc, char* argv[])
    {
        using namespace std;
        cout << "boost::lockfree::queue is ";
        if (!spsc_queue.is_lock_free())
            cout << "not ";
        cout << "lockfree" << endl;
    
        struct timespec time_start={0, 0},time_end={0, 0};
        clock_gettime(CLOCK_REALTIME, &time_start);
    
        pthread_t producer_tid;
        pthread_t consumer_tid;
    
        pthread_create(&producer_tid, NULL, producer, NULL);
        pthread_create(&consumer_tid, NULL, consumer, NULL);
    
        //等待生产者生产完毕
        pthread_join(producer_tid, NULL);
        //可以消费标志
        done = true;     //主线程不等生产线程完毕就设置done标记
        cout << "producer done" << endl;    //输出以观察主线程和各子线程的执行顺序
    
        //等待消费者结束
        pthread_join(consumer_tid, NULL);
        clock_gettime(CLOCK_REALTIME, &time_end);
    
        long cost = (time_end.tv_sec-time_start.tv_sec)/1000000 + (time_end.tv_nsec-time_start.tv_nsec)/1000;
    
        cout<<"===========cost time:"<<cost<<"us==========="<<endl;
    
        cout << "produced " << producer_count << " objects." << endl;
        cout << "consumed " << consumer_count << " objects." << endl;
    }

    生产消费10000个int类型的数据,耗时:1832us
    如果把int改为string类型,耗时:28788us

    五.性能对比

    这里写图片描述
    从上面可以看出在单生产者-单消费者模式下,spsc_queue比queue性能好,无锁队列比互斥锁的方式性能也要好。

  • 相关阅读:
    PAT (Advanced Level) Practice 1100 Mars Numbers (20分)
    PAT (Advanced Level) Practice 1107 Social Clusters (30分) (并查集)
    PAT (Advanced Level) Practice 1105 Spiral Matrix (25分)
    PAT (Advanced Level) Practice 1104 Sum of Number Segments (20分)
    PAT (Advanced Level) Practice 1111 Online Map (30分) (两次迪杰斯特拉混合)
    PAT (Advanced Level) Practice 1110 Complete Binary Tree (25分) (完全二叉树的判断+分享致命婴幼儿错误)
    PAT (Advanced Level) Practice 1109 Group Photo (25分)
    PAT (Advanced Level) Practice 1108 Finding Average (20分)
    P6225 [eJOI2019]异或橙子 树状数组 异或 位运算
    P4124 [CQOI2016]手机号码 数位DP
  • 原文地址:https://www.cnblogs.com/lvdongjie/p/9679344.html
Copyright © 2011-2022 走看看