zoukankan      html  css  js  c++  java
  • 队列加锁无锁栈实现一例

    本篇文章个人在上海游玩的时候突然想到的...这两天就有想写几篇关于队列加锁的笔记,所以回家到之后就奋笔疾书的写出来发布了

        

    一、何谓无锁队列

        无锁队列,望文生义,即不需要加锁的队列;之所以不需要额定加锁,是因为其本身已经是线程安全的。

        二、为什么要在队列中集成线程安全的机制?

        这里我想采取对比的方法来讲述。有锁队列,这可能是最简略的一种队列了,比如我们在多线程情况下使用标准STD的deque,那么毫无疑问需要对其加锁。加锁其实是将协调过程交给了操作系统来管理,但无锁队列却是在CPU层面就做到了协调,所以在效率上会高很多。更具体的解释请拜见http://www.searchtb.com/2012/10/introduction_to_disruptor.html

        三、如何实现?

        这里主要是采取ACS。

        1. 定义队列。这里由于测试的缘故,队列节点内的数据比较简略。

        

    /* ACS node define. */
    typedef struct acs_node_t {
        std::string id;
        int index;
        struct acs_node_t* next;
    } acs_node_t;
    
    /* ACS deque define. */
    typedef struct acs_deque_t {
        struct acs_node_t head;
        struct acs_node_t* tail;
    } acs_deque_t;

        

        2. 定义接口。这里定义了队列初始化,入队列以及出队列三个接口。

        

    void acs_deque_init(acs_deque_t* deq);
    int acs_deque_empty(acs_deque_t* deq);
    void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
    acs_node_t* acs_deque_pop(acs_deque_t* deq);

        

        3.上面是接口的实现。

        

    void acs_deque_init(acs_deque_t* deq)
    {
        if (deq) {
            deq->tail = &deq->head;
            deq->head.next = NULL;
        }
    }
    
    int acs_deque_empty(acs_deque_t* deq)
    {
        if (!deq)
            return 1;
        return deq->head.next == NULL;
    }
    
    void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
    {
        acs_node_t* q = NULL;
        
        do {
            q = deq->tail;
        } while (InterlockedCompareExchangePointer((PVOID*)&q->next, 0, node) != q->next);
    
        InterlockedCompareExchangePointer((PVOID*)&deq->tail, q, node);
    }
    
    acs_node_t* acs_deque_pop(acs_deque_t* deq)
    {
        acs_node_t* q = NULL;
        
        do {
            q = deq->head.next;
            if (q == NULL)
                return NULL;
        } while (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, q, q->next) != deq->head.next);
        
        return q;
    }

        接口采取了Windows的ACS函数,当然你也可以将其更改为linux版本的ACS函数。

        4. 其他代码为测试代码,全体代码为

        

        每日一道理
    有些冷,有些凉,心中有些无奈,我一个人走在黑夜中,有些颤抖,身体瑟缩着,新也在抖动着,我看不清前方的路,何去何从,感觉迷茫,胸口有些闷,我环视了一下周围,无人的街头显得冷清,感到整个世界都要将我放弃。脚步彷徨之间,泪早已滴下……
    #include <stdio.h>
    
    #include <string>
    
    #define WIN32_LEAN_AND_MEAN
    #include <Windows.h>
    
    #include <deque>
    
    static const int knThreadCount = 4;
    static const int knMaxNodeCount = 50000;
    static const int knPopedCount = 5000;
    
    /* ACS node define. */
    typedef struct acs_node_t {
        std::string id;
        int index;
        struct acs_node_t* next;
    } acs_node_t;
    
    /* ACS deque define. */
    typedef struct acs_deque_t {
        struct acs_node_t head;
        struct acs_node_t* tail;
    } acs_deque_t;
    
    typedef struct DequeData {
        std::string id;
        int index;
    } DequeData;
    
    typedef std::deque<DequeData*> StdDeque;
    
    CRITICAL_SECTION g_stdcs;
    
    int g_aes_cost_time = 0;
    int g_std_cost_time = 0;
    
    class HRTimer {
    public:
        HRTimer();
        ~HRTimer() {}
    
        double GetFrequency(void);
        void StartTimer(void);
        double StopTimer(void);
    
    private:
        LARGE_INTEGER start_;
        LARGE_INTEGER stop_;
        double frequency_;
    };
    
    HRTimer::HRTimer()
        : start_(),
        stop_(),
        frequency_(0.f)
    {
        frequency_ = this->GetFrequency();
    }
    
    double HRTimer::GetFrequency(void)
    {
        LARGE_INTEGER proc_freq;
        if (!::QueryPerformanceFrequency(&proc_freq))
            return 0.f;
        return proc_freq.QuadPart;
    }
    
    void HRTimer::StartTimer(void)
    {
        HANDLE curth = ::GetCurrentThread();
        DWORD_PTR oldmask = ::SetThreadAffinityMask(curth, 0);
        ::QueryPerformanceCounter(&start_);
        ::SetThreadAffinityMask(curth, oldmask);
    }
    
    double HRTimer::StopTimer(void)
    {
        HANDLE curth = ::GetCurrentThread();
        DWORD_PTR oldmask = ::SetThreadAffinityMask(curth, 0);
        ::QueryPerformanceCounter(&stop_);
        ::SetThreadAffinityMask(curth, oldmask);
        return ((stop_.QuadPart - start_.QuadPart) / frequency_) * 1000;
    }
    
    class AutoHRTimer {
    public:
        AutoHRTimer(HRTimer& hrt, const char* name);
        ~AutoHRTimer();
    
    private:
        HRTimer& hrt_;
        const char* name_;
    };
    
    AutoHRTimer::AutoHRTimer(HRTimer& hrt, const char* name)
        : hrt_(hrt),
        name_(name)
    {
        hrt_.StartTimer();
    }
    
    AutoHRTimer::~AutoHRTimer()
    {
        double diff = hrt_.StopTimer();
        fprintf(stdout, "%s cost time %f ms\n", name_, diff);
    }
    
    HRTimer g_hrtimer;
    
    void acs_deque_init(acs_deque_t* deq);
    int acs_deque_empty(acs_deque_t* deq);
    void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
    acs_node_t* acs_deque_pop(acs_deque_t* deq);
    
    void acs_deque_init(acs_deque_t* deq)
    {
        if (deq) {
            deq->tail = &deq->head;
            deq->head.next = NULL;
        }
    }
    
    int acs_deque_empty(acs_deque_t* deq)
    {
        if (!deq)
            return 1;
        return deq->head.next == NULL;
    }
    
    void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
    {
        acs_node_t* q = NULL;
        
        do {
            q = deq->tail;
        } while (InterlockedCompareExchangePointer((PVOID*)&q->next, 0, node) != q->next);
    
        InterlockedCompareExchangePointer((PVOID*)&deq->tail, q, node);
    }
    
    acs_node_t* acs_deque_pop(acs_deque_t* deq)
    {
        acs_node_t* q = NULL;
        
        do {
            q = deq->head.next;
            if (q == NULL)
                return NULL;
        } while (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, q, q->next) != deq->head.next);
        
        return q;
    }
    
    static DWORD AesThreadFunc(void* arg)
    {
        acs_deque_t* ad = (acs_deque_t*)arg;
    
        for (int i = 0; i < knMaxNodeCount; ++i) {
            acs_node_t* an = new acs_node_t;
            an->id = "randid_";
            an->id.push_back((i % 10) + '0');
            an->index = i;
            an->next = NULL;
            acs_deque_push(ad, an);
        }  // for
    
        return 0;
    }
    
    static void TestAcsDeque()
    {
        acs_deque_t ad;
        acs_node_t* poped_node = NULL;
        HANDLE th[knThreadCount];
    
        {
            AutoHRTimer ahr(g_hrtimer, "ACS push 50000");
            acs_deque_init(&ad);
            for (int i = 0; i < knThreadCount; ++i) {
                th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)AesThreadFunc, &ad, 0, NULL);
            }  // for
            ::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
        }
    
        {
            // Test pop.
            AutoHRTimer ahr(g_hrtimer, "ACS pop 5000");
            for (int i = 0; i < knPopedCount; ++i) {
                poped_node = acs_deque_pop(&ad);
                delete poped_node;
            }
        }
    
        {
            AutoHRTimer ahr(g_hrtimer, "ACS free 45000");
            acs_node_t* cur = ad.head.next;
            while (cur != NULL) {
                acs_node_t* tmp = cur;
                cur = cur->next;
                delete tmp;
            }
        }
    }
    
    static DWORD StdThreadFunc(void* arg)
    {
        StdDeque* deq_list = (StdDeque*)arg;
    
        for (int i = 0; i < knMaxNodeCount; ++i) {
            DequeData* dd = new DequeData;
            dd->id = "randid_";
            dd->id.push_back((i % 10) + '0');
            dd->index = i;
            EnterCriticalSection(&g_stdcs);
            deq_list->push_back(dd);
            LeaveCriticalSection(&g_stdcs);
        }  // for
    
        return 0;
    }
    
    static void TestLockedDeque()
    {
        StdDeque deq_list;
        DequeData* poped_dd = NULL;
        HANDLE th[knThreadCount];
        InitializeCriticalSectionAndSpinCount(&g_stdcs, 2000);
    
        {
            AutoHRTimer ahr(g_hrtimer, "STD push 50000");
            for (int i = 0; i < knThreadCount; ++i) {
                th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)StdThreadFunc, &deq_list, 0, NULL);
            }  // for
            ::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
        }
    
        {
            AutoHRTimer ahr(g_hrtimer, "STD pop 5000");
            for (int i = 0; i < knPopedCount; ++i) {
                poped_dd = deq_list.front();
                deq_list.pop_front();
                delete poped_dd;
            }
        }
    
        {
            AutoHRTimer ahr(g_hrtimer, "STD free 45000");
            StdDeque::iterator iter = deq_list.begin();
            while (iter != deq_list.end()) {
                DequeData* dd = *iter;
                delete dd;
                ++iter;
            }
            deq_list.clear();
        }
    
        DeleteCriticalSection(&g_stdcs);
    }
    
    int main()
    {
        while (1) {
            TestAcsDeque();
            TestLockedDeque();
            Sleep(3000);
            fprintf(stdout, "--------------------------------------\n");
        }
    
        getchar();
    
        return 0;
    }

        5. 将无锁队列同std的有锁队列停止对比,效果如下图

        队列和加锁

        

        

    文章结束给大家分享下程序员的一些笑话语录: 手机终究会变成PC,所以ip会比wm更加畅销,但是有一天手机强大到一定程度了就会发现只有wm的支持才能完美享受。就好比树和草,草长得再高也是草,时间到了条件成熟了树就会窜天高了。www.ishuo.cn

  • 相关阅读:
    Python Day14
    Python Day13
    Python Day12
    Python Day11
    Python Day10
    Python Day9
    Python Day8
    Python Day7
    Python Day6
    Python Day5
  • 原文地址:https://www.cnblogs.com/xinyuyuanm/p/3089216.html
Copyright © 2011-2022 走看看