前置知识
此处以Java描述该问题,需要Java并发的知识,这里附上一些不错的教程:
Youtube上的一个教程,非常实战,能快速找到感觉。
Oracle的并发官方教程,很全面,但不深入。
国人的Java并发系列教程,很全面,但有地方深入到源码,有的浅停在使用方法。
免费的翻译书,没有看,据说不错。
问题描述
生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:
- 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;
- 生产者、消费者不可同时访问缓冲区;
- 生产者不可向满缓冲区放产品;
- 消费者不可从空缓冲区取产品。
信号量解法
public class PCSemaphore {
private final static int BUFFER_SIZE = 10;
public static void main(String[] args) {
Semaphore mutex = new Semaphore(1);
Semaphore full = new Semaphore(0);
Semaphore empty = new Semaphore(BUFFER_SIZE);
Queue<Integer> buffer = new LinkedList<>();
Producer producer = new Producer(mutex, empty, full, buffer);
Consumer consumer = new Consumer(mutex, empty, full, buffer);
// 可以初始化多个生产者、消费者
new Thread(producer, "p1").start();
new Thread(producer, "p2").start();
new Thread(consumer, "c1").start();
new Thread(consumer, "c2").start();
new Thread(consumer, "c3").start();
}
}
class Producer implements Runnable {
private Semaphore mutex, empty, full;
private Queue<Integer> buffer;
private Integer counter = 0;
public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {
this.mutex = mutex;
this.empty = empty;
this.full = full;
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
try {
empty.acquire();
mutex.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
buffer.offer(counter++);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
full.release();
mutex.release();
}
}
}
class Consumer implements Runnable {
private Semaphore mutex, empty, full;
private Queue<Integer> buffer;
public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {
this.mutex = mutex;
this.empty = empty;
this.full = full;
this.buffer = buffer;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
while (true) {
try {
full.acquire();
mutex.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer product = buffer.poll();
int left = buffer.size();
System.out.printf("%s consumed %d left %d%n", threadName, product, left);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
empty.release();
mutex.release();
}
}
}
教科书的解法,问题有三:
-
不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:
同样地不可交换生产者中的empty和mutex信号量的请求顺序。
即必须遵守先资源信号量,再互斥信号量的请求顺序。
教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。 -
将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。
-
效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。
注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。
wait() & notify()
public class PCWaitNotify {
public final static int BUFFER_SIZE = 10;
public static void main(String[] args) {
Object mutex = new Object();
AtomicInteger counter = new AtomicInteger(0);
Queue<Integer> buffer = new LinkedList<>();
Producer producer = new Producer(buffer, counter, mutex);
Consumer consumer = new Consumer(buffer, mutex);
new Thread(producer).start();
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private Random rand = new Random();
private Queue<Integer> buffer;
private AtomicInteger counter; // 支持原子操作的基本类型包装类
private Object mutex;
public Producer(Queue<Integer> buffer, AtomicInteger counter, Object mutex) {
this.buffer = buffer;
this.counter = counter;
this.mutex = mutex;
}
@Override
public void run() {
while (true) {
synchronized (mutex) {
while (buffer.size() == PCWaitNotify.BUFFER_SIZE) {
try {
mutex.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buffer.offer(counter.incrementAndGet());
mutex.notify();
}
try {
Thread.sleep(rand.nextInt(800));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private Random rand = new Random();
private Queue<Integer> buffer;
private Object mutex;
public Consumer(Queue<Integer> buffer, Object mutex) {
this.buffer = buffer;
this.mutex = mutex;
}
@Override
public void run() {
while (true) {
synchronized (mutex) {
while (buffer.size() == 0) {
try {
mutex.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("consumed " + buffer.poll() + " left " + buffer.size());
mutex.notify();
}
try {
Thread.sleep(rand.nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。
BlockingQueue
public class PCBlockingQueue {
private final static int BUFFER_SIZE = 10;
public static void main(String[] args) {
BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(BUFFER_SIZE);
AtomicInteger counter = new AtomicInteger(0);
Producer producer = new Producer(buffer, counter);
Consumer consumer = new Consumer(buffer);
new Thread(producer).start();
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private Random rand = new Random();
private AtomicInteger counter;
private BlockingQueue<Integer> buffer;
public Producer(BlockingQueue<Integer> buffer, AtomicInteger counter) {
this.buffer = buffer;
this.counter = counter;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(rand.nextInt(800));
Integer product = counter.incrementAndGet();
buffer.put(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private Random rand = new Random();
private BlockingQueue<Integer> buffer;
public Consumer(BlockingQueue<Integer> buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(rand.nextInt(600));
Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。
System.out.println("consumed " + product + " left " + buffer.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。