紧接上回。
Monitor.Enter究竟有没有自旋,还记得在Monitor.Wait初探系列中我们顺带也分析了Enter对应的代码逻辑嘛?当时通过阅读代码以及windbg调试跟踪两种方式都确认等待锁的线程最终会调用WaitXXXX进入内核态。现在再来回顾那段代码,就是AwareLock类的AwareLock::Enter方法:
void AwareLock::Enter() { CONTRACTL { INSTANCE_CHECK; THROWS; GC_TRIGGERS; MODE_ANY; INJECT_FAULT(COMPlusThrowOM();); } CONTRACTL_END; Thread *pCurThread = GetThread(); for (;;) { // Read existing lock state. volatile LONG state = m_MonitorHeld; if (state == 0) { // Common case: lock not held, no waiters. Attempt to acquire lock by // switching lock bit. if (FastInterlockCompareExchange((LONG*)&m_MonitorHeld, 1, 0) == 0) { break; } } else { // It's possible to get here with waiters but no lock held, but in this // case a signal is about to be fired which will wake up a waiter. So // for fairness sake we should wait too. // Check first for recursive lock attempts on the same thread. if (m_HoldingThread == pCurThread) { goto Recursion; } // Attempt to increment this count of waiters then goto contention // handling code. if (FastInterlockCompareExchange((LONG*)&m_MonitorHeld, (state + 2), state) == state) { goto MustWait; } } } // We get here if we successfully acquired the mutex. m_HoldingThread = pCurThread; m_Recursion = 1; pCurThread->IncLockCount(); #if defined(_DEBUG) && defined(TRACK_SYNC) { // The best place to grab this is from the ECall frame Frame *pFrame = pCurThread->GetFrame(); int caller = (pFrame && pFrame != FRAME_TOP ? (int) pFrame->GetReturnAddress() : -1); pCurThread->m_pTrackSync->EnterSync(caller, this); } #endif return; MustWait: // Didn't manage to get the mutex, must wait. EnterEpilog(pCurThread); return; Recursion: // Got the mutex via recursive locking on the same thread. _ASSERTE(m_Recursion >= 1); m_Recursion++; #if defined(_DEBUG) && defined(TRACK_SYNC) // The best place to grab this is from the ECall frame Frame *pFrame = pCurThread->GetFrame(); int caller = (pFrame && pFrame != FRAME_TOP ? (int) pFrame->GetReturnAddress() : -1); pCurThread->m_pTrackSync->EnterSync(caller, this); #endif }
映入眼帘的先是一个无限循环For(;;),循环里有个函数FastInterlockCompareExchange,好像那个InterlockedCompareExchange啊,于是我们有足够的理由怀疑此处实现了一个自旋,对嘛????再来看代码,先假设InterlockedCompareExchange的功能和InterlockedCompareExchange是一样的(否则就必然非自旋了),再分析这个循环体。我们看到这里的InterlockedCompareExchange只能防止多个线程同时操作一个变量MonitorHeld而已,但是并没有做到spin,因为在每一次循环开头有一句volatile LONG state = m_MonitorHeld; 假定m_MonitorHeld的初始值是0,那麽在第一个线程通过if (FastInterlockCompareExchange((LONG*)&m_MonitorHeld, 1, 0) == 0) { break; } 跳出循环体获得锁之后,第二个线程即使在同时间执行了同一个if判断也会返回False并进入下一次循环,而这第二个线程的下一次循环自然会执行到if (FastInterlockCompareExchange((LONG*)&m_MonitorHeld, (state + 2), state) == state) { goto MustWait; } 而此时if判断为true,于是goto MustWait,而MustWait紧接就进入了EnterEpilog,紧接着就WaitXXXX,所以意味第二个线程或曰等锁的线程并没有spin而直接进入WaitXXXX内核态等待。
那究竟是谁欺骗了我们的感情,说lock和CtriticalSection很像的?是.Net还是微软还是网友?别着急,故事还远未结束呢,是不是真的被骗了还需要进一步考察,我们再来看看lock的实现。
在VS2010里创建.Net2.0的ConsoleApp,写下如下代码并编译:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ConsoleApplication3
{
class Program
{
static object o = new object();
static void Main(string[] args)
{
lock (o)
{
Console.WriteLine("12345");
}
}
}
}
使用IL Disassembler打开编译出来的exe,看到如下内容:
.method private hidebysig static void Main(string[] args) cil managed
{
.entrypoint
// Code size 39 (0x27)
.maxstack 2
.locals init ([0] object CS$2$0000)
IL_0000: nop
IL_0001: ldsfld object ConsoleApplication3.Program::o
IL_0006: dup
IL_0007: stloc.0
IL_0008: call void [mscorlib]System.Threading.Monitor::Enter(object)
IL_000d: nop
.try
{
IL_000e: nop
IL_000f: ldstr "12345"
IL_0014: call void [mscorlib]System.Console::WriteLine(string)
IL_0019: nop
IL_001a: nop
IL_001b: leave.s IL_0025
} // end .try
finally
{
IL_001d: ldloc.0
IL_001e: call void [mscorlib]System.Threading.Monitor::Exit(object)
IL_0023: nop
IL_0024: endfinally
} // end handler
IL_0025: nop
IL_0026: ret
} // end of method Program::Main
现在我们基本可以确定在.Net 2.0中实现的lock使用了Monitor.Enter(object)这个函数,而这个函数通过之前的分析已经知道是没有实现自旋的。这是结论一。
而与此同时,我通过reflector看到Monitor类有一个奇怪的方法,这个方法名叫ReliableEnter,原型是:
[MethodImpl(MethodImplOptions.InternalCall)] internal static extern void ReliableEnter(object obj, ref bool tookLock);
听起来这个方法应该比那个Enter更reliable一些嘛,但是令人诧异的是,这个方法并没有公开,是internal的,也即是在我们代码中通过正常的调用是看不到的,而且更更奇怪的是,这个方法在类的内部竟然也没有被调用。但是通过relector我们发现这个internal的方法被其它.Net framework的类所调用,当然前提是这些类必须和Monitor在同一个命名空间下,这些类基本上也都是一些internal类,只会被.Net framework内部使用,不是为了提供给开发者的,如下所示:
为啥提供一个internal的ReliableEnter且只供内部使用呢?真让人好生纳闷,再来看看ReliableEnter的用法,也和Enter是一致的,随便拿一个调用了ReliableEnter的地方反射一下看看代码:
private static unsafe void* InitializeCompareInfo(void* pNativeGlobalizationAssembly, int sortingLCID)
{
void* voidPtr = null;
bool tookLock = false;
RuntimeHelpers.PrepareConstrainedRegions();
try
{
Monitor.ReliableEnter(typeof(CultureTableRecord), ref tookLock);
voidPtr = InitializeNativeCompareInfo(pNativeGlobalizationAssembly, sortingLCID);
}
finally
{
if (tookLock)
{
Monitor.Exit(typeof(CultureTableRecord));
}
}
return voidPtr;
}
和Monitor.Enter用法是一致的,也即意味着这两个方法的实现应该大差不差,那麽我们很有必要看看ReliableEnter的底层实现了。对应的底层实现在ComObject.cpp文件中,如下所示:
1: FCIMPL2(void, JIT_MonReliableEnter, Object* pThisUNSAFE, CLR_BOOL *tookLock)
2: {
3: CONTRACTL
4: {
5: MODE_COOPERATIVE;
6: DISABLED(GC_TRIGGERS); // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F.
7: THROWS;
8: SO_TOLERANT;
9: }
10: CONTRACTL_END;
11:
12: OBJECTREF obj = (OBJECTREF) pThisUNSAFE;
13: HELPER_METHOD_FRAME_BEGIN_1(obj);
14: //-[autocvtpro]-------------------------------------------------------
15:
16: if (obj == NULL)
17: COMPlusThrow(kNullReferenceException, L"NullReference_This");
18:
19: GCPROTECT_BEGININTERIOR(tookLock);
20:
21: class AwareLock *awareLock = NULL;
22: SyncBlock* syncBlock = NULL;
23: ObjHeader* objHeader = NULL;
24: int spincount = 50;
25: const int MaxSpinCount = 20000 * g_SystemInfo.dwNumberOfProcessors;
26: LONG oldvalue, state;
27: DWORD tid;
28:
29: Thread *pThread = GetThread();
30:
31: tid = pThread->GetThreadId();
32:
33: if (tid > SBLK_MASK_LOCK_THREADID)
34: {
35: goto FramedLockHelper;
36: }
37:
38: objHeader = obj->GetHeader();
39:
40: while (true)
41: {
42: oldvalue = objHeader->m_SyncBlockValue;
43:
44: if ((oldvalue & (BIT_SBLK_IS_HASH_OR_SYNCBLKINDEX +
45: BIT_SBLK_SPIN_LOCK +
46: SBLK_MASK_LOCK_THREADID +
47: SBLK_MASK_LOCK_RECLEVEL)) == 0)
48: {
49:
50: LONG newvalue = oldvalue | tid;
51: if (FastInterlockCompareExchangeAcquire((LONG*)&(objHeader->m_SyncBlockValue), newvalue, oldvalue) == oldvalue)
52: {
53: pThread->IncLockCount();
54: goto UpdateLockState;
55: }
56: continue;
57: }
58:
59: if (oldvalue & BIT_SBLK_IS_HASH_OR_SYNCBLKINDEX)
60: {
61: goto HaveHashOrSyncBlockIndex;
62: }
63:
64: if (oldvalue & BIT_SBLK_SPIN_LOCK)
65: {
66: if (1 == g_SystemInfo.dwNumberOfProcessors)
67: {
68: goto FramedLockHelper;
69: }
70: }
71: else if (tid == (DWORD) (oldvalue & SBLK_MASK_LOCK_THREADID))
72: {
73: LONG newvalue = oldvalue + SBLK_LOCK_RECLEVEL_INC;
74:
75: if ((newvalue & SBLK_MASK_LOCK_RECLEVEL) == 0)
76: {
77: goto FramedLockHelper;
78: }
79:
80: if (FastInterlockCompareExchangeAcquire((LONG*)&(objHeader->m_SyncBlockValue), newvalue, oldvalue) == oldvalue)
81: {
82: goto UpdateLockState;
83: }
84: }
85:
86: // exponential backoff
87: for (int i = 0; i < spincount; i++)
88: {
89: YieldProcessor();
90: }
91: if (spincount > MaxSpinCount)
92: {
93: goto FramedLockHelper;
94: }
95: spincount *= 3;
96: } /* while(true) */
97:
98: HaveHashOrSyncBlockIndex:
99: if (oldvalue & BIT_SBLK_IS_HASHCODE)
100: {
101: goto FramedLockHelper;;
102: }
103:
104: syncBlock = obj->PassiveGetSyncBlock();
105: if (NULL == syncBlock)
106: {
107: goto FramedLockHelper;;
108: }
109:
110: awareLock = syncBlock->QuickGetMonitor();
111: state = awareLock->m_MonitorHeld;
112: if (state == 0)
113: {
114: if (FastInterlockCompareExchangeAcquire((LONG*)&(awareLock->m_MonitorHeld), 1, 0) == 0)
115: {
116: syncBlock->SetAwareLock(pThread,1);
117: pThread->IncLockCount();
118: goto UpdateLockState;
119: }
120: else
121: {
122: goto FramedLockHelper;;
123: }
124: }
125: else if (awareLock->GetOwningThread() == pThread) /* monitor is held, but it could be a recursive case */
126: {
127: awareLock->m_Recursion++;
128: goto UpdateLockState;
129: }
130: FramedLockHelper:
131: obj->EnterObjMonitor();
132:
133: UpdateLockState:
134: tookLock != NULL ? *tookLock = true : false;
135:
136: GCPROTECT_END();
137: //-[autocvtepi]-------------------------------------------------------
138: HELPER_METHOD_FRAME_END();
139: }
140: FCIMPLEND
我们惊奇地发现ReliableEnter竟然发现了spin,自旋!自旋!虽然有点遗憾,自旋的次数不能指定,我们看到这里的最大自旋次数是处理器数量的20000倍,25: 25行:const int MaxSpinCount = 20000 * g_SystemInfo.dwNumberOfProcessors;
第40行进入while循环体之后,在42行将对象头的同步索引块值赋给oldValue,而如果之前没有线程锁住该对象,则m_SyncBlockValue应该为0,so线程第一次ReliableEnter应该进入44的if block,通过Interlock函数将newValue赋给m_SyncBlockValue,而此时的newValue应该是线程的Id。然后执行到54行goto UpdateLockStatus标签,此时意味成功acquire the lock,在标签内134行看到成功将toolLock设为false,这个参数其实对应着ReliableEnter的第二个ref类型的参数。如果假设此时就那麽巧,有另外一个线程也同时执行到51的判断语句,由于interlock函数的原子性使得if判断返回false紧接着便会执行到56的continue进入下一次循环。而下一次循环会在59行处往下执行,在第64行if (oldvalue & BIT_SBLK_SPIN_LOCK)将oldValue与BIT_SBLK_SPIN_LOCK进行与运算,此时的oldValue在前面已赋值为线程Id,Id一般不为零,而BIT_SBLK_SPIN_LOCK的值又为1023即二进制11111111所以此处的与元算总是为true,也即总会执行进if块,进入if块在66行又会判断处理器数量如果为1跳转到FramedLockHelper,如果处理器不止一个,则直接执行到87~95行,而此处恰恰实现了一个spin且有一个次数上限的判断。而处理为1的情况呢,我们看到FramedLockHelper标签处直接调用了EnterObjMonitor,这个方法最终会调用到之前的那个会WaitXXXX的AwareLock::Enter,也即只有多处理器的情况下,才会自旋,这又是一个结论。
现在问题来了,既然ReliableEnter实现了自旋,为什么要藏着掖着呢?我觉得很有可能这在当时推出.Net2.0之际是一个没有经过充分测试的方法,但是又急着发布,所以没有public在情理之中,这样等到下一个.Net版本再public出来。事实是如何的,我们把前面的那段反编译的lock示例代码在.Net4.0中重新编译再用IL Disassembler打开,看到如下输出,这时候,我们又得出一个结论,在.Net4.0,lock使用了升级版的Enter(object,bool&),这个Enter有两个参数,我们在通过Reflector看看Enter(object,bool&),我们看到Enter(object,bool&)的实现就是调用了ReliableEnter.
所以我们再得出一个结论,在.Net4.0中的lock才能媲美CriticalSection,这时候不管你称呼他们为胞弟还是慕名模仿者都行,都很达意嘛。
.method private hidebysig static void Main(string[] args) cil managed { .entrypoint // Code size 51 (0x33) .maxstack 2 .locals init ([0] bool '<>s__LockTaken0', [1] object CS$2$0000, [2] bool CS$4$0001) IL_0000: nop IL_0001: ldc.i4.0 IL_0002: stloc.0 .try { IL_0003: ldsfld object ConsoleApplication3.Program::o IL_0008: dup IL_0009: stloc.1 IL_000a: ldloca.s '<>s__LockTaken0' IL_000c: call void [mscorlib]System.Threading.Monitor::Enter(object, bool&) IL_0011: nop IL_0012: nop IL_0013: ldstr "12345" IL_0018: call void [mscorlib]System.Console::WriteLine(string) IL_001d: nop IL_001e: nop IL_001f: leave.s IL_0031 } // end .try finally { IL_0021: ldloc.0 IL_0022: ldc.i4.0 IL_0023: ceq IL_0025: stloc.2 IL_0026: ldloc.2 IL_0027: brtrue.s IL_0030 IL_0029: ldloc.1 IL_002a: call void [mscorlib]System.Threading.Monitor::Exit(object) IL_002f: nop IL_0030: endfinally } // end handler IL_0031: nop IL_0032: ret } // end of method Program::Main
关于InterlockedCompareExchange的另外一个link:
http://stackoverflow.com/questions/3338661/net-equivalent-of-the-x86-asm-command-xadd
有兴趣可参考。