1、使用 BlockingQueue
ArrayBlockingQueue 完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
import java.util.concurrent.*; public class MyProdCons { public static void main(String[] args){ BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10); Runnable producer = () ->{ int num1 = 0; while(true) { try { queue.put(new Object()); System.out.println("生产个数: " + num1++); }catch(InterruptedException e){ e.printStackTrace(); } } }; new Thread(producer).start(); new Thread(producer).start(); Runnable consumer = () ->{ int num2 = 0; while(true) { try { queue.take(); System.out.println("消费个数: " + num2++); }catch(InterruptedException e) { e.printStackTrace(); } } }; new Thread(consumer).start(); new Thread(consumer).start(); } }
2、使用 Condition
import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; public class MyBlockingQueueForCondition { private Queue<Object> queue; private int max = 16; private ReentrantLock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int size) { this.max = size; queue = new LinkedList(); } public void put(Object o) throws InterruptedException{ lock.lock(); try { while(queue.size() == max) { notFull.await(); } queue.add(o); notEmpty.signalAll(); }finally{ lock.unlock(); } } public Object take() throws InterruptedException{ lock.lock(); try { while(queue.size() == 0) { notEmpty.await(); } Object item = queue.remove(); notFull.signalAll(); return item; }finally { lock.unlock(); } } }
3、使用 wait/notify
public class WaitStyle { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(5); Producer producer = new Producer(myBlockingQueue); Consumer consumer = new Consumer(myBlockingQueue); new Thread(producer).start(); new Thread(consumer).start(); } } class Producer implements Runnable{ private MyBlockingQueue storage; public Producer(MyBlockingQueue storage) { this.storage = storage; } public void run() { for(int i=0;i<10;i++) { try { storage.put(); }catch(InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable{ private MyBlockingQueue storage; public Consumer(MyBlockingQueue storage) { this.storage = storage; } public void run() { for(int i=0;i<10;i++) { try { storage.take(); }catch(InterruptedException e) { e.printStackTrace(); } } } } import java.util.*; public class MyBlockingQueue { private int maxSize; private LinkedList<Object> storage; public MyBlockingQueue(int size) { this.maxSize = size; storage = new LinkedList<>(); } public synchronized void put() throws InterruptedException{ while(storage.size() == maxSize) { wait(); } storage.add(new Object()); System.out.println("放入后个数 " + storage.size()); notifyAll(); } public synchronized void take() throws InterruptedException{ while(storage.size() == 0) { wait(); } //System.out.println(storage.remove()); storage.remove(); System.out.println("取出后个数 " + storage.size()); notifyAll(); } }
以上是三种实现生产者消费者模式的方法,其中,第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。
ref:
拉勾课堂 徐隆曦 《Java 并发编程 78 讲》