zoukankan      html  css  js  c++  java
  • 利用用户级线程提高多线程应用的性能

    随着处理器往多核的发展,多线程被越来越多的应用到软件的开发中。但是如果没有正确的使用多线程,反而可能会导致软件性能的下降。

    多线程程序中一个影响程序性能的因素就是同步。对于windows系统来说,最快的同步方案就是critical_section,critical_section基本上可以被认为是一个用户态的同步机制(特别是设定了spincount,只有在自旋超过了spincount次之后任然不能获得锁,才会切入核心态并把当前线程阻塞).但即使是这样,如果在临界区中的代码如果处理时间比较长,任然会导致处理器浪费在自旋上。如果我们可以让线程在无法获得锁的时候就切换线程(当然是在用户态,切换核心态线程的代价很大,除了进入核心态的开销,还有因为线程切换而导致缓存失效带来的代价)那么就可以把浪费在自旋上的cpu时间用来做有用的工作了。

    下面介绍一种利用用户态线程的多线程解决方案,首先,创建跟cpu数量一致的线程,每个线程上将会运行一个用户级线程调度器。

    所有的业务处理都交给用户级线程处理,每当用户级线程无法获得锁时,就将自己阻塞并回到调度器中,由调度器选择另一个用户级线程来运行。当一个用户级线程释放锁的时候,会唤醒一个阻塞在这个锁上的用户级线程。当然,因为用户级线程是没有时间片控制的,如果在里面处理耗时的代码,将会导致在同一调度器上运行的其它用户级线程无法运行。

    下面是代码:

    首先是一个lockfree队列,队列是线程安全的,并且无需任何锁

    lockfree_queue.h

    #ifndef _LOCKFREE_QUEUE_H
    #define _LOCKFREE_QUEUE_H
    template <typename T>
    struct _node
    {
    T val;
    _node<T> *_next;
    };
    template <typename T>
    class LockFreeQueue
    {
    public:
    LockFreeQueue():_head(0){}
    //在队列头插入一个节点
    void push(_node<T> *newnode)
    {
    while(1)
    {
    _node<T> *lhead = _head;//本地保存
    newnode->_next = lhead;
    //成功就退出,失败就重做
    if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,newnode,lhead) == lhead)
    break;
    }
    }

    //从队列头弹出一个节点
    _node<T>* pop()
    {
    while(1)
    {
    _node<T> *lhead = _head;//本地保存
    if(!lhead)
    return NULL;
    _node<T> *ret = _head;
    if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,_head->_next,lhead) == lhead)
    {
    ret->_next = NULL;
    return ret;
    }
    }
    }
    private:
    _node<T> *_head;
    };
    #endif

    用户级线程

    uthread.h

    #ifndef _UTHREAD_H
    #define _UTHREAD_H
    #include <Windows.h>
    #include "lockfree_queue.h"
    //#include "luaWrapper.h"
    enum
    {
    NONE,
    WAIT4EVENT = 1, //等待某事件的来临
    DEAD, //纤程已死亡
    ACTIVED, //可运行的
    UNACTIVED, //不可被添加到调度队列中
    YIELD,
    SLEEP,
    };
    enum
    {
    BS_MOV = 0,
    BS_ATK,
    BS_OTHER,
    BS_WAIT4LOCK,
    BS_END,
    };
    //阻塞结构
    class BlockStruct
    {
    public:
    BlockStruct(uChar type=BS_OTHER):bs_type(type){}
    //返回true纤程将从block中返回
    virtual bool WakeUp() = 0;
    uChar bs_type;
    };
    typedef int uthread_t;
    class uthread;
    class runnable
    {
    public:
    virtual void main_routine() = 0;
    };
    class uthread;
    struct st_timeout
    {
    st_timeout(uthread *ut):ut(ut),_timeout(0),index(0){}
    bool operator < (st_timeout &r)
    {
    return _timeout < r._timeout;
    }
    uLong _timeout;
    uthread *ut;
    int index;//在超时队列中的下标
    private:
    st_timeout & operator = (const st_timeout &other);
    st_timeout(const st_timeout &other);
    };
    class Scheduler;
    class uthread;
    struct ulstruct
    {
    void *lock_addr;
    uthread *ut;
    };
    //纤程
    class uthread
    {
    public:
    uthread(Scheduler *sc):m_runnable(0),m_bs(0),uthread_id(-1),m_status(NONE),_st_timeout(this),p_uthreadContext(0),m_next(0),wakeuptick(0),m_scheduler(sc)
    {
    m_unlockevent = (_node<ulstruct>*)_aligned_malloc(sizeof(*m_unlockevent),4);
    m_locknode = (_node<uthread*>*)_aligned_malloc(sizeof(*m_locknode),4);
    m_locknode->val = this;
    m_unlockevent->val.ut = this;
    }
    static void WINAPI fiber_routine(LPVOID pvParam);

    Scheduler *GetScheduler()
    {
    return m_scheduler;
    }
    //有事件到达,尝试唤醒block的纤程
    void Signal();

    void SetStatus(unsigned char st)
    {
    m_status = st;
    }
    unsigned char GetStatus()
    {
    return m_status;
    }
    PVOID GetUContext()
    {
    return p_uthreadContext;
    }
    void SetUContext(PVOID uct)
    {
    p_uthreadContext = uct;
    }
    void SetBs(BlockStruct *bs)
    {
    m_bs = bs;
    }
    BlockStruct *GetBs()
    {
    return m_bs;
    }
    void SetRunnable(runnable *ra)
    {
    m_runnable = ra;
    }
    runnable *GetRunnable()
    {
    return m_runnable;
    }

    st_timeout &GetTimeoutSt()
    {
    return _st_timeout;
    }

    uthread_t GetUid()
    {
    return uthread_id;
    }
    void SetUid(uthread_t uid)
    {
    uthread_id = uid;
    }
    uthread *Next()
    {
    return m_next;
    }
    void SetNext(uthread *ut)
    {
    m_next = ut;
    }
    uLong wakeuptick;

    _node<ulstruct>* GetUnlockEvent()
    {
    return m_unlockevent;
    }
    _node<uthread*>* GetLockNode()
    {
    return m_locknode;
    }

    private:
    unsigned char m_status;

    uthread_t uthread_id;

    PVOID p_uthreadContext;
    BlockStruct *m_bs;
    runnable *m_runnable;
    uthread *m_next;
    st_timeout _st_timeout;
    _node<ulstruct>* m_unlockevent;
    _node<uthread*>* m_locknode;
    Scheduler *m_scheduler;
    };
    #endif

    uthread.cpp

    #include "stdafx.h"
    #include "uthread.h"
    #include "fiberApi.h"
    #include <assert.h>
    #include <iostream>
    #include "ulock.h"
    void WINAPI uthread::fiber_routine(LPVOID pvParam)
    {
    uthread *_uthread = (uthread*)pvParam;
    while(1)
    {
    assert(_uthread->m_runnable);
    std::cout << "Ai Start,threadid :" << _uthread->uthread_id << std::endl;
    _uthread->m_runnable->main_routine();
    std::cout << "Ai Stop" << std::endl;
    _uthread->m_runnable = 0;
    //从可运行队列中删除
    //Scheduler::m_uthreads[Scheduler::m_curuid]->m_status = UNACTIVED;
    //SetCurrentUthreadState(UNACTIVED);

    //Scheduler::ReleaseUthread(Scheduler::m_curuid);

    ReleaseCurrentUthread();
    //Scheduler::_Yield(UNACTIVED);
    _Yield(UNACTIVED);
    }

    //Scheduler::m_uthreads[Scheduler::m_curuid]->m_status = DEAD;
    //SetCurrentUthreadState(DEAD);
    /*这里不能直接退出纤程运行函数,否则会导致运行线程的退出,
    * 正确的做法是把运行权交回给scheduler,由scheduler来删除
    * 这个纤程
    */
    //Scheduler::_Yield(DEAD);
    _Yield(DEAD);
    }
    //等待的事件到达了,将纤程重新插入到可运行队列中
    void uthread::Signal()
    {
    if(m_bs->WakeUp())
    {
    //printf("满足唤醒条件 %d /n",this->GetUid());
    //等待的条件满足了,把fiber置为可运行态并添加到运行队列中
    //Scheduler::Add2Active(this);
    Add2Active(this);
    m_bs = 0;
    wakeuptick = 0;
    }
    }

    然后是用户态的锁

    uLock.h

    #ifndef _ULOCK_H
    #define _ULOCK_H
    #pragma pack(push)
    #pragma pack(4)
    #include "fiberApi.h"
    #include "lockfree_queue.h"
    class Scheduler;
    //纤程间使用的用户级锁
    struct umutex
    {
    friend class Scheduler;
    public:
    umutex():flag(0){}
    void Lock()
    {
    if(InterlockedCompareExchange(&flag,1,0) == 1)
    {

    uthread *currentUThread = GetCurrentUThread();
    _node<uthread*> *tmp = currentUThread->GetLockNode();
    m_blockthread.push(tmp);
    //加锁失败,阻塞当前纤程
    Wait4Lock();
    }
    }
    void UnLock()
    {
    if(InterlockedCompareExchange(&flag,0,1) == 0)
    {
    //没有lock
    return;
    }
    //已经解锁,唤醒阻塞在这个锁上的纤程
    _node<uthread*> *tmp = m_blockthread.pop();
    if(tmp)
    {
    NotifyUnLock(this,tmp->val);
    }
    }
    private:
    bool _Lock(uthread *ut)
    {
    bool ret = InterlockedCompareExchange(&flag,1,0) == 0;
    if(!ret)
    {
    //uthread *currentUThread = GetCurrentUThread();
    _node<uthread*> *tmp = ut->GetLockNode();
    m_blockthread.push(tmp);
    }
    return ret;
    }
    private:
    volatile long flag;//如果被持有则置1,否则置0
    LockFreeQueue<uthread*> m_blockthread;//阻塞在这个锁上的纤程
    };
    #pragma pack(pop)
    #endif

    调度器

    scheduler.h

    #ifndef _SCHEDULER_H
    #define _SCHEDULER_H
    #include <Windows.h>
    #include "uthread.h"
    #include <map>
    #include <list>
    #include <time.h>
    #include "minHeap.h"
    #include "lockfree_queue.h"
    #define MAX_FIBER 32
    class Scheduler
    {
    friend class uthread;
    friend void _Yield(uChar);
    friend void ReleaseUthread(int);
    friend void ReleaseCurrentUthread();
    friend void SetCurrentUthreadState(uChar);
    friend void Add2Active(uthread*);
    friend uthread *GetCurrentUThread();
    friend uthread_t GetCurrentUThreadId();
    public:
    Scheduler():m_active_head(0),m_active_tail(0),m_count(0),m_curuid(-1),pending_index(0)
    {}
    //初始化纤程库
    void Init();
    void Destroy();
    //将一个纤程加入到调度列表中以运行runnable
    uthread_t FiberStartRun(runnable *param);
    //选择一个纤程以进行调度
    void Schedule();
    void SwitchTo(uthread_t uid)
    {
    SwitchToFiber(m_uthreads[uid]->GetUContext());
    }
    void SwitchToBlock(uthread_t uid)
    {
    if(m_uthreads[uid]->GetBs())
    SwitchTo(uid);
    }
    void _Yield(uChar status = YIELD)
    {
    m_uthreads[m_curuid]->SetStatus(status);
    SwitchToFiber(m_pUthreadContext);
    }
    //将一个纤程添加到可运行队列中
    void Add2Active(uthread *ut);
    //阻塞纤程,直到wc得到满足
    void Block(BlockStruct *bs,uLong ms);
    uthread_t GetFreeUthread()
    {
    if(!m_uthreadpool.empty())
    {
    uthread_t ret = m_uthreadpool.front();
    m_uthreadpool.pop_front();
    return ret;
    }
    return -1;
    }
    void ReleaseUthread(uthread_t uid)
    {
    if(uid < MAX_FIBER)
    {
    m_uthreads[uid]->SetStatus(UNACTIVED);
    m_uthreadpool.push_back(uid);
    }
    }
    //尝试唤醒uid
    void TryWakeup(uthread_t uid)
    {
    if(m_uthreads[uid]->GetBs())
    m_uthreads[uid]->Signal();
    }
    //强制唤醒纤程
    void ForceWakeup(uthread_t uid)
    {
    if(m_uthreads[uid]->GetStatus() != ACTIVED)
    {
    //printf("强制唤醒/n");
    Add2Active(m_uthreads[uid]);
    }
    }
    //强制唤醒阻塞在type条件上的纤程
    void ForceWakeup(uthread_t uid,uChar type)
    {
    if(m_uthreads[uid]->GetStatus() != ACTIVED &&
    m_uthreads[uid]->GetBs()->bs_type == type)
    {
    //printf("强制唤醒/n");
    Add2Active(m_uthreads[uid]);
    }
    }
    //清空activelist,和pendingadd
    void ClearActiveList();
    void ClearTimeOut()
    {
    m_timeoutlist.Clear();
    }
    void Sleep(uLong ms);
    void NotifyUnlock(_node<ulstruct> *nn)
    {
    m_unlockevent.push(nn);
    }
    void Wait4Lock();
    private:
    uthread *m_active_head;
    uthread *m_active_tail;
    //也可以不使用m_pendingAdd,根据测试结果决定
    uthread_t m_pendingAdd[MAX_FIBER];
    unsigned int pending_index;
    minheap<MAX_FIBER> m_timeoutlist;
    PVOID m_pUthreadContext;//调度器所在纤程的上下文

    uthread *m_uthreads[MAX_FIBER];
    LockFreeQueue<ulstruct> m_unlockevent;
    int m_count;
    int m_curuid; //当前正在运行的纤程的uid,==-1表示在scheduler中运行
    std::list<uthread_t> m_uthreadpool;//fiber池
    //std::map<void*,std::list<uthread*> > m_wait4lock;
    //std::list<uthread*> m_wait4lock;
    static const int reservesize = 65536;
    static const int commitsize = 8192;
    };
    #endif

    scheduler.cpp

    #include "stdafx.h"
    //#include "Scheduler.h"
    #include <assert.h>
    #include <iostream>
    #include "fiberApi.h"
    #include "uLock.h"
    //extern umutex *g_lock;
    uthread_t Scheduler::FiberStartRun(runnable *param)
    {
    uthread_t uid = GetFreeUthread();
    if(uid != -1)
    {
    m_uthreads[uid]->SetRunnable(param);
    Add2Active(m_uthreads[uid]);
    }
    return uid;
    }
    void Scheduler::Schedule()
    {
    {
    //看看是否有可以获取锁的纤程
    _node<ulstruct> *tmp = NULL;
    while(tmp = m_unlockevent.pop())
    {
    umutex *um = (umutex*)tmp->val.lock_addr;
    uthread *ut = tmp->val.ut;
    if(um->_Lock(ut))
    {
    //加锁成功,将纤程从等待队列中删除并投入到可运行队列中
    Add2Active(ut);
    }
    //std::map<void*,std::list<uthread*> >::iterator it = m_wait4lock.find(tmp->val);
    //if(it != m_wait4lock.end())
    //{
    //尝试加锁
    /*if(!it->second.empty())
    {
    umutex *um = (umutex*)it->first;
    uthread *ut = it->second.front();
    if(um->_Lock(ut))
    {
    //加锁成功,将纤程从等待队列中删除并投入到可运行队列中

    it->second.pop_front();
    Add2Active(ut);
    }
    }
    */

    //}
    //else
    //{
    //在Wait4Lock调用完成前,其它线程的解锁可能已经调用过NotifyUnLock了,
    //所以这里把解锁消息重新放回队列中,再次尝试
    // m_unlockevent.push(tmp);
    //}
    }
    }
    //将所有等待添加到m_activeList中的纤程都添加进去
    {
    for(unsigned int i = 0; i < pending_index; ++i)
    {
    uthread *ut = m_uthreads[m_pendingAdd[i]];
    ut->SetNext(0);
    if(m_active_tail)
    {
    m_active_tail->SetNext(ut);
    m_active_tail = ut;
    }
    else
    {
    m_active_head = m_active_tail = ut;
    }
    }
    pending_index = 0;
    }
    uthread *cur = m_active_head;
    uthread *pre = NULL;
    while(cur)
    {
    m_curuid = cur->GetUid();
    SwitchToFiber(cur->GetUContext());
    m_curuid = -1;
    unsigned char status = cur->GetStatus();
    //当纤程处于以下状态时需要从可运行队列中移除
    if(status == DEAD || status == SLEEP || status == WAIT4EVENT || status == UNACTIVED || status == YIELD)
    {
    //删除首元素
    if(cur == m_active_head)
    {
    //同时也是尾元素
    if(cur == m_active_tail)
    m_active_head = m_active_tail = NULL;
    else
    m_active_head = cur->Next();
    }
    else if(cur == m_active_tail)
    {
    pre->SetNext(NULL);
    m_active_tail = pre;
    }
    else
    pre->SetNext(cur->Next());
    uthread *tmp = cur;
    cur = cur->Next();
    tmp->SetNext(0);
    //如果仅仅是让出处理器,需要重新投入到可运行队列中
    if(status == YIELD)
    Add2Active(tmp);

    }
    else
    {
    pre = cur;
    cur = cur->Next();
    }
    }
    //看看有没有timeout的纤程
    {
    uLong now = GetTickCount();
    while(m_timeoutlist.Min() !=0 && m_timeoutlist.Min() <= now)
    {
    st_timeout *timeout = m_timeoutlist.PopMin();
    if(timeout->ut->GetStatus() == WAIT4EVENT || timeout->ut->GetStatus() == SLEEP)
    {
    timeout->ut->wakeuptick = timeout->_timeout;
    Add2Active(timeout->ut);
    }
    }
    }
    }
    void Scheduler::Destroy()
    {
    for(int i = 0; i < MAX_FIBER; ++i)
    {
    if(m_uthreads[i])
    {
    DeleteFiber(m_uthreads[i]->GetUContext());
    delete m_uthreads[i];
    }
    }
    ConvertFiberToThread();
    }
    void Scheduler::Block(BlockStruct *bs,uLong ms)
    {
    if(ms > 0)
    {
    st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt();
    _st_timeout._timeout = GetTickCount() + ms;//time(NULL) + timeout;
    if(!_st_timeout.index)
    {
    m_timeoutlist.Insert(&_st_timeout);
    }
    else
    {
    m_timeoutlist.Change(&_st_timeout);
    }
    }
    m_uthreads[m_curuid]->SetBs(bs);
    m_uthreads[m_curuid]->SetStatus(WAIT4EVENT);
    SwitchToFiber(m_pUthreadContext);
    m_uthreads[m_curuid]->SetBs(0);
    }
    void Scheduler::Init()
    {
    m_pUthreadContext = ConvertThreadToFiber(NULL);

    //创建fiber池
    for(int i = 0 ; i < MAX_FIBER; ++i)
    {
    uthread *nthread = new uthread(this);
    PVOID uthreadcontext = CreateFiberEx(commitsize,reservesize,0,uthread::fiber_routine,nthread);
    assert(uthreadcontext);
    nthread->SetUContext(uthreadcontext);
    m_uthreads[i] = nthread;
    nthread->SetUid(i);
    m_uthreadpool.push_back(i);
    }
    }
    //将一个纤程添加到可运行队列中
    void Scheduler::Add2Active(uthread *ut)
    {
    //如果已经在active中了则不能再次添加
    if(ut->GetStatus() != ACTIVED)
    {
    ut->SetStatus(ACTIVED);
    m_pendingAdd[pending_index++] = ut->GetUid();
    }
    }
    void Scheduler::ClearActiveList()
    {
    pending_index = 0;
    uthread *cur = m_active_head;
    while(cur)
    {
    uthread *next = cur->Next();
    cur->SetNext(0);
    cur = next;
    }
    m_active_head = m_active_tail = NULL;
    }
    void Scheduler::Sleep(uLong ms)
    {
    if(ms > 0)
    {
    st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt();
    _st_timeout._timeout = GetTickCount() + ms;//time(NULL) + seconds;
    if(!_st_timeout.index)
    {
    m_timeoutlist.Insert(&_st_timeout);
    }
    else
    {
    m_timeoutlist.Change(&_st_timeout);
    }
    m_uthreads[m_curuid]->SetStatus(SLEEP);
    }
    SwitchToFiber(m_pUthreadContext);
    }
    //纤程在等待lock_addr锁,需要将纤程移出可运行队列,并记等待信息
    void Scheduler::Wait4Lock()
    {
    /*std::map<void*,std::list<uthread*> >::iterator it = m_wait4lock.find(lock_addr);
    uthread *current_uthread = m_uthreads[m_curuid];
    if(it == m_wait4lock.end())
    m_wait4lock.insert(std::make_pair(lock_addr,std::list<uthread*>(1,current_uthread)));
    else
    it->second.push_back(current_uthread);
    */
    uthread *current_uthread = m_uthreads[m_curuid];
    //m_wait4lock.push_back(current_uthread);
    current_uthread->SetStatus(WAIT4EVENT);
    //切换回调度器
    SwitchToFiber(m_pUthreadContext);
    }

    然后是一些API

    fiberApi.h

    #ifndef _FIBERAPI_H
    #define _FIBERAPI_H
    #include "Scheduler.h"
    #include <map>
    //与每个线程相关的纤程调度器
    //extern std::map<DWORD,Scheduler*> g_tlssc;
    extern Scheduler* g_tlssc[1019];
    void _Yield(uChar);
    void ReleaseUthread(int);
    void ReleaseCurrentUthread();//释放当前的纤程
    void SetCurrentUthreadState(uChar);//设置当前纤程的状态
    void Add2Active(uthread*);
    uthread *GetCurrentUThread();
    uthread_t GetCurrentUThreadId();
    void Wait4Lock();
    void NotifyUnLock(void*,uthread*);
    #endif

    fiberApi.cpp

    #include "stdafx.h"
    #include "fiberApi.h"
    //std::map<DWORD,Scheduler*> g_tlssc;
    Scheduler* g_tlssc[1019];
    void _Yield(uChar state)
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    g_tlssc[currenttrheadid%512]->_Yield(state);
    }
    void ReleaseUthread(int uthreadid)
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    g_tlssc[currenttrheadid%512]->ReleaseUthread(uthreadid);
    }
    void ReleaseCurrentUthread()
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    Scheduler *sc = g_tlssc[currenttrheadid%512];
    sc->ReleaseUthread(sc->m_curuid);
    }
    void SetCurrentUthreadState(uChar state)
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    Scheduler *sc = g_tlssc[currenttrheadid%512];
    sc->m_uthreads[sc->m_curuid]->SetStatus(state);
    }
    void Add2Active(uthread *ut)
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    g_tlssc[currenttrheadid%512]->Add2Active(ut);
    }
    uthread *GetCurrentUThread()
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    Scheduler *sc = g_tlssc[currenttrheadid%512];
    return sc->m_uthreads[sc->m_curuid];
    }
    uthread_t GetCurrentUThreadId()
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    Scheduler *sc = g_tlssc[currenttrheadid%512];
    return sc->m_curuid;
    }
    void Wait4Lock()
    {
    DWORD currenttrheadid = GetCurrentThreadId();
    g_tlssc[currenttrheadid%512]->Wait4Lock();
    }
    void NotifyUnLock(void *lock_addr,uthread *ut)
    {
    _node<ulstruct> *nn = ut->GetUnlockEvent();
    nn->val.lock_addr = lock_addr;
    ut->GetScheduler()->NotifyUnlock(nn);
    //g_tlssc[threadid]->NotifyUnlock(lock_addr);
    //std::map<DWORD,Scheduler>::iterator it = g_tlssc.begin();
    //std::map<DWORD,Scheduler>::iterator end = g_tlssc.end();
    //for(; it != end; ++it)
    // it->second.NotifyUnlock(lock_addr);
    }

    经过进一步测试,在ulock的lock和unlock中使用的无锁队列m_blockthread可能因为多线程操作导致解锁通告丢失。

    因此,m_blockthread需要改为普通队列,并且在操作前暂时用自旋锁锁定(暂时使用,希望可以找到更好的方法)。

    大致修改如下:

    void Lock()
    {
    if(InterlockedCompareExchange(&flag,1,0) == 1)
    {
    uthread *currentUThread = GetCurrentUThread();
    _node<uthread*> tmp = currentUThread->GetLockNode();
    while(InterlockedCompareExchange(&spinlock,1,0) == 1);
    push(tmp);
    InterlockedCompareExchange(&spinlock,0,1);
    Wait4Lock();
    }
    }
    void UnLock()
    {
    if(InterlockedCompareExchange(&flag,0,1) == 0)
    return;
    while(InterlockedCompareExchange(&spinlock,1,0) == 1);
    _node<uthread*> *tmp = pop();
    InterlockedCompareExchange(&spinlock,0,1);
    if(tmp)
    NotifyUnLock(this,tmp->val);
    }

    其次,还有一个问题需要解决,就是各纤程获得锁的次数不平均,具体例子如下:在双核机器上启动两个线程,线程上各运行一个纤程,对testlist进行写入的时候

    会发现,大部分的写入是由其中一个纤程完成的,而另外一个纤程则很少能获得写入的机会。

    下面是修改后的uLock.h,解决了纤程获得锁不平均的问题,只要创建的调度线程不超过cpu的数量,基本保证了各纤程有均等的机会获得锁。

    #ifndef _ULOCK_H
    #define _ULOCK_H
    #pragma pack(push)
    #pragma pack(4)
    #include "fiberApi.h"
    //#include "lockfree_queue.h"
    class Scheduler;
    //纤程间使用的用户级锁
    struct umutex
    {
    friend class Scheduler;
    public:
    umutex():flag(0),spinlock(0),m_head(0),m_tail(0)
    {
    }
    void Lock()
    {
    if(InterlockedCompareExchange(&flag,1,0) == 1)
    {

    uthread *currentUThread = GetCurrentUThread();
    _node<uthread*> *tmp = currentUThread->GetLockNode();
    while(InterlockedCompareExchange(&spinlock,1,0) == 1);
    //再次尝试加锁
    if(InterlockedCompareExchange(&flag,1,0) == 0)
    {
    InterlockedCompareExchange(&spinlock,0,1);
    return;
    }
    push(tmp);
    InterlockedCompareExchange(&spinlock,0,1);

    //加锁失败,阻塞当前纤程
    Wait4Lock();
    }
    }
    void UnLock()
    {
    if(InterlockedCompareExchange(&flag,0,1) == 0)
    {
    //没有lock
    return;
    }
    //已经解锁,挑选一个纤程,并将它唤醒
    while(InterlockedCompareExchange(&spinlock,1,0) == 1);
    _node<uthread*> *tmp = pop();
    InterlockedCompareExchange(&spinlock,0,1);

    if(tmp)
    {
    NotifyUnLock(this,tmp->val);
    }
    }
    private:
    bool _Lock(uthread *ut)
    {
    bool ret = InterlockedCompareExchange(&flag,1,0) == 0;
    if(!ret)
    {
    uthread *currentUThread = GetCurrentUThread();
    _node<uthread*> *tmp = ut->GetLockNode();
    while(InterlockedCompareExchange(&spinlock,1,0) == 1);
    //再次尝试加锁
    if(InterlockedCompareExchange(&flag,1,0) == 0)
    {
    InterlockedCompareExchange(&spinlock,0,1);
    return true;
    }

    push(tmp);
    InterlockedCompareExchange(&spinlock,0,1);

    }
    return ret;
    }
    void push(_node<uthread*> *blockut)
    {
    blockut->_next = NULL;
    if(NULL == m_tail)
    {
    m_head = m_tail = blockut;
    }
    else
    {
    m_tail->_next = blockut;
    m_tail = blockut;
    }
    }
    _node<uthread*> *pop()
    {
    if(NULL == m_head)
    return NULL;
    else
    {
    _node<uthread*> *ret = m_head;
    m_head = m_head->_next;
    if(m_head == NULL)
    m_tail = m_head;
    return ret;
    }
    }
    private:
    volatile long flag;//如果被持有则置1,否则置0
    volatile long spinlock;//自旋锁,保护m_blockthread;
    //队列,记录阻塞在这个锁上的纤程
    _node<uthread*> *m_head;
    _node<uthread*> *m_tail;
    };
    #pragma pack(pop)
    #endif

    测试代码:

    // fiberFramework.cpp : 定义控制台应用程序的入口点。
    //
    #include "stdafx.h"
    #include "CThread.h"
    #include "fiberApi.h"
    #include "uLock.h"
    #include "CLock.h"
    #define TESTSIZE 1000000
    int g_testlist[TESTSIZE];
    int g_testlistcs[TESTSIZE];
    int g_testmutex[TESTSIZE];
    umutex *g_lock;
    zMutex *g_lockmutex;
    zLightMutex *g_lockcs;
    /*std::list<int> g_testlist2;
    std::list<int> g_testlistcs2;
    std::list<int> g_testmutex2;
    umutex *g_lock2;
    zMutex *g_lockmutex2;
    zLightMutex *g_lockcs2;
    */
    static volatile bool finish = false;
    static volatile long count = 0;
    zThreadGroup g_threadgroup;
    DWORD starttime = 0;
    DWORD endtime = 0;
    class uworker : public runnable
    {
    public:
    void main_routine()
    {
    while(1)
    {
    g_lock->Lock();
    if(count==0)
    {
    starttime = GetTickCount();
    }
    if(count == TESTSIZE)
    {
    endtime = GetTickCount();
    finish = true;
    g_lock->UnLock();
    return;
    }
    else
    {
    g_testlist[count] = GetCurrentThreadId()+GetCurrentUThreadId();
    //InterlockedIncrement(&count);
    }
    ++count;
    g_lock->UnLock();
    _Yield(YIELD);
    volatile int c = 0;
    for(volatile int cc = 0; cc < 100; ++cc)
    c++;
    }
    }
    };
    class CWorkerThread : public zThread,private Noncopyable
    {
    public:
    CWorkerThread(const std::string &name = std::string("zThread"),const bool joinable = true)
    :zThread(name,joinable){}
    ~CWorkerThread(){}
    void run()
    {
    Scheduler *sc = new Scheduler;
    sc->Init();
    //g_tlssc.insert(std::make_pair(GetCurrentThreadId(),sc));
    if(g_tlssc[GetCurrentThreadId()%TLSSIZE] != NULL)
    {
    printf("error here/n");
    getchar();
    exit(0);
    }
    g_tlssc[GetCurrentThreadId()%TLSSIZE] = sc;
    {
    uworker uw1;
    uworker uw2;
    uworker uw3;
    uworker uw4;
    sc->FiberStartRun(&uw1);
    sc->FiberStartRun(&uw2);
    sc->FiberStartRun(&uw3);
    sc->FiberStartRun(&uw4);
    }
    /*{
    uworker uw1;
    uworker uw2;
    uworker uw3;
    uworker uw4;
    sc->FiberStartRun(&uw1);
    sc->FiberStartRun(&uw2);
    sc->FiberStartRun(&uw3);
    sc->FiberStartRun(&uw4);
    }
    */

    while(!finish)
    {
    sc->Schedule();
    }

    }
    };
    class CWorkerThreadCs : public zThread,private Noncopyable
    {
    public:
    CWorkerThreadCs(const std::string &name = std::string("zThread"),const bool joinable = true)
    :zThread(name,joinable){}
    ~CWorkerThreadCs(){}
    void run()
    {
    while(1)
    {
    g_lockcs->Lock();
    if(count == 0)
    {
    starttime = GetTickCount();;
    }
    if(count == TESTSIZE)
    {
    endtime = GetTickCount();
    g_lockcs->UnLock();
    return;
    }
    else
    {
    g_testlistcs[count] = GetCurrentThreadId();
    //InterlockedIncrement(&count);
    }
    ++count;
    g_lockcs->UnLock();
    volatile int c = 0;
    for(volatile int cc = 0; cc < 100; ++cc)
    c++;
    }
    }
    };
    class CWorkerThreadMutex : public zThread,private Noncopyable
    {
    public:
    CWorkerThreadMutex(const std::string &name = std::string("zThread"),const bool joinable = true)
    :zThread(name,joinable){}
    ~CWorkerThreadMutex(){}
    void run()
    {
    while(1)
    {
    g_lockmutex->Lock();
    if(count == 0)
    {
    starttime = GetTickCount();;
    }
    if(count == TESTSIZE)
    {
    endtime = GetTickCount();
    g_lockmutex->UnLock();
    //printf("finish/n");
    return;
    }
    else
    {
    g_testmutex[count] = GetCurrentThreadId();
    //InterlockedIncrement(&count);
    //printf("uthread:%d/n",GetCurrentThreadId());
    }
    ++count;
    g_lockmutex->UnLock();
    volatile int c = 0;
    for(volatile int cc = 0; cc < 100; ++cc)
    c++;
    }
    }
    };
    struct TestCallback : public zThreadGroup::Callback
    {
    void exec(zThread *e)
    {
    e->start();
    }
    ~TestCallback(){}
    };
    void testfiber(int n)
    {
    void *buf = _aligned_malloc(sizeof(*g_lock),4);
    g_lock = new (buf)umutex;
    for(int i = 0; i < n; ++i)
    {
    CWorkerThread *cw1 = new CWorkerThread;
    g_threadgroup.add(cw1);
    }
    TestCallback CallBack;
    g_threadgroup.execAll(CallBack);
    g_threadgroup.joinAll();
    printf("test fiber/n");
    printf("count %d/n",count);
    printf("time %d/n",endtime - starttime);
    std::map<int,int> stat;
    for(int i = 0; i < TESTSIZE; ++i)
    {
    std::map<int,int>::iterator it = stat.find(g_testlist[i]);
    if(it == stat.end())
    {
    stat.insert(std::make_pair(g_testlist[i],1));
    }
    else
    {
    stat[g_testlist[i]]++;
    }
    }
    printf("stat size = %d/n",stat.size());
    for(std::map<int,int>::iterator it = stat.begin(); it != stat.end(); ++it)
    {
    printf("id=%d,count=%d/n",it->first,it->second);
    }
    }
    void testcs(int n)
    {
    g_lockcs = new zLightMutex;
    for(int i = 0; i < n; ++i)
    {
    CWorkerThreadCs *cw1 = new CWorkerThreadCs;
    g_threadgroup.add(cw1);
    }
    TestCallback CallBack;
    g_threadgroup.execAll(CallBack);
    //while(!_kbhit())//等待服务器终止
    //{
    // Sleep(10);
    //}
    g_threadgroup.joinAll();
    printf("test cs/n");
    printf("count %d/n",count);
    printf("time %d/n",endtime - starttime);
    std::map<int,int> stat;
    for(int i = 0; i < TESTSIZE; ++i)
    {
    std::map<int,int>::iterator it = stat.find(g_testlistcs[i]);
    if(it == stat.end())
    {
    stat.insert(std::make_pair(g_testlistcs[i],1));
    }
    else
    {
    stat[g_testlistcs[i]]++;
    }
    }
    printf("stat size = %d/n",stat.size());
    for(std::map<int,int>::iterator it = stat.begin(); it != stat.end(); ++it)
    {
    printf("id=%d,count=%d/n",it->first,it->second);
    }
    }
    void testmutex(int n)
    {
    g_lockmutex = new zMutex;
    for(int i = 0; i < n; ++i)
    {
    CWorkerThreadMutex *cw1 = new CWorkerThreadMutex;
    g_threadgroup.add(cw1);
    }
    TestCallback CallBack;
    g_threadgroup.execAll(CallBack);
    g_threadgroup.joinAll();
    printf("test mutex/n");
    printf("count %d/n",count);
    printf("time %d/n",endtime - starttime);
    std::map<int,int> stat;
    for(int i = 0; i < TESTSIZE; ++i)
    {
    std::map<int,int>::iterator it = stat.find(g_testmutex[i]);
    if(it == stat.end())
    {
    stat.insert(std::make_pair(g_testmutex[i],1));
    }
    else
    {
    stat[g_testmutex[i]]++;
    }
    }
    printf("stat size = %d/n",stat.size());
    for(std::map<int,int>::iterator it = stat.begin(); it != stat.end(); ++it)
    {
    printf("id=%d,count=%d/n",it->first,it->second);
    }
    }
    int _tmain(int argc, _TCHAR* argv[])
    {
    int n = _ttol(argv[1]);
    memset(g_tlssc,0,sizeof(g_tlssc));
    count = 0;
    testfiber(n/4);
    count = 0;
    testcs(n);
    count = 0;
    testmutex(n);

    /*LockFreeQueue<int> q;
    for(int i = 0; i < 5; ++i)
    {
    _node<int> *pNode = new _node<int>;
    pNode->val = i;
    q.push(pNode);
    }
    //q.print();
    for(int i = 0; i < 5; ++i)
    {
    _node<int> *pNode = q.pop();
    printf("%d/n",pNode->val);
    }
    _node<int> *pNode = q.pop();
    */
    getchar();
    return 0;
    }
















  • 相关阅读:
    由基于qml,c++的串口调试工具浅谈qml与c++混合编程
    qt5_qml_Opengl_shader 第一弹----------------------openglunderqml的简化及介绍
    Delphi 的接口机制——接口操作的编译器实现过程(2)
    Delphi 的接口机制——接口操作的编译器实现过程(1)
    ddd
    [leetcode]Gray Code
    synapse socket总结一:服务器模型
    CentOS 6.5(64bit)安装GCC4.8.2+Qt5.2.1(替换GCC的链接库)
    Qt打开外部程序和文件夹需要注意的细节(Qt调用VC写的动态库,VC需要用C的方式输出函数,否则MinGW32编译过程会报错)
    Qt+SQLite数据加密的一种思路(内存数据库)
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2429634.html
Copyright © 2011-2022 走看看