锁是高性能程序的杀手,但是为了保证数据的一致性,在多线程的应用环境下又不得不加锁。但是在某些特殊的场景下, 是可以通过优化数据结构来达到无锁的目的。那么我们就来看一下如何实现一个无锁队列。
队列:众所周知,就是先进先出。 出队列的时候从队列头取出一个结点;入队列的时候,将结点添加到队列尾部。当多线程同时操作一个队列读写时,显然就需要加锁。但是在单读单写的这种多线程应用时,是可以做到无锁的。直接上代码
#ifndef _QUEUE_H_ #define _QUEUE_H_ template<class T> class node { public: T* pdata; node* next; }; template<class T> class queue { private: node<T> *front; node<T> *tear; public: queue(); ~queue(); bool isempty(); T* pop(); void push(T *p); }; template<class T> queue<T>::queue() { node<T> *pnode = new node<T>(); front = pnode; tear = pnode; } template<class T> queue<T>::~queue() { node<T> *cur = front; node<T> *next = NULL; while(cur) { next = cur->next; delete cur->pdata; delete cur; cur = next; } } template<class T> bool queue<T>::isempty() { if(front == tear) return true; return false; } template<class T> void queue<T>::push(T *p) { node<T> *pnode = new node<T>(); tear->pdata = p; tear->next = pnode; tear = pnode; } template<class T> T* queue<T>::pop() { if(isempty()) return NULL; node<T>* pnode = front; T* p = pnode->pdata; front = front->next; delete pnode; return p; } #endif
原理其实很简单,就是在队列的末尾添加一个空节点。这样在队列非空的情况下,front结点与tear结点中间至少间隔一个结点。那么当两个线程同时插入与取出结点时,就不会存在同时操作front与tear的情况,从而保证不会出现race condition
下面是测试代码:
#include <stdio.h> #include <pthread.h> #include <sys/time.h> #include "queue.h" #define MAX 100000 void* fun1(void *p) { queue<int> *q = (queue<int>*)p; int i = 0; while(1) { int *tmp = q->pop(); if(tmp) { if(*tmp != i++) printf("err "); delete tmp; } if(i == MAX) break; } } void* fun2(void *p) { queue<int> *q = (queue<int>*)p; int i = 0; while(i < MAX) { int *tmp = new int(i++); q->push(tmp); } } int main() { queue<int> q; struct timeval tv1; struct timeval tv2; pthread_t t1,t2; gettimeofday(&tv1,NULL); pthread_create(&t1,NULL ,fun1 ,&q); pthread_create(&t2,NULL, fun2 ,&q ); pthread_join(t1,NULL); pthread_join(t2,NULL); gettimeofday(&tv2,NULL); long delta = tv2.tv_sec*1000000+tv2.tv_usec - ( tv1.tv_sec*1000000+tv1.tv_usec) ; printf("time : %lu ",delta); return 0; }
在我的机器上,测试结果为327730us
下面再给出加锁的版本,并使用相同的测试方法,进行对比
#ifndef _QUEUE_H_ #define _QUEUE_H_ #include <pthread.h> template<class T> class node { public: T* pdata; node* next; node(T* p1, node* p2):pdata(p1),next(p2){} }; template<class T> class queue { private: node<T> *front; node<T> *tear; pthread_mutex_t mutex; public: queue(); ~queue(); bool isempty(); T* pop(); void push(T *p); }; template<class T> queue<T>::queue() { front = NULL; tear = NULL; pthread_mutex_init(&mutex,NULL); } template<class T> queue<T>::~queue() { node<T> *cur = front; node<T> *next = NULL; while(cur) { next = cur->next; delete cur->pdata; delete cur; cur = next; } } template<class T> void queue<T>::push(T *p) { node<T> *pnode = new node<T>(p,NULL); pthread_mutex_lock(&mutex); if(front == NULL) { front = pnode; tear = pnode; } else { tear->next = pnode; tear = pnode; } pthread_mutex_unlock(&mutex); } template<class T> T* queue<T>::pop() { T * pdata = NULL; node<T> *pnode = NULL; pthread_mutex_lock(&mutex); if(front == NULL) { pthread_mutex_unlock(&mutex); return NULL; } pnode = front; if(front == tear) tear = NULL; front = front->next; pthread_mutex_unlock(&mutex); pdata = pnode->pdata; delete pnode; return pdata; } #endif
加锁的版本,测试结果为497987us。
可见,加锁版本所耗时间,差不多为无锁版本的1.5倍以上。