生产/消费者问题是个非常典型的多线程问题,涉及到的对象包括“生产者”、“消费者”、“仓库”和“产品”。他们之间的关系如下:
① 生产者仅仅在仓储未满时候生产,仓满则停止生产。
② 消费者仅仅在仓储有产品时候才能消费,仓空则等待。
③ 当消费者发现仓库没产品可消费时候会通知生产者生产。
④ 生产者在生产出可消费产品时候,应该通知等待的消费者去消费。
用wait/notify/notifyAll实现和用Lock的Condition实现。
用wait/notify/notifyAll 实现生产者消费者模型:
方法一:用五个类来实现,分别为Produce(实现生产过程), Consumer(实现消费过程), ProduceThread(实现生产者线程),ConsumeThread(实现消费者线程),Main等。需要注意的是有两个地方。
① 用while判断当前list是否为空;
② 调用的是object的notifyAll()方法而不是notify()方法。
方法二:用四个类实现,分别为MyService(实现生产和消费过程用synchronized关键字实现同步),ProduceThread(实现生产者线程),ConsumeThread(实现消费者线程),Main。需要注意的也是方法一中的两个地方while和notifyAll()。
用Lock和Condition实现。共有四个类,分别是分别为MyService(实现生产和消费过程,用lock实现线程间同步),ProduceThread(实现生产者线程),ConsumeThread(实现消费者线程),Main。需要注意的也是方法一中的两个地方while和signalAll()。
方法一:
- package ProduceConsumer;
- import java.util.ArrayList;
- public class Produce {
- public Object object;
- public ArrayList<Integer> list;//用list存放生产之后的数据,最大容量为1
- public Produce(Object object,ArrayList<Integer> list ){
- this.object = object;
- this.list = list;
- }
- public void produce() {
- synchronized (object) {
- /*只有list为空时才会去进行生产操作*/
- try {
- while(!list.isEmpty()){
- System.out.println("生产者"+Thread.currentThread().getName()+" waiting");
- object.wait();
- }
- int value = 9999;
- list.add(value);
- System.out.println("生产者"+Thread.currentThread().getName()+" Runnable");
- object.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程
- }catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- package ProduceConsumer;
- import java.util.ArrayList;
- public class Consumer {
- public Object object;
- public ArrayList<Integer> list;//用list存放生产之后的数据,最大容量为1
- public Consumer(Object object,ArrayList<Integer> list ){
- this.object = object;
- this.list = list;
- }
- public void consmer() {
- synchronized (object) {
- try {
- /*只有list不为空时才会去进行消费操作*/
- while(list.isEmpty()){
- System.out.println("消费者"+Thread.currentThread().getName()+" waiting");
- object.wait();
- }
- list.clear();
- System.out.println("消费者"+Thread.currentThread().getName()+" Runnable");
- object.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程
- }catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- package ProduceConsumer;
- public class ProduceThread extends Thread {
- private Produce p;
- public ProduceThread(Produce p){
- this.p = p;
- }
- @Override
- public void run() {
- while (true) {
- p.produce();
- }
- }
- }
- package ProduceConsumer;
- public class ConsumeThread extends Thread {
- private Consumer c;
- public ConsumeThread(Consumer c){
- this.c = c;
- }
- @Override
- public void run() {
- while (true) {
- c.consmer();
- }
- }
- }
- package ProduceConsumer;
- import java.util.ArrayList;
- public class Main {
- public static void main(String[] args) {
- Object object = new Object();
- ArrayList<Integer> list = new ArrayList<Integer>();
- Produce p = new Produce(object, list);
- Consumer c = new Consumer(object, list);
- ProduceThread[] pt = new ProduceThread[2];
- ConsumeThread[] ct = new ConsumeThread[2];
- for(int i=0;i<2;i++){
- pt[i] = new ProduceThread(p);
- pt[i].setName("生产者 "+(i+1));
- ct[i] = new ConsumeThread(c);
- ct[i].setName("消费者"+(i+1));
- pt[i].start();
- ct[i].start();
- }
- }
- }
方法二:
- package ProduceConsumer2;
- import java.util.ArrayList;
- public class MyService {
- public ArrayList<Integer> list = new ArrayList<Integer>();//用list存放生产之后的数据,最大容量为1
- synchronized public void produce() {
- try {
- /*只有list为空时才会去进行生产操作*/
- while(!list.isEmpty()){
- System.out.println("生产者"+Thread.currentThread().getName()+" waiting");
- this.wait();
- }
- int value = 9999;
- list.add(value);
- System.out.println("生产者"+Thread.currentThread().getName()+" Runnable");
- this.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程
- }catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- synchronized public void consmer() {
- try {
- /*只有list不为空时才会去进行消费操作*/
- while(list.isEmpty()){
- System.out.println("消费者"+Thread.currentThread().getName()+" waiting");
- this.wait();
- }
- list.clear();
- System.out.println("消费者"+Thread.currentThread().getName()+" Runnable");
- this.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- package ProduceConsumer2;
- public class ProduceThread extends Thread {
- private MyService p;
- public ProduceThread(MyService p){
- this.p = p;
- }
- @Override
- public void run() {
- while (true) {
- p.produce();
- }
- }
- }
- package ProduceConsumer2;
- public class ConsumeThread extends Thread {
- private MyService c;
- public ConsumeThread(MyService c){
- this.c = c;
- }
- @Override
- public void run() {
- while (true) {
- c.consmer();
- }
- }
- }
用Lock和Condition实现
- package ConditionProduceConsumer;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- public class MyService {
- private ReentrantLock lock = new ReentrantLock();
- private Condition condition = lock.newCondition();
- private boolean hasValue = false;
- public void produce() {
- lock.lock();
- try {
- /*只有list为空时才会去进行生产操作*/
- while(hasValue == true){
- System.out.println("生产者"+Thread.currentThread().getName()+" waiting");
- condition.await();
- }
- hasValue = true;
- System.out.println("生产者"+Thread.currentThread().getName()+" Runnable");
- condition.signalAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
- public void consmer() {
- lock.lock();
- try {
- /*只有list为空时才会去进行生产操作*/
- while(hasValue == false){
- System.out.println("消费者"+Thread.currentThread().getName()+" waiting");
- condition.await();
- }
- hasValue = false;
- System.out.println("消费者"+Thread.currentThread().getName()+" Runnable");
- condition.signalAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
- }
- package ConditionProduceConsumer;
- public class ProduceThread extends Thread {
- private MyService p;
- public ProduceThread(MyService p){
- this.p = p;
- }
- @Override
- public void run() {
- while (true) {
- p.produce();
- }
- }
- }
- package ConditionProduceConsumer;
- public class ConsumeThread extends Thread {
- private MyService c;
- public ConsumeThread(MyService c){
- this.c = c;
- }
- @Override
- public void run() {
- while (true) {
- c.consmer();
- }
- }
- }
- package ConditionProduceConsumer;
- public class Main {
- public static void main(String[] args) {
- MyService service = new MyService();
- ProduceThread[] pt = new ProduceThread[2];
- ConsumeThread[] ct = new ConsumeThread[2];
- for(int i=0;i<1;i++){
- pt[i] = new ProduceThread(service);
- pt[i].setName("Condition 生产者 "+(i+1));
- ct[i] = new ConsumeThread(service);
- ct[i].setName("Condition 消费者"+(i+1));
- pt[i].start();
- ct[i].start();
- }
- }
- }