zoukankan      html  css  js  c++  java
  • 无锁队列的实现

    锁是高性能程序的杀手,但是为了保证数据的一致性,在多线程的应用环境下又不得不加锁。但是在某些特殊的场景下, 是可以通过优化数据结构来达到无锁的目的。那么我们就来看一下如何实现一个无锁队列。

    队列:众所周知,就是先进先出。 出队列的时候从队列头取出一个结点;入队列的时候,将结点添加到队列尾部。当多线程同时操作一个队列读写时,显然就需要加锁。但是在单读单写的这种多线程应用时,是可以做到无锁的。直接上代码

    #ifndef _QUEUE_H_
    #define _QUEUE_H_
    
    template<class T>
    class node
    { 
        public:
            T* pdata;
            node* next;    
    };
    
    
    
    template<class T>
    class queue
    {
        private:
            node<T> *front;
            node<T> *tear;
        public:
            queue();
            ~queue();
            bool isempty();
            T* pop();
            void push(T *p);
    };
    
    
    
    template<class T>
    queue<T>::queue()
    {
        node<T> *pnode = new node<T>();
        front  = pnode;
        tear = pnode;
    }
    
    template<class T>
    queue<T>::~queue()
    {
        node<T> *cur = front;
        node<T> *next = NULL;
        while(cur)
        {
            next = cur->next;
            delete cur->pdata;
            delete cur;
            cur = next;
        }
    }
    
    template<class T>
    bool queue<T>::isempty()
    {
        if(front == tear)
            return true;
        return false;
    }
    
    
    
    template<class T>
    void queue<T>::push(T *p)
    {
        node<T> *pnode = new node<T>();
        tear->pdata = p;
        tear->next = pnode;
        tear = pnode;
    }
    
    
    template<class T>
    T* queue<T>::pop()
    {
        if(isempty())
            return NULL;
        node<T>* pnode = front;
        T* p = pnode->pdata;
        front = front->next;
        delete pnode;
        return p;
    }
    
    #endif

    原理其实很简单,就是在队列的末尾添加一个空节点。这样在队列非空的情况下,front结点与tear结点中间至少间隔一个结点。那么当两个线程同时插入与取出结点时,就不会存在同时操作front与tear的情况,从而保证不会出现race condition

    下面是测试代码:

    #include <stdio.h>
    #include <pthread.h>
    #include <sys/time.h>
    #include "queue.h"
    #define MAX 100000
    
    void* fun1(void *p)
    {
        queue<int> *q = (queue<int>*)p;
        int i = 0;
        while(1)
        {
            int *tmp = q->pop();
            if(tmp)
            {
                if(*tmp != i++)
                   printf("err
    ");
                delete tmp;
            }
            if(i == MAX)
                break;
        }
    }
    
    void* fun2(void *p)
    {
        queue<int> *q = (queue<int>*)p;
        int i = 0;
        while(i < MAX)
        {
            int *tmp = new int(i++);
            q->push(tmp);
        }
    }
    
    int main()
    {
        queue<int> q;
        struct  timeval tv1;
        struct  timeval tv2;
        pthread_t t1,t2;
        gettimeofday(&tv1,NULL);
        pthread_create(&t1,NULL ,fun1 ,&q);
        pthread_create(&t2,NULL, fun2 ,&q );
        pthread_join(t1,NULL);
        pthread_join(t2,NULL);
    
        gettimeofday(&tv2,NULL);
        long delta = tv2.tv_sec*1000000+tv2.tv_usec - ( tv1.tv_sec*1000000+tv1.tv_usec) ;
        printf("time : %lu
    ",delta);
        return 0;
    }

    在我的机器上,测试结果为327730us

    下面再给出加锁的版本,并使用相同的测试方法,进行对比

    #ifndef _QUEUE_H_
    #define _QUEUE_H_
    #include <pthread.h>
    
    template<class T>
    class node
    { 
        public:
            T* pdata;
            node* next;
            node(T* p1, node* p2):pdata(p1),next(p2){}
    };
    
    
    
    template<class T>
    class queue
    {
        private:
            node<T> *front;
            node<T> *tear;
            pthread_mutex_t mutex;
        public:
            queue();
            ~queue();
            bool isempty();
            T* pop();
            void push(T *p);
    };
    
    
    
    template<class T>
    queue<T>::queue()
    {
        front  = NULL;
        tear =  NULL;
        pthread_mutex_init(&mutex,NULL);
    }
    
    template<class T>
    queue<T>::~queue()
    {
        node<T> *cur = front;
        node<T> *next = NULL;
        while(cur)
        {
            next = cur->next;
            delete cur->pdata;
            delete cur;
            cur = next;
        }
    }
    
    
    
    
    template<class T>
    void queue<T>::push(T *p)
    {
        node<T> *pnode = new node<T>(p,NULL);
        pthread_mutex_lock(&mutex); 
        if(front == NULL)
        { 
            front = pnode;
            tear = pnode;
        }
        else
        {
            tear->next = pnode;
            tear = pnode;
        }
        pthread_mutex_unlock(&mutex);
    }
    
    
    template<class T>
    T* queue<T>::pop()
    {
        T * pdata = NULL;
        node<T> *pnode = NULL;
        pthread_mutex_lock(&mutex); 
        if(front == NULL)
        {
            pthread_mutex_unlock(&mutex);
            return NULL;
        }
        pnode = front;
        if(front == tear)
            tear = NULL;
        front = front->next;
        pthread_mutex_unlock(&mutex);
        pdata = pnode->pdata;
        delete pnode;
        return pdata;
    }
    
    #endif

    加锁的版本,测试结果为497987us。

    可见,加锁版本所耗时间,差不多为无锁版本的1.5倍以上。

  • 相关阅读:
    What is Split Brain in Oracle Clusterware and Real Application Cluster (文档 ID 1425586.1)
    Oracle Grid Infrastructure: Understanding Split-Brain Node Eviction (文档 ID 1546004.1)
    代理模式和装饰者模式区别
    偏向锁、轻量级锁、重量级锁
    理解HTTP幂等性
    Java8 lambda表达式10个示例
    IDEA debug断点调试技巧
    【1】【leetcode-115 动态规划】 不同的子序列
    【leetcode-91 动态规划】 解码方法
    【leetcode-78 dfs+回溯】 子集
  • 原文地址:https://www.cnblogs.com/myd620/p/7114843.html
Copyright © 2011-2022 走看看