zoukankan      html  css  js  c++  java
  • 经典并发问题:生产者-消费者

    前置知识

    此处以Java描述该问题,需要Java并发的知识,这里附上一些不错的教程:

    Youtube上的一个教程,非常实战,能快速找到感觉。

    Oracle的并发官方教程,很全面,但不深入。

    国人的Java并发系列教程,很全面,但有地方深入到源码,有的浅停在使用方法。

    免费的翻译书,没有看,据说不错。

    问题描述

    生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:

    • 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;
    • 生产者、消费者不可同时访问缓冲区;
    • 生产者不可向满缓冲区放产品;
    • 消费者不可从空缓冲区取产品。

    信号量解法

    public class PCSemaphore {
    	private final static int BUFFER_SIZE = 10;
    
    	public static void main(String[] args) {
    		Semaphore mutex = new Semaphore(1);
    		Semaphore full = new Semaphore(0);
    		Semaphore empty = new Semaphore(BUFFER_SIZE);
    
    		Queue<Integer> buffer = new LinkedList<>();
    
    		Producer producer = new Producer(mutex, empty, full, buffer);
    		Consumer consumer = new Consumer(mutex, empty, full, buffer);
    
    		// 可以初始化多个生产者、消费者
    		new Thread(producer, "p1").start();
    		new Thread(producer, "p2").start();
    		new Thread(consumer, "c1").start();
    		new Thread(consumer, "c2").start();
    		new Thread(consumer, "c3").start();
    	}
    }
    
    class Producer implements Runnable {
    	private Semaphore mutex, empty, full;
    	private Queue<Integer> buffer;
    	private Integer counter = 0;
    
    	public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {
    		this.mutex = mutex;
    		this.empty = empty;
    		this.full = full;
    		this.buffer = buffer;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			try {
    				empty.acquire();
    				mutex.acquire();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    
    			buffer.offer(counter++);
    			
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    
    			full.release();
    			mutex.release();
    		}
    	}
    }
    
    class Consumer implements Runnable {
    	private Semaphore mutex, empty, full;
    	private Queue<Integer> buffer;
    
    	public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {
    		this.mutex = mutex;
    		this.empty = empty;
    		this.full = full;
    		this.buffer = buffer;
    	}
    
    	@Override
    	public void run() {
    		String threadName = Thread.currentThread().getName();
    		
    		while (true) {
    			try {
    				full.acquire();
    				mutex.acquire();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    
    			Integer product = buffer.poll();
    			int left = buffer.size();
    			System.out.printf("%s consumed %d left %d%n", threadName, product, left);
    			
    			try {
    				Thread.sleep(200);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    
    			empty.release();
    			mutex.release();
    		}
    	}
    }
    

    教科书的解法,问题有三:

    1. 不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:
      image_1camh79sc162efh2a42ir2eq19.png-20.5kB
      同样地不可交换生产者中的empty和mutex信号量的请求顺序。
      即必须遵守先资源信号量,再互斥信号量的请求顺序。
      教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。

    2. 将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。

    3. 效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。

    注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。

    wait() & notify()

    public class PCWaitNotify {
    	public final static int BUFFER_SIZE = 10;
    
    	public static void main(String[] args) {
    		Object mutex = new Object();
    		AtomicInteger counter = new AtomicInteger(0);
    		Queue<Integer> buffer = new LinkedList<>();
    
    		Producer producer = new Producer(buffer, counter, mutex);
    		Consumer consumer = new Consumer(buffer, mutex);
    
    		new Thread(producer).start();
    		new Thread(producer).start();
    		new Thread(consumer).start();
    	}
    }
    
    class Producer implements Runnable {
    	private Random rand = new Random();
    	private Queue<Integer> buffer;
    	private AtomicInteger counter; // 支持原子操作的基本类型包装类
    	private Object mutex;
    
    	public Producer(Queue<Integer> buffer, AtomicInteger counter, Object mutex) {
    		this.buffer = buffer;
    		this.counter = counter;
    		this.mutex = mutex;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			synchronized (mutex) {
    				while (buffer.size() == PCWaitNotify.BUFFER_SIZE) {
    					try {
    						mutex.wait();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    
    				buffer.offer(counter.incrementAndGet());
    				mutex.notify();
    			}
    			
    			try {
    				Thread.sleep(rand.nextInt(800));
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    class Consumer implements Runnable {
    	private Random rand = new Random();
    	private Queue<Integer> buffer;
    	private Object mutex;
    
    	public Consumer(Queue<Integer> buffer, Object mutex) {
    		this.buffer = buffer;
    		this.mutex = mutex;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			synchronized (mutex) {
    				while (buffer.size() == 0) {
    					try {
    						mutex.wait();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				
    				System.out.println("consumed " + buffer.poll() + " left " + buffer.size());
    				mutex.notify();
    			}
    			try {
    				Thread.sleep(rand.nextInt(500));
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    

    以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。

    BlockingQueue

    public class PCBlockingQueue {
    	private final static int BUFFER_SIZE = 10;
    
    	public static void main(String[] args) {
    		BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(BUFFER_SIZE);
    		AtomicInteger counter = new AtomicInteger(0);
    		
    		Producer producer = new Producer(buffer, counter);
    		Consumer consumer = new Consumer(buffer);
    		
    		new Thread(producer).start();
    		new Thread(producer).start();
    		new Thread(consumer).start();
    	}
    }
    
    class Producer implements Runnable {
    	private Random rand = new Random();
    	private AtomicInteger counter;
    	private BlockingQueue<Integer> buffer;
    
    	public Producer(BlockingQueue<Integer> buffer, AtomicInteger counter) {
    		this.buffer = buffer;
    		this.counter = counter;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			try {
    				Thread.sleep(rand.nextInt(800));
    				Integer product = counter.incrementAndGet();
    				buffer.put(product);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    class Consumer implements Runnable {
    	private Random rand = new Random();
    	private BlockingQueue<Integer> buffer;
    	
    	public Consumer(BlockingQueue<Integer> buffer) {
    		this.buffer = buffer;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			try {
    				Thread.sleep(rand.nextInt(600));
    				Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。
    				System.out.println("consumed " + product + " left " + buffer.size());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    

    你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。

  • 相关阅读:
    【BZOJ 1185】 凸包+旋转卡壳
    【BZOJ 2829】 2829: 信用卡凸包 (凸包)
    【BZOJ 1045】 1045: [HAOI2008] 糖果传递
    【BZOJ 2453|bzoj 2120】 2453: 维护队列 (分块+二分)
    【BZOJ 3343 】 分块
    【BZOJ 1069】 凸包+旋转卡壳
    【NOIP 2016 总结】
    【无聊放个模板系列】洛谷 负环 模板
    【无聊放个模板系列】BZOJ 3172 (AC自动机)
    【无聊放个模板系列】HDU 3506 (四边形不等式优化DP-经典石子合并问题[环形])
  • 原文地址:https://www.cnblogs.com/sequix/p/8776716.html
Copyright © 2011-2022 走看看