zoukankan      html  css  js  c++  java
  • C++ 并发编程之互斥锁和条件变量的性能比较

    C++ 并发编程之互斥锁和条件变量的性能比较

    来源 https://juejin.im/post/5d6b2e655188257a2615eb72

    介绍

    本文以最简单生产者消费者模型,通过运行程序,观察该进程的cpu使用率,来对比使用互斥锁 和 互斥锁+条件变量的性能比较。

    本例子的生产者消费者模型,1个生产者,5个消费者。 生产者线程往队列里放入数据,5个消费者线程从队列取数据,取数据前需要判断一下队列中是否有数据,这个队列是全局队列,是线程间共享的数据,所以需要使用互斥锁进行保护。即生产者在往队列里放入数据时,其余消费者不能取,反之亦然。


    互斥锁实现的代码

    #include <iostream> // std::cout
    #include <deque>    // std::deque
    #include <thread>   // std::thread
    #include <chrono>   // std::chrono
    #include <mutex>    // std::mutex
    
    
    // 全局队列
    std::deque<int> g_deque;
    
    // 全局锁
    std::mutex g_mutex;
    
    // 生产者运行标记
    bool producer_is_running = true;
    
    // 生产者线程函数
    void Producer()
    {
        // 库存个数
        int count = 8;
        
        do
        {
            // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
            // 可以手动解锁,从而控制互斥锁的细粒度
            std::unique_lock<std::mutex> locker( g_mutex );
            // 入队一个数据
            g_deque.push_front( count );
            // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
            locker.unlock(); 
    
            std::cout << "生产者    :我现在库存有 :" << count << std::endl;
                
            // 放慢生产者生产速度,睡1秒
            std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
    
            // 库存自减少
            count--;
        } while( count > 0 );
        
        // 标记生产者打样了
        producer_is_running = false;
    
        std::cout << "生产者    : 我的库存没有了,我要打样了!"  << std::endl;
    }
    
    // 消费者线程函数
    void Consumer(int id)
    {
        int data = 0;
    
        do
        {
            // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
            // 可以手动解锁,从而控制互斥锁的细粒度
            std::unique_lock<std::mutex> locker( g_mutex );
            
            // 队列不为空
            if( !g_deque.empty() )
            {
                // 取出队列里最后一个数据
                data = g_deque.back();
                
                // 删除队列里最后一个数据
                g_deque.pop_back();
                
                // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
                locker.unlock();
    
                std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl;
            }
            // 队列为空
            else
            {
                locker.unlock();
            }
        } while( producer_is_running );
    	
    	std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!"  << std::endl;
    }
    
    int main(void)
    {
        std::cout << "1 producer start ..." << std::endl;
        std::thread producer( Producer );
    
        std::cout << "5 consumer start ..." << std::endl;
        std::thread consumer[ 5 ];
        for(int i = 0; i < 5; i++)
        {
            consumer[i] = std::thread(Consumer, i + 1);
        }
    
        producer.join();
    
        for(int i = 0; i < 5; i++)
        {
            consumer[i].join();
        }
    
        std::cout << "All threads joined." << std::endl;
    
        return 0;
    }
    
    复制代码

    互斥锁实现运行结果:

    结果输出

    [root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o  main
    [root@lincoding condition]# ./main
    1 producer start ...
    5 consumer start ...
    生产者    :我现在库存有 :8
    消费者[1] : 我抢到货的编号是 :8
    消费者[1] : 我抢到货的编号是 :7
    生产者    :我现在库存有 :7
    生产者    :我现在库存有 :6
    消费者[3] : 我抢到货的编号是 :6
    生产者    :我现在库存有 :5
    消费者[1] : 我抢到货的编号是 :5
    生产者    :我现在库存有 :4
    消费者[2] : 我抢到货的编号是 :4
    生产者    :我现在库存有 :3
    消费者[5] : 我抢到货的编号是 :3
    生产者    :我现在库存有 :2
    消费者[2] : 我抢到货的编号是 :2
    生产者    :我现在库存有 :1
    消费者[1] : 我抢到货的编号是 :1
    生产者    : 我的库存没有了,我要打样了!消费者[
    5] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[2] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[3] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[4] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[1] :卖家没有货打样了,真可惜,下次再来抢!
    All threads joined.
    复制代码

    可以看到,互斥锁其实可以完成这个任务,但是却存在着性能问题。

    • Producer是生产者线程,在生产者数据过程中,会休息1秒,所以这个生产过程是很慢的;

    • Consumer是消费者线程,存在着一个while循环,只有判断到生产者不运行了,才会退出while循环,那么每次在循环体内,都是会先加锁,判断队列不空,然后从列队取出一个数据,最后解锁。所以说,在生产者休息1秒的时候,消费者线程实际上会做很多无用功,导致CPU使用率非常高!

    运行的环境是4核cpu

    [root@lincoding ~]# grep 'model name' /proc/cpuinfo | wc -l
    4
    复制代码

    top命令查看cpu使用情况,可见使用纯互斥锁cpu的开销是很大的,main进程的cpu使用率达到了357.5%CPU,系统开销的cpu为54.5%sy,用户开销的cpu为18.2%us

    [root@lincoding ~]# top
    top - 19:13:41 up 36 min,  3 users,  load average: 0.06, 0.05, 0.01
    Tasks: 179 total,   1 running, 178 sleeping,   0 stopped,   0 zombie
    Cpu(s): 18.2%us, 54.5%sy,  0.0%ni, 27.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
    Mem:   1004412k total,   313492k used,   690920k free,    41424k buffers
    Swap:  2031608k total,        0k used,  2031608k free,    79968k cached
    
       PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                                       
     35346 root      20   0  137m 3288 1024 S 357.5  0.3   0:05.92 main                                                                                                                          
         1 root      20   0 19232 1492 1224 S  0.0  0.1   0:02.16 init                                                                                                                           
         2 root      20   0     0    0    0 S  0.0  0.0   0:00.01 kthreadd                                                                                                                       
         3 root      RT   0     0    0    0 S  0.0  0.0   0:00.68 migration/0  
    复制代码

    解决的办法之一就是给消费者也加一个小延时,当消费者没取到数据时,就休息一下500毫秒,这样可以减少互斥锁给cpu带来的开销。

    // 消费者线程函数
    void Consumer(int id)
    {
        int data = 0;
    
        do
        {
            // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
            // 可以手动解锁,从而控制互斥锁的细粒度
            std::unique_lock<std::mutex> locker( g_mutex );
            
            // 队列不为空
            if( !g_deque.empty() )
            {
                // 取出队列里最后一个数据
                data = g_deque.back();
                
                // 删除队列里最后一个数据
                g_deque.pop_back();
                
                // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
                locker.unlock();
    
                std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl;
            }
            // 队列为空
            else
            {
                locker.unlock();
                // 休息500毫秒
                std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
            }
        } while( producer_is_running );
    	
    	std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!"  << std::endl;
    }
    复制代码

    从运行结果可知,cpu使用率大大降低了

    [root@lincoding ~]# ps aux | grep -v grep  |grep main
    USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    root      61296  0.0  0.1 141068  1244 pts/1    Sl+  19:40   0:00 ./main
    
    复制代码

    条件变量+互斥锁实现的代码

    那么问题来了,如何确定消费者延时(休息)多久呢?

    • 如果生产者生产的非常快,消费者却延时了500毫秒,也不是很好
    • 如果生产者生产的更慢,那么消费延时500毫秒,也会有无用功,占用了CPU

    这就需要引入条件变量std::condition_variable,应用于消费者生产模型中,就是生产者生产完一个数据后,通过notify_one()唤醒正在wait()消费者线程,使得消费者从队列取出一个数据。

    #include <iostream> // std::cout
    #include <deque>    // std::deque
    #include <thread>   // std::thread
    #include <chrono>   // std::chrono
    #include <mutex>    // std::mutex
    
    #include <condition_variable> // std::condition_variable
    
    
    // 全局队列
    std::deque<int> g_deque;
    
    // 全局锁
    std::mutex g_mutex;
    
    // 全局条件变量
    std::condition_variable g_cond;
    
    // 生产者运行标记
    bool producer_is_running = true;
    
    // 生产者线程函数
    void Producer()
    {
        // 库存个数
        int count = 8;
        
        do
        {
            // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
            // 可以手动解锁,从而控制互斥锁的细粒度
            std::unique_lock<std::mutex> locker( g_mutex );
            // 入队一个数据
            g_deque.push_front( count );
            // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
            locker.unlock(); 
    
            std::cout << "生产者    :我现在库存有 :" << count << std::endl;
            
            // 唤醒一个线程
            g_cond.notify_one();
            
            // 睡1秒
            std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
    
            // 库存自减少
            count--;
        } while( count > 0 );
        
        // 标记生产者打样了
        producer_is_running = false;
        
        // 唤醒所有消费线程
        g_cond.notify_all();
        
        std::cout << "生产者    : 我的库存没有了,我要打样了!"  << std::endl;
    }
    
    // 消费者线程函数
    void Consumer(int id)
    {
    // 消费者线程函数
    void Consumer(int id)
    {
        // 购买的货品编号
        int data = 0;
    
        do
        {
            // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
            // 可以手动解锁,从而控制互斥锁的细粒度
            std::unique_lock<std::mutex> locker( g_mutex );
            
            // wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作
            // 必须使用unique_lock,不能使用lock_guard,因为lock_guard没有lock和unlock接口,而unique_lock则都提供了
            g_cond.wait(locker); 
            
            // 队列不为空
            if( !g_deque.empty() )
            {
                // 取出队列里最后一个数据
                data = g_deque.back();
                
                // 删除队列里最后一个数据
                g_deque.pop_back();
                
                // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
                locker.unlock(); 
    
                std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl;
            }
            // 队列为空
            else
            {
                locker.unlock();
            }
        
        } while( producer_is_running );
        
        std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!"  << std::endl;
    }
    }
    
    int main(void)
    {
        std::cout << "1 producer start ..." << std::endl;
        std::thread producer( Producer );
    
        std::cout << "5 consumer start ..." << std::endl;
        std::thread consumer[ 5 ];
        for(int i = 0; i < 5; i++)
        {
            consumer[i] = std::thread(Consumer, i + 1);
        }
    
        producer.join();
    
        for(int i = 0; i < 5; i++)
        {
            consumer[i].join();
        }
    
        std::cout << "All threads joined." << std::endl;
    
        return 0;
    }
    
    复制代码

    条件变量+互斥锁运行结果

    [root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o  main
    [root@lincoding condition]# 
    [root@lincoding condition]# ./main 
    1 producer start ...
    5 consumer start ...
    生产者    :我现在库存有 :8
    消费者[4] : 我抢到货的编号是 :8
    生产者    :我现在库存有 :7
    消费者[2] : 我抢到货的编号是 :7
    生产者    :我现在库存有 :6
    消费者[3] : 我抢到货的编号是 :6
    生产者    :我现在库存有 :5
    消费者[5] : 我抢到货的编号是 :5
    生产者    :我现在库存有 :4
    消费者[1] : 我抢到货的编号是 :4
    生产者    :我现在库存有 :3
    消费者[4] : 我抢到货的编号是 :3
    生产者    :我现在库存有 :2
    消费者[2] : 我抢到货的编号是 :2
    生产者    :我现在库存有 :1
    消费者[3] : 我抢到货的编号是 :1
    生产者    : 我的库存没有了,我要打样了!
    消费者[5] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[1] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[4] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[2] :卖家没有货打样了,真可惜,下次再来抢!
    消费者[3] :卖家没有货打样了,真可惜,下次再来抢!
    All threads joined.
    复制代码

    CPU开销非常的小

    [root@lincoding ~]# ps aux | grep -v grep  |grep main
    USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    root      73838  0.0  0.1 141068  1256 pts/1    Sl+  19:54   0:00 ./main
    
    复制代码

    总结

    在不确定生产者的生产速度是快还是慢的场景里,不能只使用互斥锁保护共享的数据,这样会对CPU的性能开销非常大,可以使用互斥锁+条件变量的方式,当生产者线程生产了一个数据,就唤醒消费者线程进行消费,避免一些无用功的性能开销。

    ============ End

  • 相关阅读:
    Aerospike系列:4:简单的增删改查aql
    Aerospike系列:3:aerospike特点分析
    MySQL事物系列:2:事物的实现
    MySQL事物系列:1:事物简介
    MySQL 源码系列:1:窥探篇
    MySQL 内存和CPU优化相关的参数
    Aerospike系列:2:商业版和社区版的比较
    Aerospike系列:1:安装
    MDX Cookbook 08
    MDX Cookbook 07
  • 原文地址:https://www.cnblogs.com/lsgxeva/p/13290169.html
Copyright © 2011-2022 走看看