生产者消费者问题是一个著名的线程同步问题,该问题描述如下:
有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。
首先来简化问题,先假设生产者和消费者都只有一个,且缓冲区也只有一个。这样情况就简便多了。
1. 从缓冲区取出产品和向缓冲区投放产品必须是互斥进行的。可以用关键段和互斥量来完成。 2. 生产者要等待缓冲区为空,这样才可以投放产品,消费者要等待缓冲区不为空,这样才可以取出产品进行消费。 并且由于有两个等待过程,所以要用两个事件或信号量来控制。
#include <stdio.h> #include <process.h> #include <windows.h> CRITICAL_SECTION g_csThreadCode; HANDLE g_EventBuffFull; HANDLE g_EventBuffEmpty; int g_buffer; const int THREAD_NUM = 2; const int product_num = 100; unsigned int __stdcall producer_thread(void *param) { for (int i = 0; i < product_num; i++) { WaitForSingleObject(g_EventBuffEmpty, INFINITE); EnterCriticalSection(&g_csThreadCode); g_buffer = i; printf("生产者在缓冲区中放置一个产品,编号%d ", g_buffer); LeaveCriticalSection(&g_csThreadCode); SetEvent(g_EventBuffFull); } return 0; } unsigned int __stdcall consumer_thread(void *param) { for (int i = 0; i < product_num; i++) { WaitForSingleObject(g_EventBuffFull, INFINITE); EnterCriticalSection(&g_csThreadCode); printf(" 消费者取走%d产品 ", g_buffer); g_buffer = 0; LeaveCriticalSection(&g_csThreadCode); SetEvent(g_EventBuffEmpty); } Sleep(10); return 0; } int main() { InitializeCriticalSection(&g_csThreadCode); g_EventBuffEmpty = CreateEvent(NULL, false, true, NULL); g_EventBuffFull = CreateEvent(NULL, false, false, NULL); HANDLE handle[THREAD_NUM]; handle[0] = (HANDLE)_beginthreadex(NULL, 0, producer_thread, NULL, 0, NULL); handle[1] = (HANDLE)_beginthreadex(NULL, 0, consumer_thread, NULL, 0, NULL); WaitForMultipleObjects(THREAD_NUM, handle, true, INFINITE); //销毁同步资源 DeleteCriticalSection(&g_csThreadCode); CloseHandle(handle[0]); CloseHandle(handle[1]); CloseHandle(g_EventBuffFull); CloseHandle(g_EventBuffEmpty); return 0; }
代码执行结果:
可以看出生产者与消费者已经是有序的工作了。
先给出伪代码:
semaphore mutex=1; //临界区互斥信号量 semaphore empty=n; //空闲缓冲区 semaphore full=0; //缓冲区初始化为空 producer () { //生产者进程 while(1){ produce an item in nextp; //生产数据 P(empty); //获取空缓冲区单元 P(mutex); //进入临界区. add nextp to buffer; //将数据放入缓冲区 V(mutex); //离开临界区,释放互斥信号量 V(full); //满缓冲区数加1 } } consumer () { //消费者进程 while(1){ P(full); //获取满缓冲区单元 P(mutex); // 进入临界区 remove an item from buffer; //从缓冲区中取出数据 V (mutex); //离开临界区,释放互斥信号量 V (empty) ; //空缓冲区数加1 consume the item; //消费数据 } }
然后再对这个简单生产者消费者问题加大难度。将消费者改成2个,缓冲池改成拥有4个缓冲区的大缓冲池。
代码实现:
#include <stdio.h> #include <process.h> #include <windows.h> const int THREAD_NUM = 3; const int BUFFER_SIZE = 4; int g_buffer[BUFFER_SIZE]; int g_i; int g_j; const int product_num = 10; //所有产品都被取走,消费者线程结束标志 bool g_isOver; HANDLE g_hSemaphoreFull; HANDLE g_hSemaphoreEmpty; CRITICAL_SECTION g_csThreadCode; unsigned int __stdcall producer_thread(void *param) { for (int i = 0; i < product_num; i++) { WaitForSingleObject(g_hSemaphoreEmpty, INFINITE); EnterCriticalSection(&g_csThreadCode); g_buffer[g_i] = i; printf("%d位置上放置产品%d ", g_i, g_buffer[g_i]); g_i = (g_i + 1) % BUFFER_SIZE; LeaveCriticalSection(&g_csThreadCode); ReleaseSemaphore(g_hSemaphoreFull, 1, NULL); } printf("生产者任务完成,线程结束 "); return 0; } unsigned int __stdcall consumer_thread(void *param) { while (true) { WaitForSingleObject(g_hSemaphoreFull, INFINITE); if (g_isOver) break; EnterCriticalSection(&g_csThreadCode); printf(" 线程%d在%d位置取走产品%d ", GetCurrentThreadId(), g_j, g_buffer[g_j]); //所有的产品都被取走,这时所有的消费者线程应该结束 if (g_buffer[g_j] == product_num - 1) { g_isOver = true; LeaveCriticalSection(&g_csThreadCode); ReleaseSemaphore(g_hSemaphoreFull, 1, NULL); printf(" 所有的产品都已经被消费者取走,消费者线程结束 "); break; } g_j = (g_j + 1) % BUFFER_SIZE; LeaveCriticalSection(&g_csThreadCode); Sleep(50); ReleaseSemaphore(g_hSemaphoreEmpty, 1, NULL); }//while //printf("生产者任务完成,线程结束 "); return 0; } int main() { //初始化变量和内核对象 InitializeCriticalSection(&g_csThreadCode); g_hSemaphoreEmpty = CreateSemaphore(NULL, 4, 4, NULL); g_hSemaphoreFull = CreateSemaphore(NULL, 0, 4, NULL); g_i = 0; g_j = 0; memset(g_buffer, 0, sizeof(g_buffer)); HANDLE handle[THREAD_NUM]; handle[1] = (HANDLE)_beginthreadex(NULL, 0, consumer_thread, NULL, 0, NULL); handle[2] = (HANDLE)_beginthreadex(NULL, 0, consumer_thread, NULL, 0, NULL); handle[0] = (HANDLE)_beginthreadex(NULL, 0, producer_thread, NULL, 0, NULL); WaitForMultipleObjects(THREAD_NUM, handle, true, INFINITE); //释放资源 for (int i = 0; i < THREAD_NUM; i++) { CloseHandle(handle[i]); } DeleteCriticalSection(&g_csThreadCode); CloseHandle(g_hSemaphoreEmpty); CloseHandle(g_hSemaphoreFull); return 0; }
执行结果:
输出结果证明各线程的同步和互斥已经完成了。
至此,生产者消费者问题已经圆满的解决了,下面作个总结:
1. 首先要考虑生产者与消费者对缓冲区操作时的互斥。
2. 不管生产者与消费者有多少个,缓冲池有多少个缓冲区。都只有二个同步过程——分别是生产者要等待有空缓冲区才能投放产品,消费者要等待有非空缓冲区才能去取产品。