zoukankan      html  css  js  c++  java
  • 无锁编程实战演练

    版权声明:本文为博主原创文章。欢迎转载。 https://blog.csdn.net/hzhsan/article/details/25837189

    前段时间研究过一阵子无锁化编程。

    刚写了几个简单的程序,来验证了下自己学到的一些概念。

    測试场景:假设有一个应用:如今有一个全局变量,用来计数,再创建10个线程并发运行,每个线程中循环对这个全局变量进行++操作(i++)。循环加2000000次。

    所以非常easy知道,这必定会涉及到并发相互排斥操作。

    以下通过三种方式来实现这样的并发操作。并对照出其在效率上的不同之处。

    这里先贴上代码。共5个文件:2个用于做时间统计的文件:timer.h  timer.cpp。这两个文件是暂时封装的,仅仅用来计时。能够不必细看。

     timer.h

    #ifndef TIMER_H
    #define TIMER_H
    
    #include <sys/time.h>
    class Timer
    {
    public:	
    	Timer();
    	// 開始计时时间
    	void Start();
    	// 终止计时时间
    	void Stop();
    	// 又一次设定
    	void Reset();
    	// 耗时时间
    	void Cost_time();
    private:
    	struct timeval t1;
    	struct timeval t2;
    	bool b1,b2;
    };
    #endif


    timer.cpp

    #include "timer.h"
    #include <stdio.h>
    
    Timer::Timer()
    {
    	b1 = false;
    	b2 = false;
    }
    void Timer::Start()
    {
    	gettimeofday(&t1,NULL);  
    	b1 = true;
    	b2 = false;
    }
    
    void Timer::Stop()
    {	
    	if (b1 == true)
    	{
    		gettimeofday(&t2,NULL);  
    		b2 = true;
    	}
    }
    
    void Timer::Reset()
    {	
    	b1 = false;
    	b2 = false;
    }
    
    void Timer::Cost_time()
    {
    	if (b1 == false)
    	{
    		printf("计时出错,应该先运行Start(),然后运行Stop(),再来运行Cost_time()");
    		return ;
    	}
    	else if (b2 == false)
    	{
    		printf("计时出错。应该运行完Stop(),再来运行Cost_time()");
    		return ;
    	}
    	else
    	{
    		int usec,sec;
    		bool borrow = false;
    		if (t2.tv_usec > t1.tv_usec)
    		{
    			usec = t2.tv_usec - t1.tv_usec;
    		}
    		else
    		{
    			borrow = true;
    			usec = t2.tv_usec+1000000 - t1.tv_usec;
    		}
    
    		if (borrow)
    		{
    			sec = t2.tv_sec-1 - t1.tv_sec;
    		}
    		else
    		{
    			sec = t2.tv_sec - t1.tv_sec;
    		}
    		printf("花费时间:%d秒 %d微秒
    ",sec,usec);
    	}
    }
    
    


    传统相互排斥量加锁方式 lock.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <time.h>
    #include "timer.h"
    
    pthread_mutex_t mutex_lock;
    static volatile int count = 0;
    void *test_func(void *arg)
    {
            int i = 0;
            for(i = 0; i < 2000000; i++)
    		{
                    pthread_mutex_lock(&mutex_lock);
                    count++;
                    pthread_mutex_unlock(&mutex_lock);
            }
            return NULL;
    }
    
    int main(int argc, const char *argv[])
    {
    	Timer timer; // 为了计时,暂时封装的一个类Timer。

    timer.Start(); // 计时開始 pthread_mutex_init(&mutex_lock, NULL); pthread_t thread_ids[10]; int i = 0; for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_create(&thread_ids[i], NULL, test_func, NULL); } for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_join(thread_ids[i], NULL); } timer.Stop();// 计时结束 timer.Cost_time();// 打印花费时间 printf("结果:count = %d ",count); return 0; }


    no lock 不加锁的形式 nolock.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <time.h>
    #include "timer.h"
    
    int mutex = 0;
    int lock = 0;
    int unlock = 1;
    
    static volatile int count = 0;
    void *test_func(void *arg)
    {
            int i = 0;
            for(i = 0; i < 2000000; i++)
    	{
    		while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
    		 count++;
    		 __sync_bool_compare_and_swap (&mutex, unlock, 0);
            }
            return NULL;
    }
    
    int main(int argc, const char *argv[])
    {
    	Timer timer;
    	timer.Start();
    	pthread_t thread_ids[10];
    	int i = 0;
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    	{
    			pthread_create(&thread_ids[i], NULL, test_func, NULL);
    	}
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    	{
    			pthread_join(thread_ids[i], NULL);
    	}
    
    	timer.Stop();
    	timer.Cost_time();
    	printf("结果:count = %d
    ",count);
    
    	return 0;
    }
    
    


    原子函数进行统计方式 atomic.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <time.h>
    #include "timer.h"
    
    static volatile int count = 0;
    void *test_func(void *arg)
    {
            int i = 0;
            for(i = 0; i < 2000000; i++)
            {
    	        __sync_fetch_and_add(&count, 1);
            }
            return NULL;
    }
    
    int main(int argc, const char *argv[])
    {
    	Timer timer;
    	timer.Start();
    	pthread_t thread_ids[10];
    	int i = 0;
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
    			pthread_create(&thread_ids[i], NULL, test_func, NULL);
    	}
    
    	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
    			pthread_join(thread_ids[i], NULL);
    	}
    
    	timer.Stop();
    	timer.Cost_time();
    	printf("结果:count = %d
    ",count);
        return 0;
    }
    
    


    #################################################################3

    好,代码粘贴完成。以下进入測试环节:

    编译:

    [adapter@ZHEJIANG test3]$ g++ lock.cpp ./timer.cpp  -lpthread -o lock ;
    [adapter@ZHEJIANG test3]$ g++ nolock.cpp ./timer.cpp  -lpthread -o nolock ;
    [adapter@ZHEJIANG test3]$  g++ atomic.cpp ./timer.cpp  -lpthread -o atomic ;

    每个线程循环加2000000次。


    第一组測验
    [adapter@ZHEJIANG test3]$ ./lock
    花费时间:3秒 109807微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:7秒 595784微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./atomic
    花费时间:0秒 381022微秒
    结果:count = 20000000

    结论:
    能够看出,原子操作函数的速度是最快的,其它两种方式根本就没法比。

    而无锁操作是在原子操作函数的基础上形成的。
    为什么无锁操作的效率会这么低?

    假设效率低的话。那还有什么意义,为什么如今大家都提倡无锁编程呢?为什么?咱先不
    解释。先用数据说话。

    第二组測验:
    原子操作代码不变,加锁操作代码不变。修改一下无锁操作的代码。
    将例如以下代码更改
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ));
    更改后:while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);
    让他睡一微秒。
    为什么要这样改代码?这样启不是会更慢?你的推測是不无道理的,可是一个不歇息的人干的活未必比有歇息的人干的活多。

    [adapter@ZHEJIANG test3]$ ./lock
    花费时间:2秒 970773微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 685404微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./atomic
    花费时间:0秒 380675微秒
    结果:count = 20000000

    结论:
    不用明说。大家看到的结果是不是非常诧异?是不是!有木有!怎么会是这样。无锁加上usleep(1),睡一会,反而会变得这么快。
    虽和原子操作相比次了一点。但已经甩开有锁同步好几条街了,无锁比有锁快是应该的,但为什么睡一会会更快,不睡就比有锁
    还慢那么多呢?怎么回事。

    是不是这个測试的时候cpu出现了不稳定的事情。
    那好,那再測试几次。
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 684938微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 686039微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 685928微秒
    结果:count = 20000000

    如今总没话可说了,这是事实。但为什么,我也不会解释。

    非常好奇,为什么越歇息,效率越高。电脑是机器,它可不是人。

    怎么会这样?
    那我就让它多歇息一会:
     while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10); //之前是1,如今改成10了。
     以下就再单独对照一个nolock无锁方式。


    [adapter@ZHEJIANG test3]$ ./nolock //usleep(1);
    花费时间:0秒 686039微秒
    结果:count = 20000000
    [adapter@ZHEJIANG test3]$ ./nolock //usleep(10);
    花费时间:0秒 680307微秒
    结果:count = 20000000
    nolock,结果usleep(10)竟然比uleep(1)还要快一点。

    那么这样呢:
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100); //之前是10,如今改成100了。
    [adapter@ZHEJIANG test3]$ ./nolock //usleep(100)
    花费时间:0秒 661935微秒
    结果:count = 20000000
    还是睡的越久,效率越高。

    那我再试一下usleep(1000)
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000); //之前是100,如今改成1000了。


    [adapter@ZHEJIANG test3]$ ./nolock // usleep(1000);
    花费时间:0秒 652411微秒
    结果:count = 20000000
    还是睡的越久,效率越高。

    那我再试一下usleep(10000)
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000); //之前是1000,如今改成10000了。
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 626267微秒
    结果:count = 20000000
    还是睡的越久,效率越高。

    那我再试一下usleep(100000)
    while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000); //之前是10000。如今改成100000了,也就是0.1秒。
    [adapter@ZHEJIANG test3]$ ./nolock
    花费时间:0秒 942445微秒
    结果:count = 20000000
    哦,如今開始速度慢了。

    运行环境:
    gcc版本号信息:
    [adapter@ZHEJIANG test3]$ g++ -v
    Using built-in specs.
    Target: x86_64-redhat-linux
    gcc version 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)

    cpu信息:
    [adapter@ZHEJIANG test3]$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
          4  Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz

    通过编程測试及測试得出结论:
    1、假设是想用全局变量来做统计操作。而又不得不考虑多线程间的相互排斥訪问的话,最好使用编译器支持的原子操作函数。
    再满足相互排斥訪问的前提下,编程最简单,效率最高。
    2、lock-free。无锁编程方式确实能够比传统加锁方式效率高,经上面測试能够发现,能够快到5倍左右。

    所以在高并发程序中
    採用无锁编程的方式能够进一步提高程序效率。


    3、可是,得对无锁方式有足够熟悉的了解,不然效率反而会更低。并且easy出错。


    4、没想明确的疑问:为什么上面的循环检測时。加uleep比不加。效率更高。为什么在一定程度上。usleep越久效率越高?
    请高手路过的时候,为小弟解答一下。谢谢。

  • 相关阅读:
    Lucene.Net 2.3.1开发介绍 —— 二、分词(一)
    控制‘控制台应用程序’的关闭操作
    详解for循环(各种用法)
    敏捷软件开发
    Sql Server的一些知识点
    在SharePoint 2010 中配置Remote Blob Storage FILESTREAM Provider
    使用LotusScript操作Lotus Notes RTF域
    JOpt Simple 4.5 发布,命令行解析器
    John the Ripper 1.8.0 发布,密码破解工具
    PacketFence ZEN 4.0.1 发布,网络接入控制
  • 原文地址:https://www.cnblogs.com/mqxnongmin/p/10746726.html
Copyright © 2011-2022 走看看