package demo7.MQ;public class QueueData {private int id;private String name;private String taskCode;public QueueData() {}public QueueData(int id, String name, String taskCode) {this.id = id;this.name = name;this.taskCode = taskCode;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getTaskCode() {return taskCode;}public void setTaskCode(String taskCode) {this.taskCode = taskCode;}}
package demo7.MQ;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer task: repeatedly builds a {@link QueueData} item and offers it to the
 * shared queue until {@link #stop()} is called or the running thread is interrupted.
 */
public class Provider implements Runnable {

    /** Shared buffer that produced items are offered to. */
    private final BlockingQueue<QueueData> queue;

    /** Run flag; volatile so a stop() call from another thread is seen by the loop in run(). */
    private volatile boolean isRunning = true;

    /** ID generator shared by all Provider instances. */
    private static final AtomicInteger count = new AtomicInteger();

    /** Random source for the simulated production delay. */
    private static final Random random = new Random();

    /**
     * @param queue shared buffer that receives the produced items
     */
    public Provider(BlockingQueue<QueueData> queue) {
        this.queue = queue;
    }

    /**
     * Produces items in a loop while the run flag is set.
     *
     * <p>Each iteration sleeps for a random time below 1000 ms, allocates the next id,
     * logs the item to stderr and offers it to the queue, waiting at most 2 seconds
     * for the offer to be accepted. An interrupt ends the loop.
     */
    @Override
    public void run() {
        while (isRunning) {
            try {
                // Random pause of up to 1000 ms stands in for the time spent reading/producing data.
                Thread.sleep(random.nextInt(1000));

                // incrementAndGet yields a distinct, increasing id across all providers.
                int id = count.incrementAndGet();
                QueueData queueData = new QueueData(
                        id,
                        "任务" + id,
                        String.valueOf(String.valueOf(id).hashCode()));

                System.err.println("线程:" + Thread.currentThread().getName()
                        + " 生产task:" + queueData.getName() + " " + queueData.getId());

                if (!queue.offer(queueData, 2, TimeUnit.SECONDS)) {
                    System.err.println("!!!!!!!!!生产数据失败 error");
                }
            } catch (InterruptedException e) {
                // Interruption is a cancellation request: keep the flag set for the
                // caller/executor and stop producing instead of looping on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the producer to finish; the loop in {@link #run()} exits after the current iteration. */
    public void stop() {
        this.isRunning = false;
    }
}
package demo7.MQ;

import java.util.Random;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;

/**
 * Consumer task: takes {@link QueueData} items from the shared queue and logs them
 * to stderr until the running thread is interrupted.
 */
public class Consumer implements Runnable {

    /** Shared buffer that items are taken from. */
    private final BlockingQueue<QueueData> queue;

    /** Random source for the simulated processing delay. */
    private static final Random random = new Random();

    /**
     * @param queue shared buffer to consume from
     */
    public Consumer(BlockingQueue<QueueData> queue) {
        this.queue = queue;
    }

    /**
     * Consumes items in a loop.
     *
     * <p>There is no stop flag: the only way out of the loop is an interrupt, which
     * is what {@code ExecutorService.shutdownNow()} delivers to running tasks.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until an element is available.
                QueueData queueData = this.queue.take();

                // Random pause of up to 1000 ms stands in for the processing time.
                Thread.sleep(random.nextInt(1000));

                System.err.println("线程:" + Thread.currentThread().getName()
                        + " 消费task->:" + queueData.getName() + " " + queueData.getId());
            }
        } catch (InterruptedException e) {
            // Interruption is a cancellation request: restore the flag and let the
            // task end instead of going back to take() forever.
            Thread.currentThread().interrupt();
        }
    }
}
package demo7.MQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Entry point of the producer/consumer demo: three producers and three consumers
 * share one queue and run on a cached thread pool.
 */
public class MainMQ {

    /**
     * Producer/consumer (multi-threaded pattern).
     * <ol>
     *   <li>There are typically two kinds of threads: several producer threads and
     *       several consumer threads.</li>
     *   <li>Producer threads submit requests; consumer threads process the tasks the
     *       producers submitted.</li>
     *   <li>Producers and consumers communicate through a shared in-memory buffer.</li>
     * </ol>
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Shared in-memory buffer.
        BlockingQueue<QueueData> queueData = new LinkedBlockingQueue<QueueData>();

        // 2. Producers.
        Provider p1 = new Provider(queueData);
        Provider p2 = new Provider(queueData);
        Provider p3 = new Provider(queueData);

        // 3. Consumers.
        Consumer c1 = new Consumer(queueData);
        Consumer c2 = new Consumer(queueData);
        Consumer c3 = new Consumer(queueData);

        // Cached thread pool: threads are created on demand, none while there is no
        // work; idle threads are kept for 60 s by default.
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(p1);
        executorService.execute(p2);
        executorService.execute(p3);
        executorService.execute(c1);
        executorService.execute(c2);
        executorService.execute(c3);

        // Let the demo run for a while, then ask the producers to finish.
        pause(2000);
        p1.stop();
        p2.stop();
        p3.stop();

        // Give the consumers time to work through what is still queued.
        pause(1000);

        // shutdown() alone only rejects new tasks; it never wakes a consumer blocked
        // in take(), so the pool threads would keep the JVM alive. shutdownNow()
        // interrupts the running tasks so that tasks honouring interruption can exit.
        executorService.shutdownNow();
        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("Worker threads did not terminate within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sleeps for the given number of milliseconds, restoring the interrupt flag if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}