(手写生产者消费者模型,写BlockingQueue较简便 )
1、背景
生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
如果缓冲区已经满了,则生产者线程阻塞;
如果缓冲区为空,那么消费者线程阻塞。
2、方式一:synchronized、wait和notify
定义Resouce资源类,类中定义资源池大小。资源类的add()和remove()方法是synchronized 的。生产者/消费者线程持有一个资源类Resouce的成员变量,Main方法中通过构造函数将Resouce类传入,线程run方法中操作Resouce类的add,remove方法
package producerConsumer; //wait 和 notify public class ProducerConsumerWithWaitNofity { public static void main(String[] args) { Resource resource = new Resource(); //生产者线程 ProducerThread p1 = new ProducerThread(resource); ProducerThread p2 = new ProducerThread(resource); ProducerThread p3 = new ProducerThread(resource); //消费者线程 ConsumerThread c1 = new ConsumerThread(resource); //ConsumerThread c2 = new ConsumerThread(resource); //ConsumerThread c3 = new ConsumerThread(resource); p1.start(); p2.start(); p3.start(); c1.start(); //c2.start(); //c3.start(); } } /** * 公共资源类 * @author * */ class Resource{//重要 //当前资源数量 private int num = 0; //资源池中允许存放的资源数目 private int size = 10; /** * 从资源池中取走资源 */ public synchronized void remove(){ if(num > 0){ num--; System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前线程池有" + num + "个"); notifyAll();//通知生产者生产资源 }else{ try { //如果没有资源,则消费者进入等待状态 wait(); System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态"); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 向资源池中添加资源 */ public synchronized void add(){ if(num < size){ num++; System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); //通知等待的消费者 notifyAll(); }else{ //如果当前资源池中有10件资源 try{ wait();//生产者进入等待状态,并释放锁 System.out.println(Thread.currentThread().getName()+"线程进入等待"); }catch(InterruptedException e){ e.printStackTrace(); } } } } /** * 消费者线程 */ class ConsumerThread extends Thread{ private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } @Override public void run() { while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } resource.remove(); } } } /** * 生产者线程 */ class ProducerThread extends Thread{ private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } @Override public void run() { //不断地生产资源 while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } resource.add(); } } }
3、方式二:lock和condition的await、signalAll
package producerConsumer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 使用Lock 和 Condition解决生产者消费者问题 * @author tangzhijing * */ public class LockCondition { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition producerCondition = lock.newCondition(); Condition consumerCondition = lock.newCondition(); Resource2 resource = new Resource2(lock,producerCondition,consumerCondition); //生产者线程 ProducerThread2 producer1 = new ProducerThread2(resource); //消费者线程 ConsumerThread2 consumer1 = new ConsumerThread2(resource); ConsumerThread2 consumer2 = new ConsumerThread2(resource); ConsumerThread2 consumer3 = new ConsumerThread2(resource); producer1.start(); consumer1.start(); consumer2.start(); consumer3.start(); } } /** * 消费者线程 */ class ConsumerThread2 extends Thread{ private Resource2 resource; public ConsumerThread2(Resource2 resource){ this.resource = resource; //setName("消费者"); } public void run(){ while(true){ try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource.remove(); } } } /** * 生产者线程 * @author tangzhijing * */ class ProducerThread2 extends Thread{ private Resource2 resource; public ProducerThread2(Resource2 resource){ this.resource = resource; setName("生产者"); } public void run(){ while(true){ try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource.add(); } } } /** * 公共资源类 * @author tangzhijing * */ class Resource2{ private int num = 0;//当前资源数量 private int size = 10;//资源池中允许存放的资源数目 private Lock lock; private Condition producerCondition; private Condition consumerCondition; public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) { this.lock = lock; this.producerCondition = producerCondition; this.consumerCondition = consumerCondition; } /** * 向资源池中添加资源 */ public void add(){ lock.lock(); try{ if(num < size){ num++; System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); //唤醒等待的消费者 consumerCondition.signalAll(); }else{ //让生产者线程等待 try { producerCondition.await(); System.out.println(Thread.currentThread().getName() + "线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }finally{ lock.unlock(); } } /** * 从资源池中取走资源 */ public void remove(){ lock.lock(); try{ if(num > 0){ num--; System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + num + "个"); producerCondition.signalAll();//唤醒等待的生产者 }else{ try { consumerCondition.await(); System.out.println(Thread.currentThread().getName() + "线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); }//让消费者等待 } }finally{ lock.unlock(); } } }
4、方式三:BlockingQueue
定义Resouce资源类,资源类持有一个BlockingQueue。生产者/消费者线程持有一个资源类Resouce的成员变量,Main方法中通过构造函数将Resouce类传入,线程run方法中操作Resouce类的add,remove方法,add,remove调用Queue的put()和take()
package producerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; //使用阻塞队列BlockingQueue解决生产者消费者 public class BlockingQueueConsumerProducer { public static void main(String[] args) { Resource3 resource = new Resource3(); //生产者线程 ProducerThread3 p = new ProducerThread3(resource); //多个消费者 ConsumerThread3 c1 = new ConsumerThread3(resource); ConsumerThread3 c2 = new ConsumerThread3(resource); ConsumerThread3 c3 = new ConsumerThread3(resource); p.start(); c1.start(); c2.start(); c3.start(); } } /** * 消费者线程 * @author tangzhijing * */ class ConsumerThread3 extends Thread { private Resource3 resource3; public ConsumerThread3(Resource3 resource) { this.resource3 = resource; //setName("消费者"); } public void run() { while (true) { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource3.remove(); } } } /** * 生产者线程 * @author tangzhijing * */ class ProducerThread3 extends Thread{ private Resource3 resource3; public ProducerThread3(Resource3 resource) { this.resource3 = resource; //setName("生产者"); } public void run() { while (true) { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource3.add(); } } } class Resource3{ private BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(10); /** * 向资源池中添加资源 */ public void add(){ try { resourceQueue.put(1); //1当做生产和消费的Integer资源 System.out.println("生产者" + Thread.currentThread().getName() + "生产一件资源," + "当前资源池有" + resourceQueue.size() + "个资源"); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 向资源池中移除资源 */ public void remove(){ try { resourceQueue.take(); System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + resourceQueue.size() + "个资源"); } catch (InterruptedException e) { e.printStackTrace(); } } }
为什么用put和take:
为什么用put和take:https://blog.csdn.net/qiuchaoxi/article/details/80359462