// We maintain two queues for SyncBlock::Wait.
// 1. Inside SyncBlock we queue all threads that are waiting on the SyncBlock.
//    When we pulse, we pick the thread from this queue using FIFO.
// 2. We queue all SyncBlocks that a thread is waiting for in Thread::m_WaitEventLink.
//    When we pulse a thread, we find the event from this queue to set, and we also
//    or in a 1 bit in the syncblock value saved in the queue, so that we can return
//    immediately from SyncBlock::Wait if the syncblock has been pulsed.
//
// Arguments:
//    timeOut     - wait timeout, passed through unchanged to Thread::Block or to
//                  the Context::MonitorWait_callback arguments.
//    exitContext - when TRUE and the current thread's context is not the domain's
//                  default context, the wait is dispatched through
//                  Context::RequestCallBack on the default context instead of
//                  blocking directly.
//
// Returns:
//    TRUE if the thread had already been pulsed for this SyncBlock, or if the
//    wait completed without timing out; FALSE if the wait timed out.
BOOL SyncBlock::Wait(INT32 timeOut, BOOL exitContext)
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        THROWS;
        GC_TRIGGERS;
        MODE_ANY;
        INJECT_FAULT(COMPlusThrowOM());
    }
    CONTRACTL_END;

    Thread *pCurThread = GetThread();
    BOOL isTimedOut = FALSE;
    BOOL isEnqueued = FALSE;    // set once the link is queued; not read again in this function
    WaitEventLink waitEventLink;
    WaitEventLink *pWaitEventLink;

    // As soon as we flip the switch, we are in a race with the GC, which could clean
    // up the SyncBlock underneath us -- unless we report the object.
    _ASSERTE(pCurThread->PreemptiveGCDisabled());

    // Does this thread already wait for this SyncBlock?
    WaitEventLink *walk = pCurThread->WaitEventLinkForSyncBlock(this);
    if (walk->m_Next) {
        if (walk->m_Next->m_WaitSB == this) {
            // Wait on the same lock again.
            walk->m_Next->m_RefCount ++;
            pWaitEventLink = walk->m_Next;
        }
        else if ((SyncBlock*)(((DWORD_PTR)walk->m_Next->m_WaitSB) & ~1)== this) {
            // This thread has been pulsed. No need to wait.
            return TRUE;
        }
    }
    else {
        // First time this thread is going to wait for this SyncBlock.
        CLREvent* hEvent;
        if (pCurThread->m_WaitEventLink.m_Next == NULL) {
            // Nothing is linked on this thread yet: use the thread's own event.
            hEvent = &(pCurThread->m_EventWait);
        }
        else {
            // The thread already has a link queued: take another event from the store.
            hEvent = GetEventFromEventStore();
        }

        // The link lives on this stack frame and is appended after 'walk'.
        waitEventLink.m_WaitSB = this;
        waitEventLink.m_EventWait = hEvent;
        waitEventLink.m_Thread = pCurThread;
        waitEventLink.m_Next = NULL;
        waitEventLink.m_LinkSB.m_pNext = NULL;
        waitEventLink.m_RefCount = 1;
        pWaitEventLink = &waitEventLink;
        walk->m_Next = pWaitEventLink;

        // Before we enqueue it (and, thus, before it can be dequeued), reset the event
        // that will awaken us.
        hEvent->Reset();

        // This thread is now waiting on this sync block
        ThreadQueue::EnqueueThread(pWaitEventLink, this);
        isEnqueued = TRUE;
    }

    // Whichever path we took, the link after 'walk' must refer to this SyncBlock
    // (ignoring the low "pulsed" bit).
    _ASSERTE ((SyncBlock*)((DWORD_PTR)walk->m_Next->m_WaitSB & ~1)== this);

    PendingSync syncState(walk);

    OBJECTREF obj = m_Monitor.GetOwningObject();

    m_Monitor.IncrementTransientPrecious();

    GCPROTECT_BEGIN(obj);
    {
        GCX_PREEMP();

        // remember how many times we synchronized
        syncState.m_EnterCount = LeaveMonitorCompletely();
        _ASSERTE(syncState.m_EnterCount > 0);

        Context* targetContext = pCurThread->GetContext();
        _ASSERTE(targetContext);
        Context* defaultContext = pCurThread->GetDomain()->GetDefaultContext();
        _ASSERTE(defaultContext);

        if (exitContext &&
            targetContext != defaultContext)
        {
            // Perform the wait via a callback on the default context; the callback
            // receives the timeout, the pending sync state and the address of
            // isTimedOut.
            Context::MonitorWaitArgs waitArgs = {timeOut, &syncState, &isTimedOut};
            Context::CallBackInfo callBackInfo = {Context::MonitorWait_callback, (void*) &waitArgs};
            Context::RequestCallBack(CURRENT_APPDOMAIN_ID, defaultContext, &callBackInfo);
        }
        else
        {
            isTimedOut = pCurThread->Block(timeOut, &syncState);
        }
    }
    GCPROTECT_END();

    m_Monitor.DecrementTransientPrecious();

    return !isTimedOut;
}
拜托,当你看到函数又臭又长的时候,尤其是还不熟悉的时候,一定要先看函数的描述。该函数开头之前的函数说明解释了两件事情:
1.在SyncBlock 内部维护了一个等待所有这个SyncBlock 的线程队列,当调用pulse的时候(如Monitor.Pulse)会从该队列取出下一个线程,方式是先进先出。
2.每个线程通过Thread::m_WaitEventLink维护另外一个队列,记录该线程正在等待的所有SyncBlock(队列元素类型为WaitEventLink)。一旦有pulse调用,会从该队列中找到对应的Event并set,同时在队列中保存的SyncBlock值上"或"上一个1位作为已被pulse的标记,这样SyncBlock::Wait发现该标记后就可以立即返回。
现在再来看函数代码部分,重点看横线的代码行:
WaitEventLink *walk = pCurThread->WaitEventLinkForSyncBlock(this);
先检查当前线程是否已经在等待对象的同步索引块,本示例中显然是第一次,然后通过
hEvent = GetEventFromEventStore();获取一个等待事件对象
之后会走 ThreadQueue::EnqueueThread(pWaitEventLink, this);
从而把当前线程加入到等待队列,这时候我的脑海中又想起来MSDN上对Monitor.Wait的描述:
当线程调用 Wait 时,它释放对象的锁并进入对象的等待队列。 对象的就绪队列中的下一个线程(如果有)获取锁并拥有对对象的独占使用。
这下大概能对上号了吧。
在函数最后,还是调用了isTimedOut = pCurThread->Block(timeOut, &syncState);以实现当前线程的等待(或曰阻塞)。
所以依旧要看看这个Block方法的实现:
// Invoked from SyncBlock::Wait() to park the calling thread until it is notified
// or the timeout expires.
//
// Arguments:
//    timeOut   - timeout forwarded to Thread::Wait.
//    syncState - pending-sync record; the event to wait on is taken from the link
//                that follows syncState->m_WaitEventLink.
//
// Returns:
//    TRUE when the wait finished with anything other than WAIT_OBJECT_0,
//    FALSE when the event was signalled.
BOOL Thread::Block(INT32 timeOut, PendingSync *syncState)
{
    WRAPPER_CONTRACT;

    _ASSERTE(this == GetThread());

    // The SyncBlock put us on its list of waiting threads before calling Block, but
    // it also temporarily left the synchronized region first. Another thread may
    // therefore have entered the region and called Notify already, in which case we
    // were signalled before getting here and are no longer in the m_WaitSB list.
    // That is harmless -- the wait below simply returns immediately -- but it is
    // why the following assertion cannot be enforced:
    //      _ASSERTE(m_WaitSB != NULL);
    CLREvent *pWakeEvent = syncState->m_WaitEventLink->m_Next->m_EventWait;
    const DWORD waitResult = Wait(pWakeEvent, timeOut, syncState);

    return (waitResult != WAIT_OBJECT_0);
}
Block又调用了Thread的Wait方法:
// Performs an alertable wait on pEvent and reports the raw wait status.
//
// Arguments:
//    pEvent   - event to wait on.
//    timeOut  - non-negative timeout, or INFINITE_TIMEOUT (mapped to INFINITE).
//    syncInfo - pending-sync record forwarded to CLREvent::Wait.
//
// Returns:
//    The DWORD produced by CLREvent::Wait; asserted to be either WAIT_OBJECT_0
//    (the wait succeeded) or WAIT_TIMEOUT (it timed out).
DWORD Thread::Wait(CLREvent *pEvent, INT32 timeOut, PendingSync *syncInfo)
{
    WRAPPER_CONTRACT;

    _ASSERTE(timeOut >= 0 || timeOut == INFINITE_TIMEOUT);

    // Translate the signed timeout into the unsigned form expected by the event.
    DWORD waitMillis;
    if (timeOut == INFINITE_TIMEOUT)
    {
        waitMillis = INFINITE;
    }
    else
    {
        waitMillis = (DWORD) timeOut;
    }

    const DWORD waitStatus = pEvent->Wait(waitMillis, TRUE /*alertable*/, syncInfo);

    // Either we succeeded in the wait, or we timed out
    _ASSERTE((waitStatus == WAIT_OBJECT_0) ||
             (waitStatus == WAIT_TIMEOUT));

    return waitStatus;
}
Wait又调用了pEvent的Wait方法,注意这里的pEvent是CLREvent类型,而该参数的值则是之前在SyncBlock::Wait获取的等待事件对象。这里我们可以大胆猜测CLREvent对应的其实是一个内核事件对象。
CLREvent的Wait实现如下,有点长,看关键的横线代码行:
// Convenience wrapper over WaitEx: maps the BOOL 'alertable' flag onto the
// corresponding WaitMode value and forwards the remaining arguments unchanged.
DWORD CLREvent::Wait(DWORD dwMilliseconds, BOOL alertable, PendingSync *syncState)
{
    WRAPPER_CONTRACT;

    if (alertable)
    {
        return WaitEx(dwMilliseconds, WaitMode_Alertable, syncState);
    }

    return WaitEx(dwMilliseconds, WaitMode_None, syncState);
}
紧接着WaitEx的实现如下:
// Waits on this event for up to dwMilliseconds.
//
// Arguments:
//    dwMilliseconds - timeout passed to whichever wait primitive is selected.
//    mode           - wait flags; WaitMode_Alertable selects the alertable paths.
//    syncState      - pending-sync record handed to Thread::DoAppropriateWait; it is
//                     asserted to be NULL on the paths that do not go through
//                     DoAppropriateWait.
//
// Dispatch:
//    * OS event, or synchronization not hosted:
//        - managed thread + alertable -> Thread::DoAppropriateWait on m_handle
//        - otherwise                  -> CLREventWaitHelper
//    * hosted synchronization:
//        - managed thread + alertable -> Thread::DoAppropriateWait with the host
//                                        auto/manual event wait routine
//        - otherwise                  -> HostAutoEventWait / HostManualEventWait
//
// Returns:
//    The wait status from the selected primitive. WAIT_FAILED is the initial value
//    of the result on the DoAppropriateWait paths and is also what the
//    BEGIN_SO_INTOLERANT_CODE_NOTHROW failure action returns.
DWORD CLREvent::WaitEx(DWORD dwMilliseconds, WaitMode mode, PendingSync *syncState)
{
    BOOL alertable = (mode & WaitMode_Alertable)!=0;
    CONTRACTL
    {
        if (alertable)
        {
            THROWS;               // Thread::DoAppropriateWait can throw
        }
        else
        {
            NOTHROW;
        }
        if (GetThread())
        {
            if (alertable)
                GC_TRIGGERS;
            else
                GC_NOTRIGGER;
        }
        else
        {
            DISABLED(GC_TRIGGERS);
        }
        SO_TOLERANT;
        PRECONDITION(m_handle != INVALID_HANDLE_VALUE); // Handle has to be valid
    }
    CONTRACTL_END;

    _ASSERTE(Thread::AllowCallout());

    Thread *pThread = GetThread();

#ifdef _DEBUG
    // If a CLREvent is OS event only, we can not wait for the event on a managed thread
    if (IsOSEvent())
        _ASSERTE (!pThread);
#endif
    _ASSERTE (pThread || !g_fEEStarted || dbgOnly_IsSpecialEEThread());

    if (IsOSEvent() || !CLRSyncHosted()) {
        if (pThread && alertable) {
            DWORD dwRet = WAIT_FAILED;
            // The wait call must sit entirely between the BEGIN/END macro pair.
            BEGIN_SO_INTOLERANT_CODE_NOTHROW (pThread, return WAIT_FAILED;);
            dwRet = pThread->DoAppropriateWait(1, &m_handle, FALSE, dwMilliseconds,
                                               mode,
                                               syncState);
            END_SO_INTOLERANT_CODE;
            return dwRet;
        }
        else {
            _ASSERTE (syncState == NULL);
            return CLREventWaitHelper(m_handle,dwMilliseconds,alertable);
        }
    }
    else {
        if (pThread && alertable) {
            DWORD dwRet = WAIT_FAILED;
            // The wait call must sit entirely between the BEGIN/END macro pair.
            BEGIN_SO_INTOLERANT_CODE_NOTHROW (pThread, return WAIT_FAILED;);
            dwRet = pThread->DoAppropriateWait(IsAutoEvent()?HostAutoEventWait:HostManualEventWait,
                                               m_handle,dwMilliseconds,
                                               mode,
                                               syncState);
            END_SO_INTOLERANT_CODE;
            return dwRet;
        }
        else {
            _ASSERTE (syncState == NULL);
            DWORD option = 0;
            if (alertable) {
                option |= WAIT_ALERTABLE;
            }
            if (IsAutoEvent()) {
                return HostAutoEventWait((IHostAutoEvent*)m_handle,dwMilliseconds, option);
            }
            else {
                return HostManualEventWait((IHostManualEvent*)m_handle,dwMilliseconds, option);
            }
        }
    }
}
这里又调用了Thread的DoAppropriateWait;
DoAppropriateWait的实现如下:
// Runs DoAppropriateWaitWorker on the given handles and, when a PendingSync record
// was supplied, calls syncState->Restore() on the way out -- whether the worker
// returned normally or an exception is propagating.
//
// Arguments:
//    countHandles, handles, waitAll, millis, mode - forwarded to DoAppropriateWaitWorker.
//    syncState - optional; asserted to be 0 unless the wait is alertable.
//
// Returns:
//    The value produced by DoAppropriateWaitWorker. The result is initialised to
//    (DWORD)-1 before the worker runs.
DWORD Thread::DoAppropriateWait(int countHandles, HANDLE *handles, BOOL waitAll,
DWORD millis, WaitMode mode, PendingSync *syncState)
{
STATIC_CONTRACT_THROWS;
STATIC_CONTRACT_GC_TRIGGERS;
// 'alertable' exists only in debug builds, for the assertion below.
INDEBUG(BOOL alertable = (mode & WaitMode_Alertable) != 0;);
_ASSERTE(alertable || syncState == 0);
DWORD dwRet = (DWORD) -1;
EE_TRY_FOR_FINALLY {
dwRet =DoAppropriateWaitWorker(countHandles, handles, waitAll, millis, mode); }
EE_FINALLY {
if (syncState) {
// Restore(FALSE) is used only when no exception occurred and the result is a
// "handle signalled" index; every other outcome (exception, timeout, ...) uses
// Restore(TRUE).
// NOTE(review): presumably the BOOL tells Restore whether this thread still has
// to be removed from the SyncBlock's waiting list -- confirm against PendingSync.
if (!GOT_EXCEPTION() &&
dwRet >= WAIT_OBJECT_0 && dwRet < (DWORD)(WAIT_OBJECT_0 + countHandles)) {
// This thread has been removed from syncblk waiting list by the signalling thread
syncState->Restore(FALSE);
}
else
syncState->Restore(TRUE);
}
_ASSERTE (dwRet != WAIT_IO_COMPLETION);
}
EE_END_FINALLY;
return(dwRet);
}
then,DoAppropriateWaitWorker的实现如下,有点长,只看最关键那一句:
// Core of the managed wait: waits on 'handles' and post-processes the result.
//
// Outline:
//    1. If the wait is alertable, WaitMode_IgnoreSyncCtx is not set and managed code
//       may run, and the thread's synchronization context requests wait
//       notification, the wait is delegated to DoSyncContextWait and its result is
//       returned directly.
//    2. Otherwise the wait is issued through DoAppropriateAptStateWait.
//    3. WAIT_IO_COMPLETION: handle a pending thread interrupt, then retry with the
//       remaining timeout (or report WAIT_TIMEOUT once the time is used up).
//    4. WAIT_FAILED: throw for ERROR_INVALID_PARAMETER / ERROR_ACCESS_DENIED;
//       for an invalid handle, synthesise a result (see comments below).
//
// Arguments:
//    countHandles - number of entries in 'handles'.
//    handles      - handles to wait on; may be compacted in place on the
//                   wait-all/invalid-handle path.
//    waitAll      - TRUE to wait for all handles, FALSE for any.
//    millis       - timeout in milliseconds, or INFINITE.
//    mode         - WaitMode flags (WaitMode_Alertable, WaitMode_IgnoreSyncCtx,
//                   WaitMode_ADUnload are examined here).
//
// Returns:
//    A WAIT_* status. Can throw (see the WAIT_FAILED handling).
DWORD Thread::DoAppropriateWaitWorker(int countHandles, HANDLE *handles, BOOL waitAll,
DWORD millis, WaitMode mode)
{
CONTRACTL {
THROWS;
GC_TRIGGERS;
}
CONTRACTL_END;
DWORD ret = 0;
BOOL alertable = (mode & WaitMode_Alertable)!= 0;
BOOL ignoreSyncCtx = (mode & WaitMode_IgnoreSyncCtx)!= 0;
// Unless the ignoreSyncCtx flag is set, first check to see if there is a synchronization
// context on the current thread and if there is, dispatch to it to do the wait.
// If the wait is non alertable we cannot forward the call to the sync context
// since fundamental parts of the system (such as the GC) rely on non alertable
// waits not running any managed code. Also if we are past the point in shutdown where we
// are allowed to run managed code then we can't forward the call to the sync context.
if (!ignoreSyncCtx && alertable && CanRunManagedCode(FALSE))
{
GCX_COOP();
BOOL fSyncCtxPresent = FALSE;
OBJECTREF SyncCtxObj = NULL;
GCPROTECT_BEGIN(SyncCtxObj)
{
GetSynchronizationContext(&SyncCtxObj);
if (SyncCtxObj != NULL)
{
SYNCHRONIZATIONCONTEXTREF syncRef = (SYNCHRONIZATIONCONTEXTREF)SyncCtxObj;
if (syncRef->IsWaitNotificationRequired())
{
fSyncCtxPresent = TRUE;
ret = DoSyncContextWait(&SyncCtxObj, countHandles, handles, waitAll, millis);
}
}
}
GCPROTECT_END();
// The sync context performed the wait; nothing further to do here.
if (fSyncCtxPresent)
return ret;
}
GCX_PREEMP();
if(alertable)
{
DoAppropriateWaitWorkerAlertableHelper(mode);
}
// Scope-bound holders; they stay alive across the retry loop below until the
// function returns.
LeaveRuntimeHolder holder((size_t)WaitForMultipleObjectsEx);
StateHolder<MarkOSAlertableWait,UnMarkOSAlertableWait> OSAlertableWait(alertable);
ThreadStateHolder tsh(alertable, TS_Interruptible | TS_Interrupted);
ULONGLONG dwStart = 0, dwEnd;
retry:
// Record the start time of this attempt so the remaining timeout can be computed
// if the wait has to be re-issued. Not needed for an infinite wait.
if (millis != INFINITE)
{
dwStart = CLRGetTickCount64();
}
ret = DoAppropriateAptStateWait(countHandles, handles, waitAll, millis, mode);
if (ret == WAIT_IO_COMPLETION)
{
_ASSERTE (alertable);
if (m_State & TS_Interrupted)
{
HandleThreadInterrupt(mode & WaitMode_ADUnload);
}
// We could be woken by some spurious APC or an EE APC queued to
// interrupt us. In the latter case the TS_Interrupted bit will be set
// in the thread state bits. Otherwise we just go back to sleep again.
if (millis != INFINITE)
{
dwEnd = CLRGetTickCount64();
if (dwEnd >= dwStart + millis)
{
// The whole timeout elapsed while we were being alerted.
ret = WAIT_TIMEOUT;
goto WaitCompleted;
}
else
{
// Wait again only for the time that is left.
millis -= (DWORD)(dwEnd - dwStart);
}
}
goto retry;
}
_ASSERTE((ret >= WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + (DWORD)countHandles)) ||
(ret >= WAIT_ABANDONED && ret < (WAIT_ABANDONED + (DWORD)countHandles)) ||
(ret == WAIT_TIMEOUT) || (ret == WAIT_FAILED));
// countHandles is used as an unsigned -- it should never be negative.
_ASSERTE(countHandles >= 0);
if (ret == WAIT_FAILED)
{
DWORD errorCode = ::GetLastError();
if (errorCode == ERROR_INVALID_PARAMETER)
{
// Distinguish "same handle passed twice" from any other invalid parameter.
if (CheckForDuplicateHandles(countHandles, handles))
COMPlusThrow(kDuplicateWaitObjectException);
else
COMPlusThrowHR(HRESULT_FROM_WIN32(errorCode));
}
else if (errorCode == ERROR_ACCESS_DENIED)
{
// A Win32 ACL could prevent us from waiting on the handle.
COMPlusThrow(kUnauthorizedAccessException);
}
// The only failure expected to reach this point is an invalid handle.
_ASSERTE(errorCode == ERROR_INVALID_HANDLE);
// With a single handle, the invalid handle is reported as signalled.
if (countHandles == 1)
ret = WAIT_OBJECT_0;
else if (waitAll)
{
// Probe all handles with a timeout of zero. When we find one that's
// invalid, move it out of the list and retry the wait.
#ifdef _DEBUG
BOOL fFoundInvalid = FALSE;
#endif
for (int i = 0; i < countHandles; i++)
{
// WaitForSingleObject won't pump messages; we already probe enough space
// before calling this function and we don't want to fail here, so we don't
// do a transition to tolerant code here
DWORD subRet = WaitForSingleObject (handles[i], 0);
if (subRet != WAIT_FAILED)
continue;
_ASSERTE(::GetLastError() == ERROR_INVALID_HANDLE);
// Close the gap left by the invalid handle. This edits the array passed in
// by the caller and shrinks the local count by one.
if ((countHandles - i - 1) > 0)
memmove(&handles[i], &handles[i+1], (countHandles - i - 1) * sizeof(HANDLE));
countHandles--;
#ifdef _DEBUG
fFoundInvalid = TRUE;
#endif
// Only one invalid handle is removed per pass; the retry finds any others.
break;
}
_ASSERTE(fFoundInvalid);
// Compute the new timeout value by assume that the timeout
// is not large enough for more than one wrap
dwEnd = CLRGetTickCount64();
if (millis != INFINITE)
{
if (dwEnd >= dwStart + millis)
{
ret = WAIT_TIMEOUT;
goto WaitCompleted;
}
else
{
millis -= (DWORD)(dwEnd - dwStart);
}
}
goto retry;
}
else
{
// Probe all handles with a timeout as zero, succeed with the first
// handle that doesn't timeout.
// 'ret' starts at WAIT_OBJECT_0 and is advanced once per handle that times out,
// so on exit from the loop it encodes the index of the handle that stopped it.
ret = WAIT_OBJECT_0;
int i;
for (i = 0; i < countHandles; i++)
{
TryAgain:
// WaitForSingleObject won't pump messages; we already probe enough space
// before calling this function and we don't want to fail here, so we don't
// do a transition to tolerant code here
DWORD subRet = WaitForSingleObject (handles[i], 0);
// A signalled handle or the invalid handle itself ends the probe.
if ((subRet == WAIT_OBJECT_0) || (subRet == WAIT_FAILED))
break;
if (subRet == WAIT_ABANDONED)
{
// Re-base the index from the WAIT_OBJECT_0 range to the WAIT_ABANDONED range.
ret = (ret - WAIT_OBJECT_0) + WAIT_ABANDONED;
break;
}
// If we get alerted it just masks the real state of the current
// handle, so retry the wait.
if (subRet == WAIT_IO_COMPLETION)
goto TryAgain;
_ASSERTE(subRet == WAIT_TIMEOUT);
ret++;
}
// Some handle must have stopped the loop before it ran off the end.
_ASSERTE(i != countHandles);
}
}
WaitCompleted:
// A timeout result is only possible when a finite timeout was requested.
_ASSERTE((ret != WAIT_TIMEOUT) || (millis != INFINITE));
return ret;
}
then, 还要看 DoAppropriateAptStateWait(countHandles, handles, waitAll, millis, mode)的实现:
// Thin forwarding layer: extracts the alertable flag from 'mode' and hands the
// wait to WaitForMultipleObjectsEx_SO_TOLERANT, returning its result unchanged.
DWORD Thread::DoAppropriateAptStateWait(int numWaiters, HANDLE* pHandles, BOOL bWaitAll,
                                        DWORD timeout, WaitMode mode)
{
    CONTRACTL {
        THROWS;
        GC_TRIGGERS;
    }
    CONTRACTL_END;

    // Only the alertable bit of the wait mode matters at this level.
    BOOL fAlertable = FALSE;
    if ((mode & WaitMode_Alertable) != 0)
    {
        fAlertable = TRUE;
    }

    return WaitForMultipleObjectsEx_SO_TOLERANT(numWaiters, pHandles, bWaitAll, timeout, fAlertable);
}
then,再看WaitForMultipleObjectsEx_SO_TOLERANT的实现:
// Calls ::WaitForMultipleObjectsEx inside a BEGIN/END_SO_TOLERANT_CODE region and
// makes sure the caller still sees the last-error value left by the wait itself.
//
// Returns the wait result; WAIT_FAILED is the value the result is initialised to.
DWORD WaitForMultipleObjectsEx_SO_TOLERANT (DWORD nCount, HANDLE *lpHandles, BOOL bWaitAll,DWORD dwMilliseconds, BOOL bAlertable)
{
    DWORD waitResult = WAIT_FAILED;
    DWORD savedLastError = 0;

    BEGIN_SO_TOLERANT_CODE (GetThread ());
    waitResult = ::WaitForMultipleObjectsEx (nCount, lpHandles, bWaitAll, dwMilliseconds, bAlertable);
    // Capture the error code immediately, before the closing macro can disturb it.
    savedLastError = ::GetLastError();
    END_SO_TOLERANT_CODE;

    // END_SO_TOLERANT_CODE overwrites lasterror. Let's reset it.
    ::SetLastError(savedLastError);

    return waitResult;
}
到这里,万水千山,我们终于搞清楚Monitor.Wait的大概实现原理(事实上我们只捋了一遍本文示例中Monitor.Enter的调用stack),内部最终还是调用了WaitForMultipleObjectsEx,不过要注意CLREvent::WaitEx的实现有好几个分支,根据情况的不同,最后调的并不一定是WaitForMultipleObjectsEx,也有可能是CLREventWaitHelper->WaitForSingleObjectEx等等。