zoukankan      html  css  js  c++  java
  • java学习之生产者消费者模式

            在并发编程中,生产者消费者模式是一种很常见的一种模式。生产者和消费者模式是有两类线程,即若干个生产者线程和若干个消费者线程,生产者线程负责提交用户请求,消费者线程负责处理消费者提交的任务,在生产者和消费者之间通过共享缓存区进行通信。

            生产者消费者模式的核心组件为共享内存缓存区,它作为生产者和消费者通信的桥梁,避免了生产者和消费者直接通信,从而将生产者和消费者直接解耦。生产者不需要指导消费者的存在,消费者也不需要知道生产者的存在。同时由于内存缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在局部时间内高于生产者,都可以通过共享缓存区得到缓解,确保系统的正常运行。

    该模式的架构为:

        

    生产者:

     1 package com.css.demo;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.BlockingQueue;
     5 import java.util.concurrent.TimeUnit;
     6 import java.util.concurrent.atomic.AtomicInteger;
     7 
     8 public class Provider implements Runnable{
     9     
    10     //共享缓存区
    11     private BlockingQueue<Data> queue;
    12     //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
    13     private volatile boolean isRunning = true;
    14     //id生成器
    15     private static AtomicInteger count = new AtomicInteger();
    16     //随机对象
    17     private static Random r = new Random(); 
    18     
    19     public Provider(BlockingQueue queue){
    20         this.queue = queue;
    21     }
    22 
    23     @Override
    24     public void run() {
    25         while(isRunning){
    26             try {
    27                 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 
    28                 Thread.sleep(r.nextInt(1000));
    29                 //获取的数据进行累计...
    30                 int id = count.incrementAndGet();
    31                 //比如通过一个getData方法获取了
    32                 Data data = new Data(Integer.toString(id), "数据" + id);
    33                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
    34                 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
    35                     System.out.println("提交缓冲区数据失败....");
    36                     //do something... 比如重新提交
    37                 }
    38             } catch (InterruptedException e) {
    39                 e.printStackTrace();
    40             }
    41         }
    42     }
    43     
    44     public void stop(){
    45         this.isRunning = false;
    46     }
    47     
    48 }

    消费者:

     1 package com.ssc.demo;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.BlockingQueue;
     5 import java.util.concurrent.TimeUnit;
     6 
     7 public class Consumer implements Runnable{
     8 
     9     private BlockingQueue<Data> queue;
    10     
    11     public Consumer(BlockingQueue queue){
    12         this.queue = queue;
    13     }
    14     
    15     //随机对象
    16     private static Random r = new Random(); 
    17 
    18     @Override
    19     public void run() {
    20         while(true){
    21             try {
    22                 //获取数据
    23                 Data data = this.queue.take();
    24                 //进行数据处理。休眠0 - 1000毫秒模拟耗时
    25                 Thread.sleep(r.nextInt(1000));
    26                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
    27             } catch (InterruptedException e) {
    28                 e.printStackTrace();
    29             }
    30         }
    31     }
    32 }

    主要类:

     1 package com.ssc,demo;
     2 
     3 import java.util.concurrent.BlockingQueue;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.LinkedBlockingQueue;
     7 
     8 public class Main {
     9 
    10     public static void main(String[] args) throws Exception {
    11         //内存缓冲区
    12         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
    13         //生产者
    14         Provider p1 = new Provider(queue);
    15         
    16         Provider p2 = new Provider(queue);
    17         Provider p3 = new Provider(queue);
    18         //消费者
    19         Consumer c1 = new Consumer(queue);
    20         Consumer c2 = new Consumer(queue);
    21         Consumer c3 = new Consumer(queue);
    22         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
    23 
    24         ExecutorService cachePool = Executors.newCachedThreadPool();
    25         cachePool.execute(p1);
    26         cachePool.execute(p2);
    27         cachePool.execute(p3);
    28         cachePool.execute(c1);
    29         cachePool.execute(c2);
    30         cachePool.execute(c3);
    31 
    32         try {
    33             Thread.sleep(3000);
    34         } catch (InterruptedException e) {
    35             e.printStackTrace();
    36         }
    37         p1.stop();
    38         p2.stop();
    39         p3.stop();
    40         try {
    41             Thread.sleep(2000);
    42         } catch (InterruptedException e) {
    43             e.printStackTrace();
    44         }        
    45 //        cachePool.shutdown(); 
    46 //        cachePool.shutdownNow();
    47         
    48 
    49     }
    50     
    51 }

    对象:

     1 package com.ssc.demo;
     2 
     3 public final class Data {
     4 
     5     private String id;
     6     private String name;
     7     
     8     public Data(String id, String name){
     9         this.id = id;
    10         this.name = name;
    11     }
    12     
    13     public String getId() {
    14         return id;
    15     }
    16 
    17     public void setId(String id) {
    18         this.id = id;
    19     }
    20 
    21     public String getName() {
    22         return name;
    23     }
    24 
    25     public void setName(String name) {
    26         this.name = name;
    27     }
    28 
    29     @Override
    30     public String toString(){
    31         return "{id: " + id + ", name: " + name + "}";
    32     }
    33     
    34 }

    主要参考:《白鹤翔视频》

    生于忧患,死于安乐
  • 相关阅读:
    08-蓝图&单元测试
    07-多对多操作
    JAVA 判断字符串是否可转化为JSONObject、JSONArray
    git常用命令
    线程和进程的一些基本概念
    JavaWeb学习总结(七):通过Servlet生成验证码及其应用 (BufferedImage类)
    JavaWeb学习总结(十):Session简单使用
    Ubuntu16.04 安装maven
    Ubuntu16.04 安装redis
    Ubuntu16.04 下安装tomcat
  • 原文地址:https://www.cnblogs.com/songlove/p/14962408.html
Copyright © 2011-2022 走看看