生产者不断向队列中添加数据，消费者不断从队列中获取数据。如果队列满了，则生产者不能添加数据（put 会阻塞）；如果队列为空，则消费者不能获取数据（take 会阻塞）。下面借助实现了 BlockingQueue 接口的 LinkedBlockingQueue 来实现生产者与消费者之间的同步。
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Producer/consumer demo built on {@link java.util.concurrent.BlockingQueue}.
 *
 * <p>All workers share one bounded {@link LinkedBlockingQueue}. A producer blocks in
 * {@code put} while the queue is full and a consumer blocks in {@code take} while it is
 * empty, so the queue itself provides the hand-off between the threads.
 */
public class Main {
    /** Capacity of the bounded queue shared by every producer and consumer. */
    private static final int CAPACITY = 5;

    /** Exclusive upper bound, in milliseconds, of the random pause after each operation. */
    private static final int MAX_PAUSE_MILLIS = 1000;

    /**
     * Starts two producers and three consumers on one shared queue.
     *
     * <p>The workers loop until they are interrupted; this method never interrupts them,
     * so the program keeps running until the process is terminated.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(CAPACITY);

        Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
        Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
        Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
        Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
        Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * Sleeps the calling thread for a random time of less than one second.
     *
     * @throws InterruptedException if the calling thread is interrupted before or while sleeping
     */
    private static void pauseBriefly() throws InterruptedException {
        // ThreadLocalRandom avoids allocating a new Random on every loop iteration.
        Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_PAUSE_MILLIS));
    }

    /**
     * Producer: puts an increasing sequence 0, 1, 2, ... into the queue.
     *
     * <p>Each producer keeps its own counter, so two producers emit the same values
     * independently of each other.
     */
    public static class Producer extends Thread {
        private final BlockingQueue<Integer> blockingQueue;
        String name;
        /** Stored from the constructor; not read by this class. */
        int maxSize;
        /** Next value to be produced. */
        int i = 0;

        /**
         * @param name    thread name, also used as the prefix of the log lines
         * @param queue   queue the values are put into
         * @param maxSize stored but not otherwise used by the producer
         */
        public Producer(String name, BlockingQueue<Integer> queue, int maxSize) {
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        /**
         * Produces values until the running thread is interrupted.
         *
         * <p>The try block encloses the loop so that an interrupt ends the loop instead of
         * being logged and ignored; the interrupt status is restored before returning.
         */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    blockingQueue.put(i); // blocks while the queue is full
                    System.out.println("[" + name + "] Producing value : " + i);
                    i++;

                    // Pause for up to one second.
                    pauseBriefly();
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: re-assert the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Consumer: takes values from the queue and prints them.
     */
    public static class Consumer extends Thread {
        private final BlockingQueue<Integer> blockingQueue;
        String name;
        /** Stored from the constructor; not read by this class. */
        int maxSize;

        /**
         * @param name    thread name, also used as the prefix of the log lines
         * @param queue   queue the values are taken from
         * @param maxSize stored but not otherwise used by the consumer
         */
        public Consumer(String name, BlockingQueue<Integer> queue, int maxSize) {
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        /**
         * Consumes values until the running thread is interrupted.
         *
         * <p>The try block encloses the loop so that an interrupt ends the loop instead of
         * being logged and ignored; the interrupt status is restored before returning.
         */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int x = blockingQueue.take(); // blocks while the queue is empty
                    System.out.println("[" + name + "] Consuming : " + x);

                    // Pause for up to one second.
                    pauseBriefly();
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: re-assert the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }
    }
}
输出结果:
参考资料