zoukankan      html  css  js  c++  java
  • 使用pthread下的mutex与cond_var模拟windows下的event几个接口

    两个版本的链接:

    https://github.com/neosmart/pevents

    https://github.com/moya-lang/Event

    第一个版本能够模拟"等待多个事件中的任意一个被触发",而第二个版本一次最多只能等待一个事件

    但第一个版本的UnlockedWaitForEvent在超时时返回的是ETIMEDOUT,因此调用之后需要增加一个判断(把ETIMEDOUT转换成WAIT_TIMEOUT,见下面代码中的WaitForSingleObject),否则返回值会不正确

    代码:

    #if 0
    
    //#define PULSE
    //#define WFMO
    
    #include <assert.h>
    #include <errno.h>
    #include <pthread.h>
    #include <sys/time.h>
    
    #ifdef WFMO
    #include <algorithm>
    #include <deque>
    #endif
    
        struct SyncObjectPosix_event_st;
    
        // Function declarations
        void* CreateEvent(void* lpEventAttributes = nullptr, bool manualReset = false, bool initialState = false, void* lpName = nullptr);
        int DestroyEvent(void* event);
        //int WaitForEvent(void* event, uint64_t milliseconds = -1);
        int WaitForSingleObject(void* event, uint64_t milliseconds = -1);
        int SetEvent(void* event);
        int ResetEvent(void* event);
    #ifdef WFMO
        int WaitForMultipleObjects(int nCount,const void* *lpHandles, bool  bWaitAll, uint64_t milliseconds);
    
        int WaitForMultipleEvents(void* *events, int count, bool waitAll,
                                  uint64_t milliseconds);
        int WaitForMultipleEvents(void* *events, int count, bool waitAll,
                                  uint64_t milliseconds, int &index);
    #endif
    #ifdef PULSE
        int PulseEvent(void* event);
    #endif
    
    #ifdef WFMO
        // Bookkeeping for one multi-event wait. A single instance is allocated per
        // WaitForMultipleEvents call and shared by every event that call registers with.
        struct SyncObjectPosix_wfmo_st {
            pthread_mutex_t Mutex;     // guards every field below
            pthread_cond_t CVariable;  // the waiting thread blocks on this
            int RefCount;              // the waiter itself plus one per registered event
            union {
                int FiredEvent; // wait-any: index of the event that fired, -1 while none has
                int EventsLeft; // wait-all: number of events that have not fired yet
            } Status;
            bool WaitAll;              // selects which member of Status is meaningful
            bool StillWaiting;         // false once the wait has completed or been abandoned

            // Releases the pthread primitives; the caller is still responsible for `delete`.
            void Destroy() {
                pthread_mutex_destroy(&Mutex);
                pthread_cond_destroy(&CVariable);
            }
        };
        //typedef SyncObjectPosix_wfmo_t_ *SyncObjectPosix_wfmo_t;
    
        // Registration record stored in an event's RegisteredWaits queue: it tells the
        // event which multi-wait to notify and under which index that wait knows the event.
        struct SyncObjectPosix_wfmo_info_st {
            SyncObjectPosix_wfmo_st* Waiter; // shared state of the multi-event wait
            int WaitIndex;                   // position of the event in the waiter's array
        };
        //typedef SyncObjectPosix_wfmo_info_t_ *nSyncObjectPosix_wfmo_info_t;
    #endif // WFMO
    
        // State of a single event. CreateEvent allocates one of these and hands it to the
        // caller as an opaque void* handle.
        struct SyncObjectPosix_event_st {
            pthread_cond_t CVariable;  // single-event waiters block here
            pthread_mutex_t Mutex;     // guards State (and RegisteredWaits when WFMO is on)
            bool AutoReset;            // true: a successful wait clears State again
            bool State;                // true while the event is signaled
    #ifdef WFMO
            // Multi-event waits currently registered with this event.
            std::deque<SyncObjectPosix_wfmo_info_st> RegisteredWaits;
    #endif
        };
    
    #ifdef WFMO
        // Predicate used with std::remove_if over an event's RegisteredWaits.
        // Returns true when the registration belongs to a wait that is no longer waiting
        // (so the entry can be erased); in that case the waiter's reference is dropped and
        // the waiter is destroyed if that was the last one. Returns false when the wait is
        // still live or when its mutex is currently held by someone else.
        bool RemoveExpiredWaitHelper(SyncObjectPosix_wfmo_info_st wait) {
            SyncObjectPosix_wfmo_st* const waiter = wait.Waiter;

            // Only try the lock: blocking here could stall the caller, which holds an event mutex.
            int rc = pthread_mutex_trylock(&waiter->Mutex);
            if (rc == EBUSY) {
                return false;
            }
            assert(rc == 0);

            const bool expired = !waiter->StillWaiting;
            bool lastReference = false;
            if (expired) {
                --waiter->RefCount;
                assert(waiter->RefCount >= 0);
                lastReference = (waiter->RefCount == 0);
            }

            rc = pthread_mutex_unlock(&waiter->Mutex);
            assert(rc == 0);

            // The mutex must be released before the object that contains it is torn down.
            if (lastReference) {
                waiter->Destroy();
                delete waiter;
            }

            return expired;
        }
    #endif // WFMO
    
        void* CreateEvent(void* lpEventAttributes, bool manualReset, bool initialState, void* lpName) {
            SyncObjectPosix_event_st* event = new SyncObjectPosix_event_st;
    
            int result = pthread_cond_init(&event->CVariable, 0);
            assert(result == 0);
    
            result = pthread_mutex_init(&event->Mutex, 0);
            assert(result == 0);
    
            event->State = false;
            event->AutoReset = !manualReset;
    
            if (initialState) {
                result = SetEvent(event);
                assert(result == 0);
            }
    
            return event;
        }
    
        int UnlockedWaitForEvent(void* event, uint64_t milliseconds) {
            int result = 0;
            if (!((SyncObjectPosix_event_st*)event)->State) {
                // Zero-timeout event state check optimization
                if (milliseconds == 0) {
                    return WAIT_TIMEOUT;
                }
    
                timespec ts;
                if (milliseconds != (uint64_t)-1) {
                    timeval tv;
                    gettimeofday(&tv, NULL);
    
                    uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 +
                                           milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000;
    
                    ts.tv_sec = nanoseconds / 1000 / 1000 / 1000;
                    ts.tv_nsec = (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000);
                }
    
                do {
                    // Regardless of whether it's an auto-reset or manual-reset event:
                    // wait to obtain the event, then lock anyone else out
                    if (milliseconds != (uint64_t)-1) {
                        result = pthread_cond_timedwait(&((SyncObjectPosix_event_st*)event)->CVariable, &((SyncObjectPosix_event_st*)event)->Mutex, &ts);
                    } else {
                        result = pthread_cond_wait(&((SyncObjectPosix_event_st*)event)->CVariable, &((SyncObjectPosix_event_st*)event)->Mutex);
                    }
                } while (result == 0 && !((SyncObjectPosix_event_st*)event)->State);
    
                if (result == 0 && ((SyncObjectPosix_event_st*)event)->AutoReset) {
                    // We've only accquired the event if the wait succeeded
                    ((SyncObjectPosix_event_st*)event)->State = false;
                }
            } else if (((SyncObjectPosix_event_st*)event)->AutoReset) {
                // It's an auto-reset event that's currently available;
                // we need to stop anyone else from using it
                result = 0;
                ((SyncObjectPosix_event_st*)event)->State = false;
            }
            // Else we're trying to obtain a manual reset event with a signaled state;
            // don't do anything
    
            return result;
        }
    
        int WaitForSingleObject(void* event, uint64_t milliseconds) {
            int tempResult;
            if (milliseconds == 0) {
                tempResult = pthread_mutex_trylock(&((SyncObjectPosix_event_st*)event)->Mutex);
                if (tempResult == EBUSY) {
                    return WAIT_TIMEOUT;
                }
            } else {
                tempResult = pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
            }
    
            assert(tempResult == 0);
    
            int result = UnlockedWaitForEvent(((SyncObjectPosix_event_st*)event), milliseconds);
    
            tempResult = pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(tempResult == 0);
    
            if (result == ETIMEDOUT) {
                return WAIT_TIMEOUT;
            }
    
            return result;
        }
    
    #ifdef WFMO
        // Convenience overload for callers that do not need to know which event fired:
        // forwards to the five-argument overload and discards the reported index.
        int WaitForMultipleEvents(void* *events, int count, bool waitAll,
                                  uint64_t milliseconds) {
            int ignoredIndex;
            const int outcome = WaitForMultipleEvents(events, count, waitAll, milliseconds, ignoredIndex);
            return outcome;
        }
    
        // Waits on `count` event handles at once.
        //   events       - array of handles returned by CreateEvent.
        //   waitAll      - true: return once every event has fired; false: once any one has.
        //   milliseconds - 0 polls, (uint64_t)-1 waits without a deadline, otherwise a timeout.
        //   waitIndex    - receives the shared Status slot: for a wait-any this is the index
        //                  of the event that fired (-1 if none did); for a wait-all the slot
        //                  holds the remaining-event counter, so it is not an index.
        // Returns 0 when the wait condition was met, WAIT_TIMEOUT when it was not met in
        // time (for both the zero-timeout poll and an expired timed wait), or another
        // pthread wait error code.
        int WaitForMultipleEvents(void* *events, int count, bool waitAll,
                                  uint64_t milliseconds, int &waitIndex) {
            SyncObjectPosix_wfmo_st* wfmo = new SyncObjectPosix_wfmo_st;

            SyncObjectPosix_event_st** pp_events = (SyncObjectPosix_event_st**)events;

            int result = 0;
            int tempResult = pthread_mutex_init(&wfmo->Mutex, 0);
            assert(tempResult == 0);

            tempResult = pthread_cond_init(&wfmo->CVariable, 0);
            assert(tempResult == 0);

            SyncObjectPosix_wfmo_info_st waitInfo;
            waitInfo.Waiter = wfmo;
            waitInfo.WaitIndex = -1;

            wfmo->WaitAll = waitAll;
            wfmo->StillWaiting = true;
            wfmo->RefCount = 1; // the reference held by this call itself

            if (waitAll) {
                wfmo->Status.EventsLeft = count;
            } else {
                wfmo->Status.FiredEvent = -1;
            }

            // Hold the waiter mutex for the whole registration phase so that an event firing
            // right after registration cannot be processed before we start waiting.
            tempResult = pthread_mutex_lock(&wfmo->Mutex);
            assert(tempResult == 0);

            bool done = false;
            waitIndex = -1;

            for (int i = 0; i < count; ++i) {
                waitInfo.WaitIndex = i;

                // Must not release lock until RegisteredWait is potentially added
                tempResult = pthread_mutex_lock(&pp_events[i]->Mutex);
                assert(tempResult == 0);

                // Before adding this wait to the list of registered waits, let's clean up old, expired
                // waits while we have the event lock anyway
                pp_events[i]->RegisteredWaits.erase(std::remove_if(pp_events[i]->RegisteredWaits.begin(),
                                                                pp_events[i]->RegisteredWaits.end(),
                                                                RemoveExpiredWaitHelper),
                                                 pp_events[i]->RegisteredWaits.end());

                if (UnlockedWaitForEvent(events[i], 0) == 0) {
                    // The event was already signaled (and has been consumed if auto-reset).
                    tempResult = pthread_mutex_unlock(&pp_events[i]->Mutex);
                    assert(tempResult == 0);

                    if (waitAll) {
                        --wfmo->Status.EventsLeft;
                        assert(wfmo->Status.EventsLeft >= 0);
                    } else {
                        wfmo->Status.FiredEvent = i;
                        waitIndex = i;
                        done = true;
                        break;
                    }
                } else {
                    // Not signaled yet: register so SetEvent can notify this wait later.
                    pp_events[i]->RegisteredWaits.push_back(waitInfo);
                    ++wfmo->RefCount;

                    tempResult = pthread_mutex_unlock(&pp_events[i]->Mutex);
                    assert(tempResult == 0);
                }
            }

            // We set the `done` flag above in case of WaitAny and at least one event was set.
            // But we need to check again here if we were doing a WaitAll or else we'll incorrectly
            // return WAIT_TIMEOUT.
            if (waitAll && wfmo->Status.EventsLeft == 0) {
                done = true;
            }

            timespec ts;
            if (!done) {
                if (milliseconds == 0) {
                    result = WAIT_TIMEOUT;
                    done = true;
                } else if (milliseconds != (uint64_t)-1) {
                    // Absolute deadline = current wall-clock time + timeout, in nanoseconds.
                    timeval tv;
                    gettimeofday(&tv, NULL);

                    uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 +
                                           milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000;

                    ts.tv_sec = nanoseconds / 1000 / 1000 / 1000;
                    ts.tv_nsec = (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000);
                }
            }

            while (!done) {
                // Re-evaluate the wait condition after every wakeup: all events fired for a
                // wait-all, or some event fired for a wait-any.
                done = (waitAll && wfmo->Status.EventsLeft == 0) ||
                       (!waitAll && wfmo->Status.FiredEvent != -1);

                if (!done) {
                    if (milliseconds != (uint64_t)-1) {
                        result = pthread_cond_timedwait(&wfmo->CVariable, &wfmo->Mutex, &ts);
                    } else {
                        result = pthread_cond_wait(&wfmo->CVariable, &wfmo->Mutex);
                    }

                    if (result != 0) {
                        break;
                    }
                }
            }

            if (result == ETIMEDOUT) {
                // The deadline may expire at the same moment SetEvent hands us an event
                // (and, for auto-reset events, consumes it on our behalf). We hold the waiter
                // mutex again here, so if the condition is now met the wait succeeded and the
                // event must not be reported as lost. Otherwise report the timeout with the
                // same WAIT_TIMEOUT code the zero-timeout path and WaitForSingleObject use.
                const bool satisfied = (waitAll && wfmo->Status.EventsLeft == 0) ||
                                       (!waitAll && wfmo->Status.FiredEvent != -1);
                result = satisfied ? 0 : WAIT_TIMEOUT;
            }

            waitIndex = wfmo->Status.FiredEvent;
            wfmo->StillWaiting = false;

            // Drop this call's own reference; events that are still registered keep theirs
            // and release them lazily, so only the last holder frees the waiter.
            --wfmo->RefCount;
            assert(wfmo->RefCount >= 0);
            bool destroy = wfmo->RefCount == 0;
            tempResult = pthread_mutex_unlock(&wfmo->Mutex);
            assert(tempResult == 0);
            if (destroy) {
                wfmo->Destroy();
                delete wfmo;
            }

            return result;
        }
    #endif // WFMO
    
        // Win32-style alias: closing an event handle destroys the event.
        int CloseHandle(void* event) {
            const int status = DestroyEvent(event);
            return status;
        }
    
        int DestroyEvent(void* event) {
            int result = 0;
    
    #ifdef WFMO
            result = pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(result == 0);
            ((SyncObjectPosix_event_st*)event)->RegisteredWaits.erase(std::remove_if(((SyncObjectPosix_event_st*)event)->RegisteredWaits.begin(),
                                                        ((SyncObjectPosix_event_st*)event)->RegisteredWaits.end(),
                                                        RemoveExpiredWaitHelper),
                                         ((SyncObjectPosix_event_st*)event)->RegisteredWaits.end());
            result = pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(result == 0);
    #endif
    
            result = pthread_cond_destroy(&((SyncObjectPosix_event_st*)event)->CVariable);
            assert(result == 0);
    
            result = pthread_mutex_destroy(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(result == 0);
    
            delete ((SyncObjectPosix_event_st*)event);
    
            return 0;
        }
    
        int SetEvent(void* event) {
            int result = pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(result == 0);
    
            ((SyncObjectPosix_event_st*)event)->State = true;
    
            // Depending on the event type, we either trigger everyone or only one
            if (((SyncObjectPosix_event_st*)event)->AutoReset) {
    #ifdef WFMO
                while (!((SyncObjectPosix_event_st*)event)->RegisteredWaits.empty()) {
                    SyncObjectPosix_wfmo_info_st* i = &((SyncObjectPosix_event_st*)event)->RegisteredWaits.front();
    
                    result = pthread_mutex_lock(&i->Waiter->Mutex);
                    assert(result == 0);
    
                    --i->Waiter->RefCount;
                    assert(i->Waiter->RefCount >= 0);
                    if (!i->Waiter->StillWaiting) {
                        bool destroy = i->Waiter->RefCount == 0;
                        result = pthread_mutex_unlock(&i->Waiter->Mutex);
                        assert(result == 0);
                        if (destroy) {
                            i->Waiter->Destroy();
                            delete i->Waiter;
                        }
                        ((SyncObjectPosix_event_st*)event)->RegisteredWaits.pop_front();
                        continue;
                    }
    
                    ((SyncObjectPosix_event_st*)event)->State = false;
    
                    if (i->Waiter->WaitAll) {
                        --i->Waiter->Status.EventsLeft;
                        assert(i->Waiter->Status.EventsLeft >= 0);
                        // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft
                        // != 0 but the only time it'll be equal to zero is if we're the last event, so
                        // no one else will be checking the StillWaiting flag. We're good to go without
                        // it.
                    } else {
                        i->Waiter->Status.FiredEvent = i->WaitIndex;
                        i->Waiter->StillWaiting = false;
                    }
    
                    result = pthread_mutex_unlock(&i->Waiter->Mutex);
                    assert(result == 0);
    
                    result = pthread_cond_signal(&i->Waiter->CVariable);
                    assert(result == 0);
    
                    ((SyncObjectPosix_event_st*)event)->RegisteredWaits.pop_front();
    
                    result = pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
                    assert(result == 0);
    
                    return 0;
                }
    #endif // WFMO
           // event->State can be false if compiled with WFMO support
                if (((SyncObjectPosix_event_st*)event)->State) {
                    result = pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
                    assert(result == 0);
    
                    result = pthread_cond_signal(&((SyncObjectPosix_event_st*)event)->CVariable);
                    assert(result == 0);
    
                    return 0;
                }
            } else {
    #ifdef WFMO
                for (size_t i = 0; i < ((SyncObjectPosix_event_st*)event)->RegisteredWaits.size(); ++i) {
                    SyncObjectPosix_wfmo_info_st* info = &((SyncObjectPosix_event_st*)event)->RegisteredWaits[i];
    
                    result = pthread_mutex_lock(&info->Waiter->Mutex);
                    assert(result == 0);
    
                    --info->Waiter->RefCount;
                    assert(info->Waiter->RefCount >= 0);
    
                    if (!info->Waiter->StillWaiting) {
                        bool destroy = info->Waiter->RefCount == 0;
                        result = pthread_mutex_unlock(&info->Waiter->Mutex);
                        assert(result == 0);
                        if (destroy) {
                            info->Waiter->Destroy();
                            delete info->Waiter;
                        }
                        continue;
                    }
    
                    if (info->Waiter->WaitAll) {
                        --info->Waiter->Status.EventsLeft;
                        assert(info->Waiter->Status.EventsLeft >= 0);
                        // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft
                        // != 0 but the only time it'll be equal to zero is if we're the last event, so
                        // no one else will be checking the StillWaiting flag. We're good to go without
                        // it.
                    } else {
                        info->Waiter->Status.FiredEvent = info->WaitIndex;
                        info->Waiter->StillWaiting = false;
                    }
    
                    result = pthread_mutex_unlock(&info->Waiter->Mutex);
                    assert(result == 0);
    
                    result = pthread_cond_signal(&info->Waiter->CVariable);
                    assert(result == 0);
                }
                ((SyncObjectPosix_event_st*)event)->RegisteredWaits.clear();
    #endif // WFMO
                result = pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
                assert(result == 0);
    
                result = pthread_cond_broadcast(&((SyncObjectPosix_event_st*)event)->CVariable);
                assert(result == 0);
            }
    
            return 0;
        }
    
        int ResetEvent(void* event) {
            int result = pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(result == 0);
    
            ((SyncObjectPosix_event_st*)event)->State = false;
    
            result = pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
            assert(result == 0);
    
            return 0;
        }
    
    #ifdef PULSE
        // Signals the event and immediately clears it again. Always returns 0.
        int PulseEvent(void* event) {
            // Set-then-reset is deliberate rather than a shortcut. Waiters woken by SetEvent
            // can only resume after the event mutex has been released, and they re-check the
            // state to guard against spurious wakeups; the state may therefore only be
            // cleared in a second, separate critical section. The one case where the second
            // lock is wasted work is when only WFMO auto-reset waits were pending.
            int rc = SetEvent(event);
            assert(rc == 0);

            rc = ResetEvent(event);
            assert(rc == 0);

            return 0;
        }
    #endif
    
    #else
    
    #include <mutex>
    #include <condition_variable>
    #include <chrono>
    #include <functional>
    
    // Event object built on std::mutex and std::condition_variable.
    // A manual-reset event stays signaled until reset(); an auto-reset event is cleared
    // again by the wait that obtains it.
    class SyncObjectPosix
    {
        public:
            // initial - whether the event starts out signaled.
            // manual  - true for manual-reset, false for auto-reset.
            SyncObjectPosix(bool initial, bool manual) :
                state(initial), manual(manual)
            {
            }

            // Switches between manual-reset (true) and auto-reset (false).
            void change(bool manual)
            {
                std::lock_guard<std::mutex> guard(mutex);
                this->manual = manual;
            }

            // Signals the event; does nothing if it is already signaled.
            void set()
            {
                std::lock_guard<std::mutex> guard(mutex);

                if (!state) {
                    state = true;
                    // Manual-reset wakes every waiter, auto-reset only one.
                    if (manual) {
                        condition.notify_all();
                    } else {
                        condition.notify_one();
                    }
                }
            }

            // Clears the signaled state.
            void reset()
            {
                std::lock_guard<std::mutex> guard(mutex);
                state = false;
            }

            // Blocks until the event is signaled.
            void wait()
            {
                std::unique_lock<std::mutex> guard(mutex);

                // Loop so that a wakeup which finds the event unsignaled keeps waiting.
                while (!state) {
                    condition.wait(guard);
                }

                if (!manual) {
                    state = false;
                }
            }

            // Blocks until the event is signaled or `timeout` has elapsed.
            // Returns 0 if the event was obtained, WAIT_TIMEOUT otherwise.
            template<class Rep, class Period>
            int wait(const std::chrono::duration<Rep, Period> &timeout)
            {
                std::unique_lock<std::mutex> guard(mutex);

                const bool signaled = condition.wait_for(guard, timeout, [this] { return state; });
                if (!signaled) {
                    return WAIT_TIMEOUT;
                }

                if (!manual) {
                    state = false;
                }
                return 0;
            }

        private:
            // Plain accessor for the signaled flag; does no locking of its own.
            bool return_state() {
                return state;
            }

            std::mutex mutex;                  // guards state and manual
            std::condition_variable condition; // waiters block here
            bool state, manual;
    };
    
    inline void* CreateEvent(void* lpEventAttributes, BOOL bManualReset, BOOL bInitialState, void* lpName){
        return (void*)(new SyncObjectPosix(bInitialState, bManualReset));
    }
    
    inline void CloseHandle(void* p_this) {
        delete (SyncObjectPosix*)p_this;
    }
    
    inline int WaitForSingleObject(void* p_this, uint64_t milliseconds = -1){
        SyncObjectPosix* event_this = (SyncObjectPosix*)p_this;
        return event_this->wait(std::chrono::milliseconds(milliseconds));
    }
    
    inline int SetEvent(void* p_this){
        SyncObjectPosix* event_this = (SyncObjectPosix*)p_this;
        event_this->set();
        return 0;
    }
    
    inline int ResetEvent(void* p_this){
        SyncObjectPosix* event_this = (SyncObjectPosix*)p_this;
        event_this->reset();
        return 0;
    }
    
    #endif
  • 相关阅读:
    好代码收藏
    JVM
    关于Redis
    记录ok6410 使用ov9650摄像头的过程
    记录一下在uyuv 转 planar yuv420 的做法
    mini2440 使用的mkyaffs2image 工具的源码
    hi3516a imx178 uboot 默认启动参数
    记录ok6410 使用fast150u 无线网卡的过程 其中部分内容为转载 没有修改
    ubuntu建立tftp服务器有两种方式
    转载hi3516 sd 只读解决
  • 原文地址:https://www.cnblogs.com/eaglexmw/p/11606309.html
Copyright © 2011-2022 走看看