zoukankan      html  css  js  c++  java
  • Java并发---生产者消费者实现

    生产者消费者

    生产者消费者模型是并发编程中线程之间同步和通信的重要实现,本文主要用以下四种方式来实现:

    1. wait()/notify()方法
    2. 显式Lock和Condition
    3. BlockingQueue阻塞队列方法
    4. PipedWriter/PipedReader方法

    wait()/notify()方法

    wait() / notify()方法是基类Object的两个方法,也就意味着所有Java对象都拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
    wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,让出CPU和放弃锁,使自己处于等待状态,让其他线程执行。
    notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,唤醒所有在该对象上等待的线程。注意notifyAll()本身并不会释放锁,调用线程也不会进入等待状态;只有当调用线程退出synchronized代码块之后,被唤醒的线程才能重新竞争锁并继续执行。
    实现代码如下:

    package concurrency.interview;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Producer/consumer demo built on an intrinsic lock with
     * {@code wait()} / {@code notifyAll()}.
     *
     * <p>{@link #main} runs one producer and one consumer on a cached thread pool
     * for two seconds and then interrupts both through {@code shutdownNow()}.
     */
    public class ProducerAndConsumer_1 {
    	/**
    	 * Bounded counter that stands in for a warehouse holding at most
    	 * {@code MAX_SIZE} products. All access to {@code number} happens while
    	 * holding the monitor of {@code lock}.
    	 */
    	static class Factory {
    
    		private static final int MAX_SIZE = 10;
    
    		private final Object lock = new Object();
    
    		private int number = 0;
    
    		/**
    		 * Adds one product, blocking while the warehouse is full, then
    		 * simulates one second of work.
    		 *
    		 * @throws InterruptedException if the thread is interrupted while
    		 *         waiting for space or during the simulated work
    		 */
    		public void produce() throws InterruptedException {
    			synchronized (lock) {
    				// Loop (not "if"): the condition must be re-checked after every wake-up.
    				while (number == MAX_SIZE) {
    					System.out.println("仓库已满!请先消费");
    					lock.wait();
    				}
    
    				number++;
    				System.out.println("生产成功");
    				// Notify before leaving the monitor so that a later interrupt
    				// can never leave the state changed but the waiters asleep.
    				lock.notifyAll();
    			}
    			// Simulated work is done outside the monitor so the other side
    			// is not locked out while this thread sleeps.
    			TimeUnit.SECONDS.sleep(1);
    		}
    
    		/**
    		 * Removes one product, blocking while the warehouse is empty, then
    		 * simulates one second of work.
    		 *
    		 * @throws InterruptedException if the thread is interrupted while
    		 *         waiting for a product or during the simulated work
    		 */
    		public void consumer() throws InterruptedException {
    			synchronized (lock) {
    				while (number <= 0) {
    					System.out.println("仓库是空的,不能消费");
    					lock.wait();
    				}
    
    				number--;
    				System.out.println("消费成功");
    				lock.notifyAll();
    			}
    			TimeUnit.SECONDS.sleep(1);
    		}
    
    	}
    
    	/** Task that keeps producing until its thread is interrupted. */
    	static class Producer implements Runnable {
    		final Factory factory;
    
    		Producer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				while (!Thread.interrupted()) {
    					factory.produce();
    				}
    			} catch (InterruptedException e) {
    				// Keep the interrupt visible to whoever owns this thread.
    				Thread.currentThread().interrupt();
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/** Task that keeps consuming until its thread is interrupted. */
    	static class Consumer implements Runnable {
    		final Factory factory;
    
    		Consumer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				while (!Thread.interrupted()) {
    					factory.consumer();
    				}
    			} catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/**
    	 * Runs the demo: one producer and one consumer share a factory for two
    	 * seconds, after which the pool interrupts both tasks.
    	 */
    	public static void main(String[] args) throws Exception {
    		Factory factory = new Factory();
    		ExecutorService executor = Executors.newCachedThreadPool();
    		executor.execute(new Producer(factory));
    		executor.execute(new Consumer(factory));
    
    		TimeUnit.SECONDS.sleep(2);
    
    		// shutdownNow() interrupts the worker threads, which ends both loops.
    		executor.shutdownNow();
    	}
    }
    
    

    显式Lock和Condition

    JDK1.5引入了Lock和Condition,使用它们会更安全。可以通过在Condition上调用await()方法来挂起任务,类似wait(),调用signal()/signalAll(),类似notify()/notifyAll()

    代码如下:

    package concurrency.interview;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Producer/consumer demo built on an explicit {@link Lock} with
     * {@link Condition} queues.
     *
     * <p>{@link #main} runs one producer and one consumer on a cached thread pool
     * for two seconds and then interrupts both through {@code shutdownNow()}.
     */
    public class ProducerAndConsumer_2 {
    	/**
    	 * Bounded counter that stands in for a warehouse holding at most
    	 * {@code MAX_SIZE} products. {@code number} is only touched while
    	 * {@code lock} is held.
    	 */
    	static class Factory {
    
    		private static final int MAX_SIZE = 10;
    
    		private final Lock lock = new ReentrantLock();
    		// Separate wait queues so each side only wakes threads that can make progress.
    		private final Condition notFull = lock.newCondition();
    		private final Condition notEmpty = lock.newCondition();
    
    		private int number = 0;
    
    		/**
    		 * Adds one product, blocking while the warehouse is full, then
    		 * simulates one second of work.
    		 *
    		 * @throws InterruptedException if the thread is interrupted while
    		 *         waiting for space or during the simulated work
    		 */
    		public void produce() throws InterruptedException {
    			lock.lock();
    			try {
    				// Loop (not "if"): await() may return without the condition holding.
    				while (number == MAX_SIZE) {
    					System.out.println("仓库已满!请先消费");
    					notFull.await();
    				}
    
    				number++;
    				System.out.println("生产成功");
    				// Signal while still holding the lock and before any
    				// interruptible call, so the wake-up cannot be lost.
    				notEmpty.signalAll();
    			} finally {
    				lock.unlock();
    			}
    			// Simulated work happens after the lock has been released.
    			TimeUnit.SECONDS.sleep(1);
    		}
    
    		/**
    		 * Removes one product, blocking while the warehouse is empty, then
    		 * simulates one second of work.
    		 *
    		 * @throws InterruptedException if the thread is interrupted while
    		 *         waiting for a product or during the simulated work
    		 */
    		public void consumer() throws InterruptedException {
    			lock.lock();
    			try {
    				while (number <= 0) {
    					System.out.println("仓库是空的,不能消费");
    					notEmpty.await();
    				}
    
    				number--;
    				System.out.println("消费成功");
    				notFull.signalAll();
    			} finally {
    				lock.unlock();
    			}
    			TimeUnit.SECONDS.sleep(1);
    		}
    
    	}
    
    	/** Task that keeps producing until its thread is interrupted. */
    	static class Producer implements Runnable {
    		final Factory factory;
    
    		Producer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				while (!Thread.interrupted()) {
    					factory.produce();
    				}
    			} catch (InterruptedException e) {
    				// Keep the interrupt visible to whoever owns this thread.
    				Thread.currentThread().interrupt();
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/** Task that keeps consuming until its thread is interrupted. */
    	static class Consumer implements Runnable {
    		final Factory factory;
    
    		Consumer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				while (!Thread.interrupted()) {
    					factory.consumer();
    				}
    			} catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/**
    	 * Runs the demo: one producer and one consumer share a factory for two
    	 * seconds, after which the pool interrupts both tasks.
    	 */
    	public static void main(String[] args) throws Exception {
    		Factory factory = new Factory();
    		ExecutorService executor = Executors.newCachedThreadPool();
    		executor.execute(new Producer(factory));
    		executor.execute(new Consumer(factory));
    
    		TimeUnit.SECONDS.sleep(2);
    
    		// shutdownNow() interrupts the worker threads, which ends both loops.
    		executor.shutdownNow();
    	}
    }
    
    

    BlockingQueue阻塞队列方法

    BlockingQueue也是JDK1.5的新增内容,它是已经在内部实现了同步的队列。主要有以下两个方法
    put()方法:容量达到最大时,自动阻塞。
    take()方法:容量为0时,自动阻塞。
    我们可以看看ArrayBlockingQueue的上面两个方法实现

    /**
     * Excerpt quoted from ArrayBlockingQueue: inserts {@code e}, waiting on
     * {@code notFull} for as long as {@code count == items.length}.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while acquiring the lock or
     *         while waiting on {@code notFull}
     */
    public void put(E e) throws InterruptedException {
            // Helper not shown in this excerpt; presumably rejects null elements - confirm against the JDK source.
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            // Interruptible acquisition: a pending interrupt aborts the call here.
            lock.lockInterruptibly();
            try {
                // Re-check in a loop after every wake-up; await() releases the lock while waiting.
                while (count == items.length)
                    notFull.await();
                // Helper not shown; presumably stores the element and signals waiting takers - confirm.
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    /**
     * Excerpt quoted from ArrayBlockingQueue: removes and returns an element,
     * waiting on {@code notEmpty} for as long as {@code count == 0}.
     *
     * @return the value returned by {@code dequeue()}
     * @throws InterruptedException if interrupted while acquiring the lock or
     *         while waiting on {@code notEmpty}
     */
    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // Interruptible acquisition: a pending interrupt aborts the call here.
            lock.lockInterruptibly();
            try {
                // Re-check in a loop after every wake-up; await() releases the lock while waiting.
                while (count == 0)
                    notEmpty.await();
                // Helper not shown; presumably removes the head and signals waiting putters - confirm.
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    可以看到ArrayBlockingQueue内部的同步就是使用的Lock和Condition。
    使用BlockingQueue生产者和消费者代码如下:

    package concurrency.interview;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Producer/consumer demo in which an {@link ArrayBlockingQueue} performs all
     * of the blocking and hand-off between threads.
     */
    public class ProducerAndConsumer_3 {
    	/** Warehouse backed by a bounded blocking queue of capacity {@code MAX_SIZE}. */
    	static class Factory {
    
    		private final int MAX_SIZE = 10;
    
    		private BlockingQueue<Object> queue = new ArrayBlockingQueue<>(MAX_SIZE);
    
    		/**
    		 * Puts one new object into the queue (blocking while it is full),
    		 * pauses for one second and then reports success.
    		 */
    		public void produce() throws Exception {
    			Object product = new Object();
    			queue.put(product);
    			pause();
    			System.out.println("生产成功");
    		}
    
    		/**
    		 * Takes one object from the queue (blocking while it is empty),
    		 * pauses for one second and then reports success.
    		 */
    		public void consumer() throws Exception {
    			queue.take();
    			pause();
    			System.out.println("消费成功");
    		}
    
    		/** Simulates one second of work. */
    		private static void pause() throws InterruptedException {
    			Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    		}
    
    	}
    
    	/** Task that calls {@code produce()} until its thread is interrupted. */
    	static class Producer implements Runnable {
    		Factory factory;
    
    		Producer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				for (;;) {
    					// Thread.interrupted() also clears the flag, as in a while-condition check.
    					if (Thread.interrupted()) {
    						return;
    					}
    					factory.produce();
    				}
    			} catch (Exception e) {
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/** Task that calls {@code consumer()} until its thread is interrupted. */
    	static class Consumer implements Runnable {
    		Factory factory;
    
    		Consumer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				for (;;) {
    					if (Thread.interrupted()) {
    						return;
    					}
    					factory.consumer();
    				}
    			} catch (Exception e) {
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/**
    	 * Runs two producers and one consumer against a shared factory for five
    	 * seconds, then interrupts them via {@code shutdownNow()}.
    	 */
    	public static void main(String[] args) throws Exception {
    		Factory shared = new Factory();
    		ExecutorService pool = Executors.newCachedThreadPool();
    
    		for (int i = 0; i < 2; i++) {
    			pool.execute(new Producer(shared));
    		}
    		pool.execute(new Consumer(shared));
    
    		TimeUnit.SECONDS.sleep(5);
    
    		pool.shutdownNow();
    	}
    }
    
    

    PipedWriter/PipedReader方法

    在jdk引入BlockingQueue之前大多是使用这种方式来实现同步和通信,它基本上可以看做是一个阻塞队列,代码如下:

    package concurrency.interview;
    
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Producer/consumer demo that hands messages from producers to a consumer
     * through a connected {@link PipedWriter} / {@link PipedReader} pair.
     */
    public class ProducerAndConsumer_4 {
    	/** Owns the pipe; producers write a fixed message, the consumer reads it back. */
    	static class Factory {
    
    		/** The message every call to {@code produce()} writes into the pipe. */
    		private static final String MESSAGE = "hello world!";
    
    		private final PipedWriter out;
    		private final PipedReader in;
    
    		/**
    		 * Creates the writer and connects a reader to it.
    		 *
    		 * @throws IOException if the pipe ends cannot be connected
    		 */
    		Factory() throws IOException {
    			out = new PipedWriter();
    			in = new PipedReader(out);
    		}
    
    		/**
    		 * Writes one message into the pipe, pauses for one second and then
    		 * reports success.
    		 *
    		 * @throws IOException if the pipe is broken or closed
    		 * @throws InterruptedException if interrupted during the pause
    		 */
    		public void produce() throws IOException, InterruptedException {
    			out.write(MESSAGE);
    			// flush() notifies a reader blocked on the pipe instead of
    			// leaving it to notice the data on its own polling interval.
    			out.flush();
    			TimeUnit.SECONDS.sleep(1);
    			System.out.println("生产成功");
    		}
    
    		/**
    		 * Reads one full message from the pipe, pauses for one second, then
    		 * reports success and prints the characters that were read.
    		 *
    		 * @throws IOException if the pipe is broken, or the writer was closed
    		 *         before any character of a message arrived
    		 * @throws InterruptedException if interrupted during the pause
    		 */
    		public void consumer() throws IOException, InterruptedException {
    			char[] buf = new char[MESSAGE.length()];
    			int filled = 0;
    			// read() may deliver fewer characters than requested, so keep
    			// reading until the message is complete or the stream ends.
    			while (filled < buf.length) {
    				int n = in.read(buf, filled, buf.length - filled);
    				if (n < 0) {
    					if (filled == 0) {
    						throw new IOException("pipe closed by writer");
    					}
    					break;
    				}
    				filled += n;
    			}
    			TimeUnit.SECONDS.sleep(1);
    			System.out.println("消费成功");
    			// Print only what was actually read, never unfilled buffer slots.
    			System.out.println(new String(buf, 0, filled));
    		}
    
    	}
    
    	/** Task that calls {@code produce()} until interrupted or the pipe fails. */
    	static class Producer implements Runnable {
    		final Factory factory;
    
    		Producer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				while (!Thread.interrupted()) {
    					factory.produce();
    				}
    			} catch (InterruptedException e) {
    				// Keep the interrupt visible to whoever owns this thread.
    				Thread.currentThread().interrupt();
    				System.out.println("结束");
    			} catch (IOException e) {
    				// Includes InterruptedIOException raised while blocked on the pipe.
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/** Task that calls {@code consumer()} until interrupted or the pipe fails. */
    	static class Consumer implements Runnable {
    		final Factory factory;
    
    		Consumer(Factory factory) {
    			this.factory = factory;
    		}
    
    		@Override
    		public void run() {
    			try {
    				while (!Thread.interrupted()) {
    					factory.consumer();
    				}
    			} catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    				System.out.println("结束");
    			} catch (IOException e) {
    				System.out.println("结束");
    			}
    		}
    
    	}
    
    	/**
    	 * Runs two producers and one consumer against a shared factory for five
    	 * seconds, then interrupts them via {@code shutdownNow()}.
    	 */
    	public static void main(String[] args) throws Exception {
    		Factory factory = new Factory();
    		ExecutorService executor = Executors.newCachedThreadPool();
    		executor.execute(new Producer(factory));
    		executor.execute(new Producer(factory));
    		executor.execute(new Consumer(factory));
    
    		TimeUnit.SECONDS.sleep(5);
    
    		// shutdownNow() interrupts the worker threads, which ends all loops.
    		executor.shutdownNow();
    	}
    }
    
    

    参考资料

    http://blog.csdn.net/monkey_d_meng/article/details/6251879/
    https://zhuanlan.zhihu.com/p/20300609
    《Java编程思想》

  • 相关阅读:
    Luogu P4716 【模板】最小树形图
    P4180 严格次小生成树[BJWC2010] Kruskal,倍增
    LA4080/UVa1416 Warfare And Logistics 最短路树
    LA4255/UVa1423 Guess 拓扑排序 并查集
    【算法竞赛入门经典—训练指南】学习笔记(含例题代码与思路)第三章:实用数据结构
    【算法竞赛入门经典—训练指南】学习笔记(含例题代码与思路)第二章:数学基础
    【算法竞赛入门经典—训练指南】学习笔记(含例题代码与思路)第一章:算法设计基础
    P4177 [CEOI2008]order 网络流,最小割,最大权闭合子图
    [USACO5.1] 乐曲主题Musical Themes
    [USACO06DEC] 牛奶模式Milk Patterns
  • 原文地址:https://www.cnblogs.com/yangtong/p/7155078.html
Copyright © 2011-2022 走看看