代码如下:
namespace ProduceConsume
{
class Program
{
static void Main(string[] args)
{
int result = 0;
Product product = new Product();
Producer producer = new Producer { Product = product, Looper = 10 };
Consumer consumer = new Consumer { Product = product, Looper = 10 };
Thread threadP = new Thread(new ThreadStart(producer.Process));
Thread.Sleep(500);
Thread threadC1 = new Thread(new ThreadStart(consumer.Process));
Thread.Sleep(500);
Thread threadC2 = new Thread(new ThreadStart(consumer.Process));
try
{
threadP.Start();
threadC1.Start();
threadC2.Start();
threadP.Join();
threadC1.Join();
threadC2.Join();
Console.ReadLine();
}
catch (ThreadStateException ex)
{
Console.WriteLine("ThreadStateException: " + ex.Message);
result = 1;
}
catch (ThreadInterruptedException ex)
{
Console.WriteLine("ThreadInterruptedException: " + ex.Message);
result = 1;
}
Environment.ExitCode = result;
}
}
class Product
{
bool consumeFlag = false;
Queue<int> queue = new Queue<int>();
public void Consume()
{
Monitor.Enter(this);
if (!consumeFlag)
{
try
{
Monitor.Wait(this);
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex.Message);
}
}
if (queue.Count > 0)
{
Console.WriteLine("Consume --> Thread ID: {0}, Product Index: {1}, Product Left: {2}", Thread.CurrentThread.ManagedThreadId, queue.Dequeue(), queue.Count);
consumeFlag = queue.Count > 0;
}
Monitor.Pulse(this);
Monitor.Exit(this);
}
public void Produce()
{
Monitor.Enter(this);
if (consumeFlag)
{
try
{
Monitor.Wait(this);
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex.Message);
}
}
if (queue.Count == 0)
{
Random random = new Random();
int count = random.Next(10);
for (int i = 1; i <= count; i++)
{
queue.Enqueue(i);
}
Console.Write("Produce --> ");
Console.WriteLine(count == 1 ? "{0} product is enqueued." : "{0} products are enqueued.", count);
}
consumeFlag = true;
Monitor.Pulse(this);
Monitor.Exit(this);
}
}
class Producer
{
public Product Product { get; set; }
public int Looper { get; set; }
public void Process()
{
for (int i = 0; i < Looper; i++)
{
Product.Produce();
}
}
}
class Consumer
{
public Product Product { get; set; }
public int Looper { get; set; }
public void Process()
{
for (int i = 0; i < Looper; i++)
{
Product.Consume();
}
}
}
}
{
class Program
{
static void Main(string[] args)
{
int result = 0;
Product product = new Product();
Producer producer = new Producer { Product = product, Looper = 10 };
Consumer consumer = new Consumer { Product = product, Looper = 10 };
Thread threadP = new Thread(new ThreadStart(producer.Process));
Thread.Sleep(500);
Thread threadC1 = new Thread(new ThreadStart(consumer.Process));
Thread.Sleep(500);
Thread threadC2 = new Thread(new ThreadStart(consumer.Process));
try
{
threadP.Start();
threadC1.Start();
threadC2.Start();
threadP.Join();
threadC1.Join();
threadC2.Join();
Console.ReadLine();
}
catch (ThreadStateException ex)
{
Console.WriteLine("ThreadStateException: " + ex.Message);
result = 1;
}
catch (ThreadInterruptedException ex)
{
Console.WriteLine("ThreadInterruptedException: " + ex.Message);
result = 1;
}
Environment.ExitCode = result;
}
}
class Product
{
bool consumeFlag = false;
Queue<int> queue = new Queue<int>();
public void Consume()
{
Monitor.Enter(this);
if (!consumeFlag)
{
try
{
Monitor.Wait(this);
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex.Message);
}
}
if (queue.Count > 0)
{
Console.WriteLine("Consume --> Thread ID: {0}, Product Index: {1}, Product Left: {2}", Thread.CurrentThread.ManagedThreadId, queue.Dequeue(), queue.Count);
consumeFlag = queue.Count > 0;
}
Monitor.Pulse(this);
Monitor.Exit(this);
}
public void Produce()
{
Monitor.Enter(this);
if (consumeFlag)
{
try
{
Monitor.Wait(this);
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex.Message);
}
}
if (queue.Count == 0)
{
Random random = new Random();
int count = random.Next(10);
for (int i = 1; i <= count; i++)
{
queue.Enqueue(i);
}
Console.Write("Produce --> ");
Console.WriteLine(count == 1 ? "{0} product is enqueued." : "{0} products are enqueued.", count);
}
consumeFlag = true;
Monitor.Pulse(this);
Monitor.Exit(this);
}
}
class Producer
{
public Product Product { get; set; }
public int Looper { get; set; }
public void Process()
{
for (int i = 0; i < Looper; i++)
{
Product.Produce();
}
}
}
class Consumer
{
public Product Product { get; set; }
public int Looper { get; set; }
public void Process()
{
for (int i = 0; i < Looper; i++)
{
Product.Consume();
}
}
}
}
程序所做的事情是:
1. 生产者生产random.Next(10)个数放入队列中;
2. 消费者(threadC1, threadC2 两个线程,两个消费者)在队列不为空的情况下,做Dequeue()操作,直到队列为空;
3. 重复执行1步骤10次
开始没注意到代码中标注颜色区块的两个Looper次数,所以对下图中的结果很不理解,以为哪个地方死锁了

发现这个失误之后,将消费者的Looper设置为10*max(random.Next(10))=100,得到下图中的结果:

可以看出,每次生产的产品都可以被消费完(queue.Count==0),但是生产的次数没有达到设定的次数(10次)
仔细想过之后,才发现这跟Monitor.Pulse(obj)方法的作用有关。
Monitor背后管理着两个队列,这两个队列的元素都是线程,一个被称作就绪队列(ready),一个被称作等待队列(waiting), ready队列中存放的是准备获取锁的线程
ready队列中保存的是准备获取锁的线程。就是说,如果某个线程(记作线程A)执行了Monitor.Wait(),那么ready队列中队头的线程就会获得锁,开始运行(如果ready队列中没有线程,那么就没有这个效果);同时线程A自动进入waiting队列中的队尾了。
waiting队列中保存的是正在等待锁定对象状态变化的通知的线程。就是说,如果某个线程执行了Monitor.Pulse(),那么waiting队列中队头的线程就进入ready队列中。
在此程序中,当消费者消费完一个产品后,如果队列中还有产品,在调用Monitor.Pulse(this)的时候,对头的线程不一定是消费者线程,如果是生产者的线程,这就会花去一次loop,所以没有执行够10次生产是很正常的。