在并发编程中,生产者消费者模式是一种很常见的一种模式。生产者和消费者模式是有两类线程,即若干个生产者线程和若干个消费者线程,生产者线程负责提交用户请求,消费者线程负责处理消费者提交的任务,在生产者和消费者之间通过共享缓存区进行通信。
生产者消费者模式的核心组件为共享内存缓存区,它作为生产者和消费者通信的桥梁,避免了生产者和消费者直接通信,从而将生产者和消费者直接解耦。生产者不需要指导消费者的存在,消费者也不需要知道生产者的存在。同时由于内存缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在局部时间内高于生产者,都可以通过共享缓存区得到缓解,确保系统的正常运行。
该模式的架构为:
生产者:
1 package com.css.demo; 2 3 import java.util.Random; 4 import java.util.concurrent.BlockingQueue; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.atomic.AtomicInteger; 7 8 public class Provider implements Runnable{ 9 10 //共享缓存区 11 private BlockingQueue<Data> queue; 12 //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态 13 private volatile boolean isRunning = true; 14 //id生成器 15 private static AtomicInteger count = new AtomicInteger(); 16 //随机对象 17 private static Random r = new Random(); 18 19 public Provider(BlockingQueue queue){ 20 this.queue = queue; 21 } 22 23 @Override 24 public void run() { 25 while(isRunning){ 26 try { 27 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 28 Thread.sleep(r.nextInt(1000)); 29 //获取的数据进行累计... 30 int id = count.incrementAndGet(); 31 //比如通过一个getData方法获取了 32 Data data = new Data(Integer.toString(id), "数据" + id); 33 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中..."); 34 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ 35 System.out.println("提交缓冲区数据失败...."); 36 //do something... 比如重新提交 37 } 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } 41 } 42 } 43 44 public void stop(){ 45 this.isRunning = false; 46 } 47 48 }
消费者:
1 package com.ssc.demo; 2 3 import java.util.Random; 4 import java.util.concurrent.BlockingQueue; 5 import java.util.concurrent.TimeUnit; 6 7 public class Consumer implements Runnable{ 8 9 private BlockingQueue<Data> queue; 10 11 public Consumer(BlockingQueue queue){ 12 this.queue = queue; 13 } 14 15 //随机对象 16 private static Random r = new Random(); 17 18 @Override 19 public void run() { 20 while(true){ 21 try { 22 //获取数据 23 Data data = this.queue.take(); 24 //进行数据处理。休眠0 - 1000毫秒模拟耗时 25 Thread.sleep(r.nextInt(1000)); 26 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId()); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 } 32 }
主要类:
1 package com.ssc,demo; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.LinkedBlockingQueue; 7 8 public class Main { 9 10 public static void main(String[] args) throws Exception { 11 //内存缓冲区 12 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10); 13 //生产者 14 Provider p1 = new Provider(queue); 15 16 Provider p2 = new Provider(queue); 17 Provider p3 = new Provider(queue); 18 //消费者 19 Consumer c1 = new Consumer(queue); 20 Consumer c2 = new Consumer(queue); 21 Consumer c3 = new Consumer(queue); 22 //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值) 23 24 ExecutorService cachePool = Executors.newCachedThreadPool(); 25 cachePool.execute(p1); 26 cachePool.execute(p2); 27 cachePool.execute(p3); 28 cachePool.execute(c1); 29 cachePool.execute(c2); 30 cachePool.execute(c3); 31 32 try { 33 Thread.sleep(3000); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 p1.stop(); 38 p2.stop(); 39 p3.stop(); 40 try { 41 Thread.sleep(2000); 42 } catch (InterruptedException e) { 43 e.printStackTrace(); 44 } 45 // cachePool.shutdown(); 46 // cachePool.shutdownNow(); 47 48 49 } 50 51 }
对象:
1 package com.ssc.demo; 2 3 public final class Data { 4 5 private String id; 6 private String name; 7 8 public Data(String id, String name){ 9 this.id = id; 10 this.name = name; 11 } 12 13 public String getId() { 14 return id; 15 } 16 17 public void setId(String id) { 18 this.id = id; 19 } 20 21 public String getName() { 22 return name; 23 } 24 25 public void setName(String name) { 26 this.name = name; 27 } 28 29 @Override 30 public String toString(){ 31 return "{id: " + id + ", name: " + name + "}"; 32 } 33 34 }
主要参考:《白鹤翔视频》