zoukankan      html  css  js  c++  java
  • 无锁编程实战演练

    版权声明:本文为博主原创文章。欢迎转载。 https://blog.csdn.net/hzhsan/article/details/25837189

    前段时间研究过一阵子无锁化编程。

    刚写了几个简单的程序,来验证了下自己学到的一些概念。

    測试场景:假设有一个应用:如今有一个全局变量,用来计数,再创建10个线程并发运行,每个线程中循环对这个全局变量进行++操作(i++)。循环加2000000次。

    所以非常easy知道,这必定会涉及到并发相互排斥操作。

    以下通过三种方式来实现这样的并发操作。并对照出其在效率上的不同之处。

    这里先贴上代码。共5个文件:2个用于做时间统计的文件:timer.h  timer.cpp。这两个文件是暂时封装的,仅仅用来计时。能够不必细看。

     timer.h

    #ifndef TIMER_H
    #define TIMER_H
    
    #include <sys/time.h>
    class Timer
    {
    public:	
    	Timer();
    	// 開始计时时间
    	void Start();
    	// 终止计时时间
    	void Stop();
    	// 又一次设定
    	void Reset();
    	// 耗时时间
    	void Cost_time();
    private:
    	struct timeval t1;
    	struct timeval t2;
    	bool b1,b2;
    };
    #endif


    timer.cpp

    #include "timer.h"
    #include <stdio.h>
    
    Timer::Timer()
    {
    	b1 = false;
    	b2 = false;
    }
    void Timer::Start()
    {
    	gettimeofday(&t1,NULL);  
    	b1 = true;
    	b2 = false;
    }
    
    void Timer::Stop()
    {	
    	if (b1 == true)
    	{
    		gettimeofday(&t2,NULL);  
    		b2 = true;
    	}
    }
    
    void Timer::Reset()
    {	
    	b1 = false;
    	b2 = false;
    }
    
    void Timer::Cost_time()
    {
    	if (b1 == false)
    	{
    		printf("计时出错,应该先运行Start(),然后运行Stop(),再来运行Cost_time()");
    		return ;
    	}
    	else if (b2 == false)
    	{
    		printf("计时出错。应该运行完Stop(),再来运行Cost_time()");
    		return ;
    	}
    	else
    	{
    		int usec,sec;
    		bool borrow = false;
    		if (t2.tv_usec > t1.tv_usec)
    		{
    			usec = t2.tv_usec - t1.tv_usec;
    		}
    		else
    		{
    			borrow = true;
    			usec = t2.tv_usec+1000000 - t1.tv_usec;
    		}
    
    		if (borrow)
    		{
    			sec = t2.tv_sec-1 - t1.tv_sec;
    		}
    		else
    		{
    			sec = t2.tv_sec - t1.tv_sec;
    		}
    		printf("花费时间:%d秒 %d微秒
    ",sec,usec);
    	}
    }
    
    


    传统相互排斥量加锁方式 lock.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <time.h>
    #include "timer.h"
    
    pthread_mutex_t mutex_lock;
    static volatile int count = 0;
    void *test_func(void *arg)
    {
            int i = 0;
            for(i = 0; i < 2000000; i++)
    		{
                    pthread_mutex_lock(&mutex_lock);
                    count++;
                    pthread_mutex_unlock(&mutex_lock);
            }
            return NULL;
    }
    
    int main(int argc, const char *argv[])
    {
    	Timer timer; // 为了计时,暂时封装的一个类Timer。

    timer.Start(); // 计时開始 pthread_mutex_init(&mutex_lock, NULL); pthread_t thread_ids[10]; int i = 0; for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_create(&thread_ids[i], NULL, test_func, NULL); } for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_join(thread_ids[i], NULL); } timer.Stop();// 计时结束 timer.Cost_time();// 打印花费时间 printf("结果:count = %d ",count); return 0; }


    no lock 不加锁的形式 nolock.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <time.h>
    #include "timer.h"
    
    int mutex = 0;
    int lock = 0;
    int unlock = 1;
    
    static volatile int count = 0;
    void *test_func(void *arg)
    {
            int i = 0;
            for(i = 0; i < 2000000; i++)
    	{
    		while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
    		 count++;
    		 __sync_bool_compare_and_swap (&mutex, unlock, 0);
            }
            return NULL;
    }
    
    int main(int argc, const char *argv[])
    {
    	Timer timer;
    	timer.Start();
    	pthread_t thread_ids[10];
    	int i = 0;
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    	{
    			pthread_create(&thread_ids[i], NULL, test_func, NULL);
    	}
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    	{
    			pthread_join(thread_ids[i], NULL);
    	}
    
    	timer.Stop();
    	timer.Cost_time();
    	printf("结果:count = %d
    ",count);
    
    	return 0;
    }
    
    


    原子函数进行统计方式 atomic.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <time.h>
    #include "timer.h"
    
    static volatile int count = 0;
    void *test_func(void *arg)
    {
            int i = 0;
            for(i = 0; i < 2000000; i++)
            {
    	        __sync_fetch_and_add(&count, 1);
            }
            return NULL;
    }
    
    int main(int argc, const char *argv[])
    {
    	Timer timer;
    	timer.Start();
    	pthread_t thread_ids[10];
    	int i = 0;
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
    			pthread_create(&thread_ids[i], NULL, test_func, NULL);
    	}
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
    			pthread_join(thread_ids[i], NULL);
    	}
    
    	timer.Stop();
    	timer.Cost_time();
    	printf("结果:count = %d
    ",count);
        return 0;
    }
    
    


    #################################################################3

    好,代码粘贴完成。以下进入測试环节:

    编译:

    [adapter@ZHEJIANG test3]$ g++ lock.cpp ./timer.cpp  -lpthread -o lock ;
    [adapter@ZHEJIANG test3]$ g++ nolock.cpp ./timer.cpp  -lpthread -o nolock ;
    [adapter@ZHEJIANG test3]$  g++ atomic.cpp ./timer.cpp  -lpthread -o atomic ;

    每个线程循环加2000000次。


    第一组測验
    [adapter@ZHEJIANG test3]$ ./lock
    花费时间:3秒 109807微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:7秒 595784微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./atomic
    花费时间:0秒 381022微秒
    结果:count = 20000000

    结论:
    能够看出,原子操作函数的速度是最快的,其它两种方式根本就没法比。

    而无锁操作是在原子操作函数的基础上形成的。
    为什么无锁操作的效率会这么低?

    假设效率低的话。那还有什么意义,为什么如今大家都提倡无锁编程呢?为什么?咱先不
    解释。先用数据说话。

    第二组測验:
    原子操作代码不变,加锁操作代码不变。修改一下无锁操作的代码。
    将例如以下代码更改
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ));
    更改后:while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);
    让他睡一微秒。
    为什么要这样改代码?这样启不是会更慢?你的推測是不无道理的,可是一个不歇息的人干的活未必比有歇息的人干的活多。

    [adapter@ZHEJIANG test3]$ ./lock
    花费时间:2秒 970773微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 685404微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./atomic
    花费时间:0秒 380675微秒
    结果:count = 20000000

    结论:
    不用明说。大家看到的结果是不是非常诧异?是不是!有木有!怎么会是这样。无锁加上usleep(1),睡一会,反而会变得这么快。
    虽和原子操作相比次了一点。但已经甩开有锁同步好几条街了,无锁比有锁快是应该的,但为什么睡一会会更快,不睡就比有锁
    还慢那么多呢?怎么回事。

    是不是这个測试的时候cpu出现了不稳定的事情。
    那好,那再測试几次。
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 684938微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 686039微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 685928微秒
    结果:count = 20000000

    如今总没话可说了,这是事实。但为什么,我也不会解释。

    非常好奇,为什么越歇息,效率越高。电脑是机器,它可不是人。

    怎么会这样?
    那我就让它多歇息一会:
     while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10); //之前是1,如今改成10了。
     以下就再单独对照一个nolock无锁方式。


    [adapter@ZHEJIANG test3]$ ./nolock //usleep(1);
    花费时间:0秒 686039微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock //usleep(10);
    花费时间:0秒 680307微秒
    结果:count = 20000000
    nolock,结果usleep(10)竟然比uleep(1)还要快一点。

    那么这样呢:
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100); //之前是10,如今改成100了。
    [adapter@ZHEJIANG test3]$ ./nolock //usleep(100)
    花费时间:0秒 661935微秒
    结果:count = 20000000
    还是睡的越久,效率越高。

    那我再试一下usleep(1000)
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000); //之前是100,如今改成1000了。


    [adapter@ZHEJIANG test3]$ ./nolock // usleep(1000);
    花费时间:0秒 652411微秒
    结果:count = 20000000
    还是睡的越久,效率越高。

    那我再试一下usleep(10000)
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000); //之前是1000,如今改成10000了。
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 626267微秒
    结果:count = 20000000
    还是睡的越久,效率越高。

    那我再试一下usleep(100000)
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000); //之前是10000。如今改成100000了,也就是0.1秒。
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 942445微秒
    结果:count = 20000000
    哦,如今開始速度慢了。

    运行环境:
    gcc版本号信息:
    [adapter@ZHEJIANG test3]$ g++ -v
    Using built-in specs.
    Target: x86_64-redhat-linux
    gcc version 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)

    cpu信息:
    [adapter@ZHEJIANG test3]$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
          4  Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz

    通过编程測试及測试得出结论:
    1、假设是想用全局变量来做统计操作。而又不得不考虑多线程间的相互排斥訪问的话,最好使用编译器支持的原子操作函数。
    再满足相互排斥訪问的前提下,编程最简单,效率最高。
    2、lock-free。无锁编程方式确实能够比传统加锁方式效率高,经上面測试能够发现,能够快到5倍左右。

    所以在高并发程序中
    採用无锁编程的方式能够进一步提高程序效率。


    3、可是,得对无锁方式有足够熟悉的了解,不然效率反而会更低。并且easy出错。


    4、没想明确的疑问:为什么上面的循环检測时。加uleep比不加。效率更高。为什么在一定程度上。usleep越久效率越高?
    请高手路过的时候,为小弟解答一下。谢谢。

  • 相关阅读:
    shell中使用echo命令改变输出显示样式
    Shell脚本报错unary operator expected
    shell运行报 too many arguments错误
    写shell,运行出错:syntax error near unexpected token `$’do ”
    shell 脚本执行,出现错误bad interpreter: No such file or directory
    Linux 基本命令学习笔记
    如何运行 O’Reilly 书 Python for Finance 的源代码
    IntelliJ 中配置 Anaconda
    Windows 10 中安装 Anaconda 3
    Windows 中安装的 Python 如何卸载
  • 原文地址:https://www.cnblogs.com/mqxnongmin/p/10746726.html
Copyright © 2011-2022 走看看