生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓冲区进行通信。
生产者-消费者模式的核心组件是共享内存缓冲区,它作为生产者和消费者时间通信的桥梁,避免了生产者和消费者之间直接通信,从而将生产者和消费者时间进行解耦。生产者不需要知道消费者的存在,消费者也不需要知道生产者的存在。同时由于缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在某一局部时间内速度高于生产者,都可以通过共享内存缓冲区得到缓解,确保系统正常运行。
生产者-消费者模式的主要角色输入下表所示:
生产者 | 用于提交用户请求,提取用户任务,并装入内存缓冲区 |
消费者 | 在内存缓冲区中提取并处理任务 |
内存缓冲区 | 缓存生产者提交的任务或者数据,供消费者使用 |
任务 | 生产者向内存缓冲区提交的数据结构 |
Main | 使用生产者和消费者的客户端 |
下图显示了生产者-消费者模式一种实现的具体结构:
其中,BlockingQueue充当了共享内存缓冲区,用于维护任务或数据队列(PCData对象)。PCData对象表示一个生产任务或者相关任务的数据。生产者对象和消费者对象均引用同一个BlockingQueue对象。生产者负责创建PCData,并将它加入到BlockingQueue对象中,消费者则从同一个BlockingQueue中获取PCData,并执行完该任务。
下面代码实现了基于生产者-消费者模式的求整数平方的并行程序。
1 public class PCData { 2 private final int intData;//数据 3 4 public PCData(int intData){ 5 this.intData = intData; 6 } 7 8 public PCData(String d){ 9 intData = Integer.valueOf(d); 10 } 11 12 public int getIntData(){ 13 return intData; 14 } 15 16 @Override 17 public String toString(){ 18 return "data:" + intData; 19 } 20 }
1 public class Producer implements Runnable { 2 3 private volatile boolean isRunning = true; 4 //内存缓冲区 5 private BlockingQueue<PCData> queue; 6 //总数,原子操作 7 private static AtomicInteger count = new AtomicInteger(); 8 private static final int SLEEP_TIME = 1000; 9 10 public Producer(BlockingQueue<PCData> queue){ 11 this.queue = queue; 12 } 13 14 @Override 15 public void run() { 16 PCData data = null; 17 Random random = new Random(); 18 19 System.out.println("start producer id = " + Thread.currentThread().getId()); 20 while (isRunning){ 21 try { 22 Thread.sleep(random.nextInt(SLEEP_TIME)); 23 data = new PCData(count.incrementAndGet());//构造任务数据 24 System.out.println(data + " is put into queue!"); 25 if (!queue.offer(data,2, TimeUnit.SECONDS)){//提交数据到缓冲区,offer(),当队列满时,直接返回false。 26 System.out.println("failed to put data:" + data); 27 } 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 Thread.currentThread().interrupt(); 31 } 32 } 33 } 34 35 public void stop(){ 36 isRunning = false; 37 } 38 }
public class Customer implements Runnable { private BlockingQueue<PCData> queue;//缓冲区 private static final int SLEEP_TIME = 1000; public Customer(BlockingQueue<PCData> queue){ this.queue = queue; } @Override public void run() { System.out.println("start customer id = " + Thread.currentThread().getId()); Random random = new Random(); while (true){ try { PCData data = queue.poll();//提取数据 if (null != data){ int re = data.getIntData() * data.getIntData();//计算平方 System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re)); Thread.sleep(random.nextInt(SLEEP_TIME)); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } }
public class Client{ //测试 public static void main(String[] args) throws InterruptedException { //建立缓冲区 BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10); Producer producer1 = new Producer(queue);//生产者 Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Customer customer1 = new Customer(queue);//消费者 Customer customer2 = new Customer(queue); Customer customer3 = new Customer(queue); ExecutorService es = Executors.newCachedThreadPool(); es.execute(producer1);//运行生产者 es.execute(producer2); es.execute(producer3); es.execute(customer1);//运行消费者 es.execute(customer2); es.execute(customer3); Thread.sleep(1000); producer1.stop();//停止生产者 producer2.stop(); producer3.stop(); Thread.sleep(3000); es.shutdown(); } }
输出结果:
start producer id = 11 start customer id = 14 start customer id = 15 start producer id = 12 start customer id = 16 start producer id = 13 data:1 is put into queue! 1*1=1 data:2 is put into queue! 2*2=4 data:3 is put into queue! 3*3=9 data:4 is put into queue! 4*4=16 data:5 is put into queue! 5*5=25 data:6 is put into queue! 6*6=36 data:7 is put into queue! 7*7=49 data:8 is put into queue! 8*8=64 data:9 is put into queue! 9*9=81
上述代码很简单,看过文章数据共享通道:BlockingQueue后,注释也比较详细,就不再赘述代码的意思了。
总结:
生产者-消费者模式很好的对生产者和消费者进行解耦,优化了系统整体结构。同时,由于缓冲区的作用,允许生产者线程和消费者线程存在执行上的性能差异,从一定的程度上缓解了性能瓶颈对系统性能的影响。