Condition Variables
Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs.
Condition variables are user-mode objects that cannot be shared across processes.
Condition variables enable threads to atomically release a lock and enter the sleeping state.
They can be used with critical sections or slim reader/writer (SRW) locks.
Condition variables support operations that "wake one" or "wake all" waiting threads.
After a thread is woken, it re-acquires the lock it released when the thread entered the sleeping state.
条件变量是一种同步原语,它能使线程一直等待,直到某个特定条件成立.
条件变量是不能被跨进程共享的用户模式下的同步对象.
条件变量能够使线程以原子方式释放一个锁并同时进入睡眠(sleep)状态. 条件变量可以与临界区对象(Critical Section Object)或轻量级读写锁(SRW lock)一起使用.
条件变量支持"唤醒一个"(wake one)或"唤醒全部"(wake all)等待线程的操作. 线程被唤醒后,会重新获取它在进入睡眠状态时释放的那个锁.
Windows Server 2003 and Windows XP: Condition variables are not supported.
It is often convenient to use more than one condition variable with the same lock.
For example, an implementation of a reader/writer lock might use a single critical section but separate condition variables for readers and writers.
在同一个锁上使用多个条件变量通常很方便.
比如:读写锁的一种实现可以只使用一个临界区,但为读者和写者分别使用不同的条件变量.
The following code implements a producer/consumer queue.
The queue is represented as a bounded circular buffer, and is protected by a critical section.
The code uses two condition variables: one used by producers (BufferNotFull) and one used by consumers (BufferNotEmpty).
The code calls the InitializeConditionVariable function to create the condition variables.
The consumer threads call the SleepConditionVariableCS function to wait for items to be added to the queue and
the WakeConditionVariable function to signal the producer that it is ready for more items.
The producer threads call SleepConditionVariableCS to wait for the consumer to remove items from the queue and
WakeConditionVariable to signal the consumer that there are more items in the queue.
实现一个生产者/消费者队列.
队列用一个有界环形缓冲区表示,并由临界区对象(Critical Section Object)保护.
代码使用两个条件变量:一个供生产者使用(BufferNotFull),一个供消费者使用(BufferNotEmpty). 通过调用InitializeConditionVariable()函数来创建这些条件变量.
消费者调用SleepConditionVariableCS()函数等待有物品被添加到队列中,并通过WakeConditionVariable()函数通知生产者可以生产更多的物品.
生产者调用SleepConditionVariableCS()函数等待消费者把物品从队列中移除,并通过WakeConditionVariable()函数通知消费者队列中有更多的物品可供消费.
测试代码:UsingConditionVariables.cpp
#include <windows.h>
#include <stdlib.h>
#include <stdio.h>

#define BUFFER_SIZE 10
#define PRODUCER_SLEEP_TIME_MS 500
#define CONSUMER_SLEEP_TIME_MS 2000

// Bounded circular queue. Buffer, QueueSize, QueueStartOffset and the two
// totals are only read or written while BufferLock is held.
LONG Buffer[BUFFER_SIZE];
LONG LastItemProduced;      // Updated only through InterlockedIncrement.
ULONG QueueSize;            // Number of items currently stored in Buffer.
ULONG QueueStartOffset;     // Index of the oldest item in Buffer.

ULONG TotalItemsProduced;
ULONG TotalItemsConsumed;

CONDITION_VARIABLE BufferNotEmpty;  // Consumers sleep on this while the queue is empty.
CONDITION_VARIABLE BufferNotFull;   // Producers sleep on this while the queue is full.
CRITICAL_SECTION BufferLock;

BOOL StopRequested;         // Set by main under BufferLock to ask every thread to exit.

