zoukankan      html  css  js  c++  java
  • Monitor.Wait初探(5)

    // We maintain two queues for SyncBlock::Wait.
    // 1. Inside SyncBlock we queue all threads that are waiting on the SyncBlock.
    //    When we pulse, we pick the thread from this queue using FIFO.
    // 2. We queue all SyncBlocks that a thread is waiting for in Thread::m_WaitEventLink.
    //    When we pulse a thread, we find the event from this queue to set, and we also
    //    or in a 1 bit in the syncblock value saved in the queue, so that we can return
    //    immediately from SyncBlock::Wait if the syncblock has been pulsed.
    BOOL SyncBlock::Wait(INT32 timeOut, BOOL exitContext)
    {
        CONTRACTL
        {
            INSTANCE_CHECK;
            THROWS;
            GC_TRIGGERS;
            MODE_ANY;
            INJECT_FAULT(COMPlusThrowOM());
        }
        CONTRACTL_END;

        Thread  *pCurThread = GetThread();
        BOOL     isTimedOut = FALSE;
        BOOL     isEnqueued = FALSE;
        WaitEventLink waitEventLink;
        WaitEventLink *pWaitEventLink;

        // As soon as we flip the switch, we are in a race with the GC, which could clean
        // up the SyncBlock underneath us -- unless we report the object.
        _ASSERTE(pCurThread->PreemptiveGCDisabled());

        // Does this thread already wait for this SyncBlock?
        WaitEventLink *walk = pCurThread->WaitEventLinkForSyncBlock(this);
        if (walk->m_Next) {
            if (walk->m_Next->m_WaitSB == this) {
                // Wait on the same lock again.
                walk->m_Next->m_RefCount ++;
                pWaitEventLink = walk->m_Next;
            }
            else if ((SyncBlock*)(((DWORD_PTR)walk->m_Next->m_WaitSB) & ~1)== this) {
                // This thread has been pulsed.  No need to wait.
                return TRUE;
            }
        }
        else {
            // First time this thread is going to wait for this SyncBlock.
            CLREvent* hEvent;
            if (pCurThread->m_WaitEventLink.m_Next == NULL) {
                hEvent = &(pCurThread->m_EventWait);
            }
            else {
                hEvent = GetEventFromEventStore();
            }
            waitEventLink.m_WaitSB = this;
            waitEventLink.m_EventWait = hEvent;
            waitEventLink.m_Thread = pCurThread;
            waitEventLink.m_Next = NULL;
            waitEventLink.m_LinkSB.m_pNext = NULL;
            waitEventLink.m_RefCount = 1;
            pWaitEventLink = &waitEventLink;
            walk->m_Next = pWaitEventLink;

            // Before we enqueue it (and, thus, before it can be dequeued), reset the event
            // that will awaken us.
            hEvent->Reset();

            // This thread is now waiting on this sync block
            ThreadQueue::EnqueueThread(pWaitEventLink, this);

            isEnqueued = TRUE;
        }

        _ASSERTE ((SyncBlock*)((DWORD_PTR)walk->m_Next->m_WaitSB & ~1)== this);

        PendingSync   syncState(walk);

        OBJECTREF     obj = m_Monitor.GetOwningObject();

        m_Monitor.IncrementTransientPrecious();

        GCPROTECT_BEGIN(obj);
        {
            GCX_PREEMP();

            // remember how many times we synchronized
            syncState.m_EnterCount = LeaveMonitorCompletely();
            _ASSERTE(syncState.m_EnterCount > 0);

            Context* targetContext = pCurThread->GetContext();
            _ASSERTE(targetContext);
            Context* defaultContext = pCurThread->GetDomain()->GetDefaultContext();
            _ASSERTE(defaultContext);

            if (exitContext &&
                targetContext != defaultContext)
            {
                Context::MonitorWaitArgs waitArgs = {timeOut, &syncState, &isTimedOut};
                Context::CallBackInfo callBackInfo = {Context::MonitorWait_callback, (void*) &waitArgs};
                Context::RequestCallBack(CURRENT_APPDOMAIN_ID, defaultContext, &callBackInfo);
            }
            else
            {
                isTimedOut = pCurThread->Block(timeOut, &syncState);
            }
        }
        GCPROTECT_END();
        m_Monitor.DecrementTransientPrecious();

        return !isTimedOut;
    }

    拜托,当你看到函数又臭又长的时候..尤其时还不熟悉的时候,一定要看函数的描述,该函数开头之前的函数说明解释了两件事情:

    1.在SyncBlock 内部维护了一个等待所有这个SyncBlock 的线程队列,当调用pulse的时候(如Monitor.Pulse)会从该队列取出下一个线程,方式是先进先出。

    2.使用另外一个队列维护所有有线程正在waiting的SyncBlock ,队列类型为WaitEventLink(也即是Thread::m_WaitEventLink的类型),一旦有pulse调用,会从该队列取出一个Event并set.


    现在再来看函数代码部分,重点看横线的代码行:

        WaitEventLink *walk = pCurThread->WaitEventLinkForSyncBlock(this);
    先检查当前线程是否已经在等待对象的同步索引块,本示例中显然是第一次,然后通过

    hEvent = &(pCurThread->m_EventWait);或者
    hEvent = GetEventFromEventStore();获取一个等待事件对象

    之后会走  ThreadQueue::EnqueueThread(pWaitEventLink, this);

    从而把当前线程加入到等待队列,这时候我的脑海中又想起来MSDN上对Monitor.Wait的描述:

    当线程调用 Wait 时,它释放对象的锁并进入对象的等待队列。 对象的就绪队列中的下一个线程(如果有)获取锁并拥有对对象的独占使用。

    这下大概能对上号了吧。

    在函数最后,还是调用了isTimedOut = pCurThread->Block(timeOut, &syncState);以实现实现当前线程的等待(或曰阻塞)。

    所以依旧要看看这个Block方法的实现:

    // Called out of SyncBlock::Wait() to block this thread until the Notify occurs.
    BOOL Thread::Block(INT32 timeOut, PendingSync *syncState)
    {
        WRAPPER_CONTRACT;

        _ASSERTE(this == GetThread());

        // Before calling Block, the SyncBlock queued us onto it's list of waiting threads.
        // However, before calling Block the SyncBlock temporarily left the synchronized
        // region.  This allowed threads to enter the region and call Notify, in which
        // case we may have been signalled before we entered the Wait.  So we aren't in the
        // m_WaitSB list any longer.  Not a problem: the following Wait will return
        // immediately.  But it means we cannot enforce the following assertion:
    //    _ASSERTE(m_WaitSB != NULL);

        return (Wait(syncState->m_WaitEventLink->m_Next->m_EventWait, timeOut, syncState) != WAIT_OBJECT_0);
    }

    Block又调用了Thread的Wait方法:

    // Return whether or not a timeout occured.  TRUE=>we waited successfully
    DWORD Thread::Wait(CLREvent *pEvent, INT32 timeOut, PendingSync *syncInfo)
    {
        WRAPPER_CONTRACT;

        DWORD   dwResult;
        DWORD   dwTimeOut32;

        _ASSERTE(timeOut >= 0 || timeOut == INFINITE_TIMEOUT);

        dwTimeOut32 = (timeOut == INFINITE_TIMEOUT
                       ? INFINITE
                       : (DWORD) timeOut);

        dwResult = pEvent->Wait(dwTimeOut32, TRUE /*alertable*/, syncInfo);

        // Either we succeeded in the wait, or we timed out
        _ASSERTE((dwResult == WAIT_OBJECT_0) ||
                 (dwResult == WAIT_TIMEOUT));

        return dwResult;
    }

    Wait又调用了pEvent的Wait方法,注意这里的pEvent是CLREvent类型,而该参数的值则是之前在SyncBlock::Wait获取的等待事件对象。这里我们可以大胆猜测CLREvent对应的其实是一个内核事件对象。

    CLREvent的Wait实现如下,有点长,看关键的横线代码行:

    DWORD CLREvent::Wait(DWORD dwMilliseconds, BOOL alertable, PendingSync *syncState)
    {
        WRAPPER_CONTRACT;
        return WaitEx(dwMilliseconds, alertable?WaitMode_Alertable:WaitMode_None,syncState);
    }

    紧接着WaitEx的实现如下:

    DWORD CLREvent::WaitEx(DWORD dwMilliseconds, WaitMode mode, PendingSync *syncState)
    {
        BOOL alertable = (mode & WaitMode_Alertable)!=0;
        CONTRACTL
        {
            if (alertable)
            {
                THROWS;               // Thread::DoAppropriateWait can throw  
            }
            else
            {
                NOTHROW;
            }
            if (GetThread())
            {
                if (alertable)
                    GC_TRIGGERS;
                else
                    GC_NOTRIGGER;
            }
            else
            {
                DISABLED(GC_TRIGGERS);       
            }
            SO_TOLERANT;
            PRECONDITION(m_handle != INVALID_HANDLE_VALUE); // Handle has to be valid
        }
        CONTRACTL_END;

        _ASSERTE(Thread::AllowCallout());

        Thread *pThread = GetThread();   
    #ifdef _DEBUG
        // If a CLREvent is OS event only, we can not wait for the event on a managed thread
        if (IsOSEvent())
            _ASSERTE (!pThread);
    #endif
        _ASSERTE (pThread || !g_fEEStarted || dbgOnly_IsSpecialEEThread());

        if (IsOSEvent() || !CLRSyncHosted()) {
            if (pThread && alertable) {
                DWORD dwRet = WAIT_FAILED;
                BEGIN_SO_INTOLERANT_CODE_NOTHROW (pThread, return WAIT_FAILED;);
                dwRet = pThread->DoAppropriateWait(1, &m_handle, FALSE, dwMilliseconds,
                                                  mode,
                                                  syncState);
                END_SO_INTOLERANT_CODE;
                return dwRet;
            }
            else {
                _ASSERTE (syncState == NULL);
                return CLREventWaitHelper(m_handle,dwMilliseconds,alertable);
            }
        }
        else {   
           if (pThread && alertable) {
                DWORD dwRet = WAIT_FAILED;
                BEGIN_SO_INTOLERANT_CODE_NOTHROW (pThread, return WAIT_FAILED;);
                dwRet = pThread->DoAppropriateWait(IsAutoEvent()?HostAutoEventWait:HostManualEventWait,
                                                  m_handle,dwMilliseconds,
                                                  mode,
                                                  syncState);
                END_SO_INTOLERANT_CODE;
                return dwRet;
            }
            else {
                _ASSERTE (syncState == NULL);
                DWORD option = 0;
                if (alertable) {
                    option |= WAIT_ALERTABLE;
                }
                if (IsAutoEvent()) {
                    return HostAutoEventWait((IHostAutoEvent*)m_handle,dwMilliseconds, option);
                }
                else {
                    return HostManualEventWait((IHostManualEvent*)m_handle,dwMilliseconds, option);
                }
            }
        }   
    }


    这里又调用了Thread的DoAppropriateWait;
    DoAppropriateWait的实现如下:

    DWORD Thread::DoAppropriateWait(int countHandles, HANDLE *handles, BOOL waitAll,
                                    DWORD millis, WaitMode mode, PendingSync *syncState)
    {
        STATIC_CONTRACT_THROWS;
        STATIC_CONTRACT_GC_TRIGGERS;

        INDEBUG(BOOL alertable = (mode & WaitMode_Alertable) != 0;);
        _ASSERTE(alertable || syncState == 0);

        DWORD dwRet = (DWORD) -1;

        EE_TRY_FOR_FINALLY {
            dwRet =DoAppropriateWaitWorker(countHandles, handles, waitAll, millis, mode);
        }
        EE_FINALLY {
            if (syncState) {
                if (!GOT_EXCEPTION() &&
                    dwRet >= WAIT_OBJECT_0 && dwRet < (DWORD)(WAIT_OBJECT_0 + countHandles)) {
                    // This thread has been removed from syncblk waiting list by the signalling thread
                    syncState->Restore(FALSE);
                }
                else
                    syncState->Restore(TRUE);
            }

            _ASSERTE (dwRet != WAIT_IO_COMPLETION);
        }
        EE_END_FINALLY;

        return(dwRet);
    }

    then,DoAppropriateWaitWorker的实现如下,有点长,只看最关键那一句:

    DWORD Thread::DoAppropriateWaitWorker(int countHandles, HANDLE *handles, BOOL waitAll,
                                          DWORD millis, WaitMode mode)
    {
        CONTRACTL {
            THROWS;
            GC_TRIGGERS;
        }
        CONTRACTL_END;

        DWORD ret = 0;

        BOOL alertable = (mode & WaitMode_Alertable)!= 0;
        BOOL ignoreSyncCtx = (mode & WaitMode_IgnoreSyncCtx)!= 0;

        // Unless the ignoreSyncCtx flag is set, first check to see if there is a synchronization
        // context on the current thread and if there is, dispatch to it to do the wait.
        // If  the wait is non alertable we cannot forward the call to the sync context
        // since fundamental parts of the system (such as the GC) rely on non alertable
        // waits not running any managed code. Also if we are past the point in shutdown were we
        // are allowed to run managed code then we can't forward the call to the sync context.
        if (!ignoreSyncCtx && alertable && CanRunManagedCode(FALSE))
        {
            GCX_COOP();

            BOOL fSyncCtxPresent = FALSE;
            OBJECTREF SyncCtxObj = NULL;
            GCPROTECT_BEGIN(SyncCtxObj)
            {
                GetSynchronizationContext(&SyncCtxObj);
                if (SyncCtxObj != NULL)
                {
                    SYNCHRONIZATIONCONTEXTREF syncRef = (SYNCHRONIZATIONCONTEXTREF)SyncCtxObj;
                    if (syncRef->IsWaitNotificationRequired())
                    {
                        fSyncCtxPresent = TRUE;
                        ret = DoSyncContextWait(&SyncCtxObj, countHandles, handles, waitAll, millis);
                    }
                }
            }
            GCPROTECT_END();

            if (fSyncCtxPresent)
                return ret;
        }

        GCX_PREEMP();

        if(alertable)
        {
            DoAppropriateWaitWorkerAlertableHelper(mode);
        }

        LeaveRuntimeHolder holder((size_t)WaitForMultipleObjectsEx);
        StateHolder<MarkOSAlertableWait,UnMarkOSAlertableWait> OSAlertableWait(alertable);

        ThreadStateHolder tsh(alertable, TS_Interruptible | TS_Interrupted);

        ULONGLONG dwStart = 0, dwEnd;
    retry:
        if (millis != INFINITE)
        {
            dwStart = CLRGetTickCount64();
        }

        ret = DoAppropriateAptStateWait(countHandles, handles, waitAll, millis, mode);

        if (ret == WAIT_IO_COMPLETION)
        {
            _ASSERTE (alertable);

            if (m_State & TS_Interrupted)
            {
                HandleThreadInterrupt(mode & WaitMode_ADUnload);
            }
            // We could be woken by some spurious APC or an EE APC queued to
            // interrupt us. In the latter case the TS_Interrupted bit will be set
            // in the thread state bits. Otherwise we just go back to sleep again.
            if (millis != INFINITE)
            {
                dwEnd = CLRGetTickCount64();
                if (dwEnd >= dwStart + millis)
                {
                    ret = WAIT_TIMEOUT;
                    goto WaitCompleted;
                }
                else
                {
                    millis -= (DWORD)(dwEnd - dwStart);
                }
            }
            goto retry;
        }
        _ASSERTE((ret >= WAIT_OBJECT_0  && ret < (WAIT_OBJECT_0  + (DWORD)countHandles)) ||
                 (ret >= WAIT_ABANDONED && ret < (WAIT_ABANDONED + (DWORD)countHandles)) ||
                 (ret == WAIT_TIMEOUT) || (ret == WAIT_FAILED));
        // countHandles is used as an unsigned -- it should never be negative.
        _ASSERTE(countHandles >= 0);

        if (ret == WAIT_FAILED)
        {
            DWORD errorCode = ::GetLastError();
            if (errorCode == ERROR_INVALID_PARAMETER)
            {
                if (CheckForDuplicateHandles(countHandles, handles))
                    COMPlusThrow(kDuplicateWaitObjectException);
                else
                    COMPlusThrowHR(HRESULT_FROM_WIN32(errorCode));
            }
            else if (errorCode == ERROR_ACCESS_DENIED)
            {
                // A Win32 ACL could prevent us from waiting on the handle.
                COMPlusThrow(kUnauthorizedAccessException);
            }

            _ASSERTE(errorCode == ERROR_INVALID_HANDLE);

            if (countHandles == 1)
                ret = WAIT_OBJECT_0;
            else if (waitAll)
            {
                // Probe all handles with a timeout of zero. When we find one that's
                // invalid, move it out of the list and retry the wait.
    #ifdef _DEBUG
                BOOL fFoundInvalid = FALSE;
    #endif
                for (int i = 0; i < countHandles; i++)
                {
                    // WaitForSingleObject won't pump memssage; we already probe enough space
                    // before calling this function and we don't want to fail here, so we don't
                    // do a transition to tolerant code here
                    DWORD subRet = WaitForSingleObject (handles[i], 0);
                    if (subRet != WAIT_FAILED)
                        continue;
                    _ASSERTE(::GetLastError() == ERROR_INVALID_HANDLE);
                    if ((countHandles - i - 1) > 0)
                        memmove(&handles[i], &handles[i+1], (countHandles - i - 1) * sizeof(HANDLE));
                    countHandles--;
    #ifdef _DEBUG
                    fFoundInvalid = TRUE;
    #endif
                    break;
                }
                _ASSERTE(fFoundInvalid);

                // Compute the new timeout value by assume that the timeout
                // is not large enough for more than one wrap
                dwEnd = CLRGetTickCount64();
                if (millis != INFINITE)
                {
                    if (dwEnd >= dwStart + millis)
                    {
                        ret = WAIT_TIMEOUT;
                        goto WaitCompleted;
                    }
                    else
                    {
                        millis -= (DWORD)(dwEnd - dwStart);
                    }
                }
                goto retry;
            }
            else
            {
                // Probe all handles with a timeout as zero, succeed with the first
                // handle that doesn't timeout.
                ret = WAIT_OBJECT_0;
                int i;
                for (i = 0; i < countHandles; i++)
                {
                TryAgain:
                    // WaitForSingleObject won't pump memssage; we already probe enough space
                    // before calling this function and we don't want to fail here, so we don't
                    // do a transition to tolerant code here
                    DWORD subRet = WaitForSingleObject (handles[i], 0);
                    if ((subRet == WAIT_OBJECT_0) || (subRet == WAIT_FAILED))
                        break;
                    if (subRet == WAIT_ABANDONED)
                    {
                        ret = (ret - WAIT_OBJECT_0) + WAIT_ABANDONED;
                        break;
                    }
                    // If we get alerted it just masks the real state of the current
                    // handle, so retry the wait.
                    if (subRet == WAIT_IO_COMPLETION)
                        goto TryAgain;
                    _ASSERTE(subRet == WAIT_TIMEOUT);
                    ret++;
                }
                _ASSERTE(i != countHandles);
            }
        }

    WaitCompleted:

        _ASSERTE((ret != WAIT_TIMEOUT) || (millis != INFINITE));

        return ret;
    }

    then, 还要看 DoAppropriateAptStateWait(countHandles, handles, waitAll, millis, mode)的实现:

    DWORD Thread::DoAppropriateAptStateWait(int numWaiters, HANDLE* pHandles, BOOL bWaitAll,
                                             DWORD timeout, WaitMode mode)
    {
        CONTRACTL {
            THROWS;
            GC_TRIGGERS;
        }
        CONTRACTL_END;

        BOOL alertable = (mode&WaitMode_Alertable)!=0;

        return WaitForMultipleObjectsEx_SO_TOLERANT(numWaiters, pHandles,bWaitAll, timeout,alertable);
    }

    then,再看WaitForMultipleObjectsEx_SO_TOLERANT的实现:

    DWORD WaitForMultipleObjectsEx_SO_TOLERANT (DWORD nCount, HANDLE *lpHandles, BOOL bWaitAll,DWORD dwMilliseconds, BOOL bAlertable)
    {
        DWORD dwRet = WAIT_FAILED;
        DWORD lastError = 0;
        BEGIN_SO_TOLERANT_CODE (GetThread ());
        dwRet = ::WaitForMultipleObjectsEx (nCount, lpHandles, bWaitAll, dwMilliseconds, bAlertable);
        lastError = ::GetLastError();
        END_SO_TOLERANT_CODE;

        // END_SO_TOLERANT_CODE overwrites lasterror.  Let's reset it.
        ::SetLastError(lastError);
        return dwRet;
    }

    到这里,万水千山,我们终于搞清楚Monitor.Wait的大概实现原理(事实上我们只捋了一遍本文示例中Monitor.Enter的调用stack),内部最终还是调用了WaitForMultipleObjectsEx,不过要注意CLREvent::WaitEx的实现有好几个分支,根据情况的不同,最后调的并不一定是WaitForMultipleObjectsEx,也有可能是CLREventWaitHelper->WaitForSingleObjectEx等等。

  • 相关阅读:
    xcode多target管理不同的环境(pod多target配置)
    OC与swift混编 #import "项目名-Swift.h"失效问题
    令人困惑的strtotime
    krpano 学习第一天
    git 全量同步分支
    MYSQL 什么时候用单列索引?什么使用用联合索引?
    _blocking_errnos = {errno.EAGAIN, errno.EWOULDBLOCK} pip
    Mac php 装imagick扩展 菜鸟教程
    git仓库搬家
    文章简介 字符串截取
  • 原文地址:https://www.cnblogs.com/dancewithautomation/p/2416290.html
Copyright © 2011-2022 走看看