zoukankan      html  css  js  c++  java
  • 生产者消费者模式实现

    生产者与消费者解耦,典型应用:MQ。不多解释,code talking:


    调用模块:

    package com.array7.ds.pc;
    
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;
    
    /**
     * Entry point of the producer/consumer demo.
     *
     * <p>Creates one bounded queue and hands it to a {@link Productor} and a
     * {@link Customer}; both are started with the same stop value.
     */
    public class Run {
    	public static void main(String[] args) {
    		// Value that tells both sides to finish.
    		final int lastValue = 100;
    		// Queue capacity is capped at 50 elements.
    		final BlockingDeque<Integer> channel = new LinkedBlockingDeque<Integer>(50);

    		Productor producer = new Productor(channel);
    		Customer consumer = new Customer(channel);

    		producer.begin(lastValue);
    		consumer.begin(lastValue);
    	}
    }
    


    生产者模块:

    package com.array7.ds.pc;
    
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Producer side of the producer/consumer demo.
     *
     * <p>{@link #begin(int)} starts a thread that pushes consecutive integers
     * into the queue until the stop value has been pushed.
     */
    public class Productor {
    	/**
    	 * Source of the produced values. It is static, so every producer draws
    	 * from the same sequence; a given stop value is therefore reached by at
    	 * most one producer.
    	 */
    	static AtomicInteger flag = new AtomicInteger(0);
    	private final BlockingDeque<Integer> queue;

    	/**
    	 * @param queue queue the produced values are put into
    	 */
    	public Productor(BlockingDeque<Integer> queue) {
    		this.queue = queue;
    	}

    	/**
    	 * Starts a new producer thread.
    	 *
    	 * @param stopConditon last value to produce; once it has been put into
    	 *                     the queue the thread terminates
    	 */
    	public void begin(int stopConditon) {
    		new Thread(new Create(queue, stopConditon)).start();
    	}

    	/**
    	 * Producer task: every 5 ms takes the next counter value and puts it into
    	 * the queue, finishing after the stop value has been put.
    	 */
    	public static class Create implements Runnable {
    		private final BlockingDeque<Integer> queue;
    		private final int stopConditon;

    		/**
    		 * @param queue        queue the produced values are put into
    		 * @param stopConditon last value to produce
    		 */
    		public Create(BlockingDeque<Integer> queue, int stopConditon) {
    			this.queue = queue;
    			this.stopConditon = stopConditon;
    		}

    		/**
    		 * Produces values until the stop value has been put or the thread is
    		 * interrupted.
    		 */
    		@Override
    		public void run() {
    			// The try wraps the whole loop so that an interrupt ends the task
    			// instead of being swallowed and the loop carrying on.
    			try {
    				while (true) {
    					Thread.sleep(5);
    					int i = flag.incrementAndGet();
    					// put() waits while the bounded queue is full.
    					queue.put(i);
    					if (i == stopConditon) {
    						System.out.println("stop put element >>> ");
    						return;
    					}
    					System.out.println("put element >>> " + i);
    				}
    			} catch (InterruptedException e) {
    				// Keep the interrupt status visible to whoever owns the thread.
    				Thread.currentThread().interrupt();
    			}
    		}

    	}
    }
    


    消费者模块:

    package com.array7.ds.pc;
    
    import java.util.concurrent.BlockingDeque;
    
    /**
     * Consumer side of the producer/consumer demo.
     *
     * <p>{@link #begin(int)} starts a thread that takes integers from the queue
     * until it receives the stop value.
     */
    public class Customer {
    	BlockingDeque<Integer> queue;

    	/**
    	 * @param queue queue the values are taken from
    	 */
    	public Customer(BlockingDeque<Integer> queue) {
    		this.queue = queue;
    	}

    	/**
    	 * Starts a new consumer thread.
    	 *
    	 * @param stopConditon value that, once taken from the queue, makes the
    	 *                     thread terminate
    	 */
    	public void begin(int stopConditon) {
    		new Thread(new Get(queue, stopConditon)).start();
    	}

    	/**
    	 * Consumer task: every 10 ms takes one value from the queue and prints
    	 * it, finishing when the stop value is taken.
    	 */
    	public static class Get implements Runnable {
    		private final BlockingDeque<Integer> queue;
    		private final int stopConditon;

    		/**
    		 * @param queue        queue the values are taken from
    		 * @param stopConditon value that ends consumption
    		 */
    		public Get(BlockingDeque<Integer> queue, int stopConditon) {
    			this.queue = queue;
    			this.stopConditon = stopConditon;
    		}

    		/**
    		 * Consumes values until the stop value is taken or the thread is
    		 * interrupted.
    		 */
    		@Override
    		public void run() {
    			// The try wraps the whole loop so that an interrupt ends the task
    			// instead of being swallowed and the loop blocking in take() again.
    			try {
    				while (true) {
    					Thread.sleep(10);
    					// take() waits for an element and never yields null,
    					// so no null check is needed.
    					int value = queue.take();
    					if (value == stopConditon) {
    						System.out.println("stop get element >>>>>> ");
    						return;
    					}
    					System.out.println("get element >>>>>> " + value);
    				}
    			} catch (InterruptedException e) {
    				// Keep the interrupt status visible to whoever owns the thread.
    				Thread.currentThread().interrupt();
    			}
    		}

    	}
    }
    



    版权声明:本文为博主原创文章,未经博主允许不得转载。

  • 相关阅读:
    cmd 新建空文件
    查看Linux版本
    centos7时间调整
    正确卸载vs2015及以前版本方式
    vs2017,vs2019 无法连接到Web服务器“IIS Express”
    .netcore开发环境和服务器注意事项
    .netcore 网站启动后 502.5
    CentOS7开机报错piix4_smbus ****host smbus controller not enabled
    centos7 升级系统后,启动界面出现多个选项
    .gitkeep文件
  • 原文地址:https://www.cnblogs.com/liushijie/p/4712910.html
Copyright © 2011-2022 走看看