zoukankan      html  css  js  c++  java
  • 生产者消费者模式--

    生产者/消费者问题的多种Java实现方式一:
    wait() / notify()方法
    
    package com.etc.jichu;
    import java.util.LinkedList;
    /*
     * wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
    wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。
    notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
     */
    //仓库类Storage实现缓冲区
    public class Storage {
    	// 仓库最大存储量
    private final int MAX_SIZE=100;
    	//仓库存储的载体
    private LinkedList<Object> list=new LinkedList<Object>();
    
    public LinkedList<Object> getList() {
    	return list;
    }
    public void setList(LinkedList<Object> list) {
    	this.list = list;
    }
    public int getMAX_SIZE() {
    	return MAX_SIZE;
    }
    //生产num个产品
    public void produce(int num){
    	// 同步代码段
    	synchronized (list)
    	{
    		// 如果仓库剩余容量不足
    		while(list.size()+num>MAX_SIZE)
    		{
    			System.out.println("【要生产的产品数量】:" + num + "/t【库存量】:"  
                        + list.size() + "/t暂时不能执行生产任务!");
    			try 
    			{
    				// 由于条件不满足,生产阻塞
    				list.wait();
    			} 
    			catch (InterruptedException e) 
    			{
    				
    				e.printStackTrace();
    			}
    		}
    		// 生产条件满足情况下,生产num个产品  
    		for(int i=1;i<=num;++i)
    		{
    			list.add(new Object());
    		}
    		System.out.println("【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());  
    		list.notifyAll();
    	}
    }
    //消费num个产品
    public void consume(int num)
    {
    	// 同步代码段 
    	synchronized (list) {
    		// 如果仓库存储量不足 
    		while(list.size()<num)
    		{
    			System.out.println("【要消费的产品数量】:" + num + "/t【库存量】:"  
                        + list.size() + "/t暂时不能执行生产任务!");
    			try 
    			{
    				// 由于条件不满足,消费阻塞 
    				list.wait();
    			} 
    			catch (InterruptedException e) 
    			{
    				
    				e.printStackTrace();
    			}
    		}
    		// 消费条件满足情况下,消费num个产品
    		for(int i=1;i<=num;++i)
    		{
    			list.remove();
    		}
    		System.out.println("【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());  		  
            list.notifyAll(); 
    	}
    }
    }
    ==============================================================
    package com.etc.jichu;
    //生产者类Producer继承线程类Thread
    public class Producer extends Thread{
    private int num;// 每次生产的产品数量
    private Storage storage; // 所在放置的仓库
    
    public int getNum() {
    	return num;
    }
    public void setNum(int num) {
    	this.num = num;
    }
    public Storage getStorage() {
    	return storage;
    }
    public void setStorage(Storage storage) {
    	this.storage = storage;
    }
    //构造函数,设置仓库 
    public Producer(Storage storage) {
    	
    	this.storage = storage;
    }
    //线程run函数
    public void run()
    {
    	produce(num);
    }
    //调用仓库Storage的生产函数
    public void produce(int num)
    {
    	storage.produce(num);
    }
    }
    ======================================================
    package com.etc.jichu;
    //消费者类Consumer继承线程类Thread
    public class Consumer extends Thread
    {
    private int num;// 每次消费的产品数量
    private Storage storage;// 所在放置的仓库 
    
    public int getNum() {
    	return num;
    }
    public void setNum(int num) {
    	this.num = num;
    }
    public Storage getStorage() {
    	return storage;
    }
    public void setStorage(Storage storage) {
    	this.storage = storage;
    }
    //构造函数,设置仓库
    public Consumer(Storage storage) {
    	
    	this.storage = storage;
    }
    //线程run函数
    public void run()
    {
    	consume(num);
    }
    //调用仓库Storage的生产函数 
    public void consume(int num)
    {
    	storage.consume(num);
    }
    }
    =============================================
    package com.etc.jichu;
    
    public class Test 
    {
    public static void main(String[] args) {
    	// 仓库对象 
    	Storage storage=new Storage();
    	// 生产者对象  
        Producer p1 = new Producer(storage);  
        Producer p2 = new Producer(storage);  
        Producer p3 = new Producer(storage);  
        Producer p4 = new Producer(storage);  
        Producer p5 = new Producer(storage);  
         
     // 消费者对象  
        Consumer c1 = new Consumer(storage);  
        Consumer c2 = new Consumer(storage);  
        Consumer c3 = new Consumer(storage);  
     // 设置生产者产品生产数量  
        p1.setNum(20);  
        p2.setNum(20);  
        p3.setNum(20);  
        p4.setNum(20);  
        p5.setNum(20); 
     // 设置消费者产品消费数量  
        c1.setNum(50);  
        c2.setNum(20);  
        c3.setNum(30);  
     // 线程开始执行
        c1.start();  
        c2.start();  
        c3.start();  
        p1.start();  
        p2.start();  
        p3.start();  
        p4.start();  
        p5.start();  
    }
    }
    
    //在Storage类中定义public void produce(int num);和public void consume(int num);方法,并在生产者类Producer和消费者类Consumer中调用Storage类中的实现,将可能发生的变化集中到一个类中,不影响原有的构架设计,同时无需修改其他业务层代码。
    
    生产者/消费者问题的多种Java实现方式二:
    await() / signal()方法
    在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。await()和signal()就是其中用来做同步的两种方法,它们的功能基本上和wait() / nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
    
    //只需要更新仓库类Storage的代码即可,生产者Producer、消费者Consumer、测试类Test的代码均不需要进行任何更改。
    package com.etc.jichu;
    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    //仓库类Storage实现缓冲区
    public class Storage {
    	// 仓库最大存储量
    private final int MAX_SIZE=100;
    	//仓库存储的载体
    private LinkedList<Object> list=new LinkedList<Object>();
    //锁
    private final Lock lock=new ReentrantLock();//重入锁ReentrantLock
    //仓库满的条件变量  
    private final Condition full=lock.newCondition();
    //仓库空的条件变量
    private final Condition empty=lock.newCondition();
    public LinkedList<Object> getList() {
    	return list;
    }
    public void setList(LinkedList<Object> list) {
    	this.list = list;
    }
    public int getMAX_SIZE() {
    	return MAX_SIZE;
    }
    //生产num个产品
    public void produce(int num){
    	lock.lock();// 获得锁
    		// 如果仓库剩余容量不足
    		while(list.size()+num>MAX_SIZE)
    		{
    			System.out.println("【要生产的产品数量】:" + num + "/t【库存量】:"  
                        + list.size() + "/t暂时不能执行生产任务!");
    			try 
    			{
    				// 由于条件不满足,生产阻塞
    				full.wait();
    			} 
    			catch (InterruptedException e) 
    			{
    				
    				e.printStackTrace();
    			}
    		}
    		// 生产条件满足情况下,生产num个产品  
    		for(int i=1;i<=num;++i)
    		{
    			list.add(new Object());
    		}
    		System.out.println("【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());  
    		// 唤醒其他所有线程
    		full.signalAll();
    		empty.signalAll();
    		lock.unlock();// 释放锁 
    }
    //消费num个产品
    public void consume(int num)
    {
    	//获得锁
    	lock.lock();
    		// 如果仓库存储量不足 
    		while(list.size()<num)
    		{
    			System.out.println("【要消费的产品数量】:" + num + "/t【库存量】:"  
                        + list.size() + "/t暂时不能执行生产任务!");
    			try 
    			{
    				// 由于条件不满足,消费阻塞 
    				empty.wait();
    			} 
    			catch (InterruptedException e) 
    			{
    				
    				e.printStackTrace();
    			}
    		}
    		// 消费条件满足情况下,消费num个产品
    		for(int i=1;i<=num;++i)
    		{
    			list.remove();
    		}
    		System.out.println("【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());  		  
    		// 唤醒其他所有线程
    		full.signalAll();
    		empty.signalAll();
    		lock.unlock();// 释放锁 
    	
    }
    }
    ===================================
    
    生产者/消费者问题的多种Java实现方式三:
    BlockingQueue阻塞队列方法
    BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。
    put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
    take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
    
    //只需要更改仓库类Storage的代码即可
    public class Storage  
    {  
        // 仓库最大存储量  
        private final int MAX_SIZE = 100;  
      
        // 仓库存储的载体  
        private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(  
                100);  
      
        // 生产num个产品  
        public void produce(int num)  
        {  
            // 如果仓库剩余容量为0  
            if (list.size() == MAX_SIZE)  
            {  
                System.out.println("【库存量】:" + MAX_SIZE + "/t暂时不能执行生产任务!");  
            }  
      
            // 生产条件满足情况下,生产num个产品  
            for (int i = 1; i <= num; ++i)  
            {  
                try  
                {  
                    // 放入产品,自动阻塞  
                    list.put(new Object());  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
      
                System.out.println("【现仓储量为】:" + list.size());  
            }  
        }  
      
        // 消费num个产品  
        public void consume(int num)  
        {  
            // 如果仓库存储量不足  
            if (list.size() == 0)  
            {  
                System.out.println("【库存量】:0/t暂时不能执行生产任务!");  
            }  
      
            // 消费条件满足情况下,消费num个产品  
            for (int i = 1; i <= num; ++i)  
            {  
                try  
                {  
                    // 消费产品,自动阻塞  
                    list.take();  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
      
            System.out.println("【现仓储量为】:" + list.size());  
        }  
      
        // set/get方法  
        public LinkedBlockingQueue<Object> getList()  
        {  
            return list;  
        }  
      
        public void setList(LinkedBlockingQueue<Object> list)  
        {  
            this.list = list;  
        }  
      
        public int getMAX_SIZE()  
        {  
            return MAX_SIZE;  
        }  
    }  
    
    有时使用BlockingQueue可能会出现put()和System.out.println()输出不匹配的情况,这是由于它们之间没有同步造成的。当缓冲区已满,生产者在put()操作时,put()内部调用了await()方法,放弃了线程的执行,然后消费者线程执行,调用take()方法,take()内部调用了signal()方法,通知生产者线程可以执行,致使在消费者的println()还没运行的情况下生产者的println()先被执行,所以有了输出不匹配的情况。
    对于BlockingQueue大家可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。
    ==============================================================================================
    

      

  • 相关阅读:
    hadoop再次集群搭建(3)-如何选择相应的hadoop版本
    48. Rotate Image
    352. Data Stream as Disjoint Interval
    163. Missing Ranges
    228. Summary Ranges
    147. Insertion Sort List
    324. Wiggle Sort II
    215. Kth Largest Element in an Array
    快速排序
    280. Wiggle Sort
  • 原文地址:https://www.cnblogs.com/ipetergo/p/6385419.html
Copyright © 2011-2022 走看看