在编写多线程程序时,临界资源的处理常常需要互斥量、读写锁等来加以保护。这时需要考虑锁的粒度问题,粒度太粗,会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微;如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得相当复杂。除此之外,还要细致的考虑各种dead lock问题。
因此,对于某些关键数据结构(临界资源),可以考虑使用Lock Free的实现手段。一个Lock Free的程序能够确保执行它的所有线程至少有一个能够继续往下执行,从而免疫了死锁等问题。Lock Free算法需要对应的原子操作加以支持,比如CAS(compare-and-swap)及其变种。CAS实现的逻辑如下:
bool CAS(inptr_t* addr, intptr_t oldv, intptr_t newv)
atomically{
if(*addr == oldv)
{
*addr = newv;
return true;
}
else
{
return false
}
}
接下来给出有锁和无锁的栈实现,以及最后的性能测试数据。
有锁实现的栈:
lock_stack.h
1 #ifndef __LOCK_STACK_H
2 #define __LOCK_STACK_H
3 #include<pthread.h>
4
5 typedef int value_t;
6
7 struct cell
8 {
9 struct cell *next;
10 value_t value;
11 };
12
13 struct stack
14 {
15 struct cell *top;
16 pthread_mutex_t lock;
17 };
18
19 struct stack *stack_alloc(void);
20 void stack_push(struct stack *fp,struct cell *cl);
21 struct cell *stack_pop(struct stack *fp);
22
23 #endif
lock_stack.c
1 #include "lock_stack.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4
5 struct stack *stack_alloc(void)
6 {
7 struct stack *fp;
8 if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
9 {
10 fp->top = NULL;
11 if(pthread_mutex_init(&fp->lock,NULL) != 0)
12 {
13 free(fp);
14 return NULL;
15 }
16 }
17 return fp;
18 }
19
20 void stack_push(struct stack *fp,struct cell *cl)
21 {
22 pthread_mutex_lock(&fp->lock);
23
24 cl->next = fp->top;
25 fp->top = cl;
26
27 pthread_mutex_unlock(&fp->lock);
28 }
29
30 struct cell *stack_pop(struct stack *fp)
31 {
32 cell *head;
33 pthread_mutex_lock(&fp->lock);
34
35 head = fp->top;
36 if(head == NULL)
37 {
38 pthread_mutex_unlock(&fp->lock);
39 return head;
40 }
41
42 fp->top = head->next;
43 pthread_mutex_unlock(&fp->lock);
44 }
无锁实现的栈:
lock_free_stack.h
1 #ifndef __LOCK_FREE_STACK_H
2 #define __LOCK_FREE_STACK_H
3 #include<pthread.h>
4
5 typedef int value_t;
6
7 struct cell
8 {
9 struct cell *next;
10 value_t value;
11 };
12
13 struct stack
14 {
15 struct cell *top;
16 };
17
18 struct stack *stack_alloc(void);
19 void stack_push(struct stack *fp,struct cell *cl);
20 struct cell *stack_pop(struct stack *fp);
21
22 #endif
lock_free_stack.c
1 #include "lock_free_stack.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4
5 struct stack *stack_alloc(void)
6 {
7 struct stack *fp;
8 if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
9 {
10 fp->top = NULL;
11 }
12 return fp;
13 }
14
15 void stack_push(struct stack *fp,struct cell *cl)
16 {
17 do{
18 cl->next = fp->top;
19 }while(!__sync_bool_compare_and_swap(&fp->top,cl->next,cl));
20 }
21
22 struct cell *stack_pop(struct stack *fp)
23 {
24 cell *head,*next;
25
26 do
27 {
28 head = fp->top;
29 if(head == NULL)
30 {
31 break;
32 }
33 next = head->next;
34 }while(!__sync_bool_compare_and_swap(&fp->top,head,next));
35 return head;
36 }
主程序使用100个线程,每个线程对同一栈进行500000次随机的push/pop操作(具体进行某个操作rand确定),并记录了每个线程的执行时间,代码如下:
1 #ifdef __LOCK_VERSION
2 #include "lock_stack.h"
3 #endif
4 #ifdef __LOCK_FREE_VERSION
5 #include "lock_free_stack.h"
6 #endif
7 #include <stdlib.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <time.h>
11
12 void* thr_fn(void *fp)
13 {
14 time_t t1 = time(NULL);
15 for(int i = 0; i != 500000; i++)
16 {
17 int r = rand()%2;
18 struct cell *cl;
19
20 if(r == 0)
21 {
22 if((cl = (struct cell*)malloc(sizeof(struct cell))) != NULL)
23 {
24 stack_push((struct stack*)fp,cl);
25 }
26 printf("push!\n");
27 }
28 else
29 {
30 cl = stack_pop((struct stack*)fp);
31 if(cl != NULL)
32 {
33 //free(cl);
34 }
35 printf("pop!\n");
36 }
37 }
38
39 printf("cost time: %d\n",time(NULL)-t1);
40
41 return NULL;
42 }
43
44 int main()
45 {
46 struct stack *fp;
47 fp = stack_alloc();
48
49 pthread_t tid;
50 for(int i = 0; i < 100; i++)
51 {
52 pthread_create(&tid,NULL,thr_fn,(void *)fp);
53 }
54
55 sleep(1000);
56
57 return 0;
58 }
makefile文件如下:
1 lock_free:
2 g++ lock_free_stack.c main.c -lpthread -D__LOCK_FREE_VERSION -march=nocona -o lock_free
3 lock:
4 g++ lock_stack.c main.c -lpthread -D__LOCK_VERSION -o lock
5
6 clean:
7 rm lock lock_free
得到最终的测试数据如下所示:
相比较而言,在性能上lock_free_stack有微弱的优势。
注:
1)大家在看代码时可能已经注意到了,main.c 33行free()函数被注释掉了,这是有意为之,在主动进行内存管理时,Lock Free结构会遇到ABA问题,需要CAS2配合Sequence Number加以解决。感兴趣的读者可以继续阅读文后的参考文献;
2)主线程需要等待所有的子线程执行完毕,这里不好使用pthread_join,本文作者采用取巧的办法,调用sleep函数进行等待,具体等待时间可能和测试环境相关;
3)__sync_bool_compare_and_swap
是由GCC提供的内置函数(GCC 4.1或更高版本),编译时需要指定-march参数;
参考资料:
[1]搜狗实验室 C10K与高性能程序续篇 http://www.sogou.com/labs/report/4-2.pdf
[2]W.Richard Stevens Advanced Programming in the Unix Environment 11.6节 线程同步
[3]设计不适用互斥锁的并发数据结构 http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/
[4]解决了“undefined reference to `__sync_bool_compare_and_swap_4”的编译错误 http://raylinn.iteye.com/blog/347323
[5]compare_and_swap http://en.wikipedia.org/wiki/Compare-and-swap