zoukankan      html  css  js  c++  java
  • 并发模式(三)——生产者-消费模式

        生产者-消费模式,通常有两类线程。即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责详细处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。

    一、架构模式图:

    类图:

    生产者:提交用户请求。提取用户任务。并装入内存缓冲区;

    消费者:在内存缓冲区中提取并处理任务。

    内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;

    任务:生产者向内存缓冲区提交的数据结构;

    Main:使用生产者和消费者的client。


    二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

    (1)Producer生产者线程:

    <span style="font-size:18px;">package ProducerConsumer;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Producer  implements Runnable{
    	
    	//Volatile修饰的成员变量在每次被线程訪问时。都强迫从共享内存中重读该成员变量的值。
    	//并且。当成员变量发生变化时,强迫线程将变化值回写到共享内存。
    	//这样在不论什么时刻,两个不同的线程总是看到某个成员变量的同一个值。

    private volatile boolean isRunning= true; //内存缓冲区 private BlockingQueue<PCData> queue; //总数。原子操作 private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME=1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data=null; Random r = new Random(); System.out.println("start producer id = "+ Thread .currentThread().getId()); try{ while(isRunning){ Thread.sleep(r.nextInt(SLEEPTIME)); //构造任务数据 data= new PCData(count.incrementAndGet()); System.out.println("data is put into queue "); //提交数据到缓冲区 if(!queue.offer(data,2,TimeUnit.SECONDS)){ System.out.println("faile to put data: "+ data); } } }catch (InterruptedException e){ e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop(){ isRunning=false; } } </span>


    (2)Consumer消费者线程:

    <span style="font-size:18px;">package ProducerConsumer;
    
    import java.text.MessageFormat;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer implements Runnable {
    	//缓冲区	
    	private BlockingQueue<PCData> queue;
    	private static final int SLEEPTIME=1000;
    	
    	
    	public Consumer(BlockingQueue<PCData> queue) {		
    		this.queue = queue;
    	}
    
    
    	@Override
    	public void run() {
    		System.out.println("start Consumer id= "+ Thread .currentThread().getId());
    		Random r = new Random();
    		
    			try {
    				//提取任务
    				while(true){
    					PCData data= queue.take();
    					if(null!= data){
    						//计算平方
    						int re= data.getData()*data.getData();
    						System.out.println(MessageFormat.format("{0}*{1}={2}",
    									data.getData(),data.getData(),re
    								));
    						Thread.sleep(r.nextInt(SLEEPTIME));
    												
    					}
    				}
    			} catch (InterruptedException e) {				
    				e.printStackTrace();
    				Thread.currentThread().interrupt();
    			}
    			
    		
    		
    	}
    	
    	
    
    	
    
    }
    </span>

    (3)PCData共享数据模型:

    <span style="font-size:18px;">package ProducerConsumer;
    
    public  final class PCData {
    
    	private final int intData;
    
    	public PCData(int d) {
    		intData=d;
    	}
    	
    	public PCData(String  d) {
    		intData=Integer.valueOf(d);
    	}
    	
    	public int getData(){
    		
    		return intData;
    		
    	}
    	@Override
    	public String toString(){
    		return "data:"+ intData ;
    	}
    	
    }
    </span>


    (4)Main函数:

    <span style="font-size:18px;">package ProducerConsumer;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    
    public class Main {
    
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args)  throws InterruptedException{
    		//建立缓冲区
    		BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);
    		//建立生产者
    		Producer producer1 = new Producer(queue);
    		Producer producer2 = new Producer(queue);
    		Producer producer3 = new Producer(queue);
    		
    		//建立消费者
    		Consumer consumer1 = new Consumer(queue);
    		Consumer consumer2 = new Consumer(queue);
    		Consumer consumer3 = new Consumer(queue);		
    				
    		//建立线程池
    		ExecutorService service = Executors.newCachedThreadPool();
    		
    		//执行生产者
    		service.execute(producer1);
    		service.execute(producer2);
    		service.execute(producer3);
    		//执行消费者
    		service.execute(consumer1);
    		service.execute(consumer2);
    		service.execute(consumer3);
    	
    		Thread.sleep(10*1000);
    		
    		//停止生产者
    		producer1.stop();
    		producer2.stop();
    		producer3.stop();
    		
    		Thread.sleep(3000);
    		service.shutdown();
    	}
    
    }
    </span>


    三、注意:

        volatilekeyword:Volatile修饰的成员变量在每次被线程訪问时,都强迫从共享内存中重读该成员变量的值。并且,当成员变量发生变化时。强迫线程将变化值回写到共享内存。这样在不论什么时刻,两个不同的线程总是看到某个成员变量的同一个值。

        生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统总体结构。

        因为缓冲区的存在,生产者和消费者。不管谁在某一局部时间内速度相对较高,都能够使用缓冲区得到缓解,保证系统正常执行,这在一定程度上缓解了性能瓶颈对系统系能的影响。


  • 相关阅读:
    移动端web
    递归求和
    json的基础了解
    冒泡排序的编程方法
    js面向对象
    1002,javascript的原型属性
    1001,instanceof关键字以及typeof关键字
    19,简述一下src与href的区别(不懂)
    531,<form>action属性
    530,css outline属性
  • 原文地址:https://www.cnblogs.com/cynchanpin/p/6856462.html
Copyright © 2011-2022 走看看