// Producer thread: repeatedly creates an item and appends it to the queue,
// sleeping on BufferNotFull while the queue is full.
//
// p carries the producer's numeric id (used only for log output).
// Returns 0 once StopRequested has been observed.
DWORD WINAPI ProducerThreadProc (PVOID p)
{
    ULONG ProducerId = (ULONG)(ULONG_PTR)p;

    while (true)
    {
        // Produce a new item.
        Sleep (rand() % PRODUCER_SLEEP_TIME_MS);

        // Atomic increment, so item numbers stay unique without taking
        // BufferLock.
        LONG Item = InterlockedIncrement (&LastItemProduced);

        EnterCriticalSection (&BufferLock);

        // Re-test the predicate in a loop after every wake-up: another
        // thread may have changed the queue between the wake and this
        // thread re-acquiring the lock.
        while (QueueSize == BUFFER_SIZE && StopRequested == FALSE)
        {
            // Buffer is full - sleep so consumers can get items.
            // The lock is released while sleeping and held again on return.
            SleepConditionVariableCS (&BufferNotFull, &BufferLock, INFINITE);
        }

        if (StopRequested)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Insert the item at the end of the queue and increment size.
        Buffer[(QueueStartOffset + QueueSize) % BUFFER_SIZE] = Item;
        QueueSize++;
        TotalItemsProduced++;

        // Printed under the lock so the reported queue size matches this
        // insertion.
        printf ("Producer %lu: item %2ld, queue size %2lu\n",
                ProducerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a consumer is waiting, wake it.
        WakeConditionVariable (&BufferNotEmpty);
    }

    printf ("Producer %lu exiting\n", ProducerId);
    return 0;
}

// Consumer thread: repeatedly removes the oldest item from the queue,
// sleeping on BufferNotEmpty while the queue is empty.
//
// p carries the consumer's numeric id (used only for log output).
// Returns 0 once StopRequested is set AND the queue has been drained.
DWORD WINAPI ConsumerThreadProc (PVOID p)
{
    ULONG ConsumerId = (ULONG)(ULONG_PTR)p;

    while (true)
    {
        EnterCriticalSection (&BufferLock);

        while (QueueSize == 0 && StopRequested == FALSE)
        {
            // Buffer is empty - sleep so producers can create items.
            SleepConditionVariableCS (&BufferNotEmpty, &BufferLock, INFINITE);
        }

        // Exit only when the queue is empty, so items that were already
        // queued when the stop was requested are still consumed.
        if (StopRequested && QueueSize == 0)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Consume the first available item.
        LONG Item = Buffer[QueueStartOffset];

        QueueSize--;
        QueueStartOffset++;
        TotalItemsConsumed++;

        // Wrap the read position around the end of the circular buffer.
        if (QueueStartOffset == BUFFER_SIZE)
        {
            QueueStartOffset = 0;
        }

        printf ("Consumer %lu: item %2ld, queue size %2lu\n",
                ConsumerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a producer is waiting, wake it.
        WakeConditionVariable (&BufferNotFull);

        // Simulate processing of the item.
        Sleep (rand() % CONSUMER_SLEEP_TIME_MS);
    }

    printf ("Consumer %lu exiting\n", ConsumerId);
    return 0;
}

// Starts one producer and two consumers, lets them run until the user
// presses Enter, then requests a stop, waits for the threads to finish and
// prints the totals.
//
// Returns 0 on success, 1 if any thread could not be created.
int main ( void )
{
    InitializeConditionVariable (&BufferNotEmpty);
    InitializeConditionVariable (&BufferNotFull);

    InitializeCriticalSection (&BufferLock);

    enum { THREAD_COUNT = 3 };
    HANDLE Threads[THREAD_COUNT];
    DWORD id;
    int ExitCode = 0;

    Threads[0] = CreateThread (NULL, 0, ProducerThreadProc, (PVOID)1, 0, &id);
    Threads[1] = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)1, 0, &id);
    Threads[2] = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)2, 0, &id);

    // CreateThread returns NULL on failure; a NULL handle must not be passed
    // to WaitForSingleObject or CloseHandle.
    BOOL AllStarted = TRUE;
    for (int i = 0; i < THREAD_COUNT; i++)
    {
        if (Threads[i] == NULL)
        {
            AllStarted = FALSE;
        }
    }

    if (AllStarted)
    {
        puts ("Press enter to stop...");
        getchar();
    }
    else
    {
        fprintf (stderr, "Failed to create one or more threads\n");
        ExitCode = 1;
    }

    // Set the flag under the lock so a thread cannot test its predicate,
    // miss the flag, and then go to sleep after the wake-ups below.
    EnterCriticalSection (&BufferLock);
    StopRequested = TRUE;
    LeaveCriticalSection (&BufferLock);

    WakeAllConditionVariable (&BufferNotFull);
    WakeAllConditionVariable (&BufferNotEmpty);

    // Join whichever threads were started and release their handles.
    for (int i = 0; i < THREAD_COUNT; i++)
    {
        if (Threads[i] != NULL)
        {
            WaitForSingleObject (Threads[i], INFINITE);
            CloseHandle (Threads[i]);
        }
    }

    // All worker threads have exited, so the lock is no longer in use and
    // the totals can be read without it.
    DeleteCriticalSection (&BufferLock);

    printf ("TotalItemsProduced: %lu, TotalItemsConsumed: %lu\n",
            TotalItemsProduced, TotalItemsConsumed);

    return ExitCode;
}