zoukankan      html  css  js  c++  java
  • 15.并发编程--多线程设计模式

    并发编程--多线程设计模式 - 生产者-消费者模式

    1. 生产者-消费者模式

    • 生产者和消费者也是一个非常经典的多线程模式,我们在实际中开发应用非常广泛的思想理念。在生产-消费模式中:通常由两类线程,即若干个生产者和若干个消费者的线程。
    • 生产者负责提交用户数据,消费者负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

    示例:下面示例为Future的原理实现;
    说明:看类注释即可明了 不明白可以去屎了

      1 //Data.java   * 首先看这个接口Data,数据.
      2   public final class Data {
      3 
      4       private String id;
      5       private String name;
      6 
      7       public Data(String id, String name){
      8           this.id = id;
      9           this.name = name;
     10       }
     11 
     12       public String getId() {
     13           return id;
     14       }
     15 
     16       public void setId(String id) {
     17           this.id = id;
     18       }
     19 
     20       public String getName() {
     21           return name;
     22       }
     23 
     24       public void setName(String name) {
     25           this.name = name;
     26       }
     27 
     28       @Override
     29       public String toString(){
     30           return "{id: " + id + ", name: " + name + "}";
     31       }
     32 
     33   }
     34   //Provider.java   * 然后再看Provider这个类,实现了Runnable接口,生产者真实的业务逻辑.
     35   public class Provider implements Runnable{
     36 
     37       //共享缓存区 理解为消息队列
     38       private BlockingQueue<Data> queue;
     39       //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
     40       private volatile boolean isRunning = true;
     41       //id生成器
     42       private static AtomicInteger count = new AtomicInteger();
     43       //随机对象
     44       private static Random r = new Random();
     45 
     46       public Provider(BlockingQueue queue){
     47           this.queue = queue;
     48       }
     49 
     50       @Override
     51       public void run() {
     52           while(isRunning){
     53               try {
     54                   //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
     55                   Thread.sleep(r.nextInt(1000));
     56                   //获取的数据进行累计...
     57                   int id = count.incrementAndGet();
     58                   //比如通过一个getData方法获取了
     59                   Data data = new Data(Integer.toString(id), "数据" + id);
     60                   System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
     61                   if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
     62                       System.out.println("提交缓冲区数据失败....");
     63                       //do something... 比如重新提交
     64                   }
     65               } catch (InterruptedException e) {
     66                   e.printStackTrace();
     67               }
     68           }
     69       }
     70 
     71       public void stop(){
     72           this.isRunning = false;
     73       }
     74 
     75   }
     76 
     77   //Consumer.java*
     78   public class Consumer implements Runnable{
     79 
     80       private BlockingQueue<Data> queue;
     81 
     82       public Consumer(BlockingQueue queue){
     83           this.queue = queue;
     84       }
     85 
     86       //随机对象
     87       private static Random r = new Random();
     88 
     89       @Override
     90       public void run() {
     91           while(true){
     92               try {
     93                   //获取数据
     94                   Data data = this.queue.take();
     95                   //进行数据处理。休眠0 - 1000毫秒模拟耗时
     96                   Thread.sleep(r.nextInt(1000));
     97                   System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
     98               } catch (InterruptedException e) {
     99                   e.printStackTrace();
    100               }
    101           }
    102       }
    103   }
    104   //main方法
    105   public class Main {
    106 
    107       public static void main(String[] args) throws Exception {
    108           //内存缓冲区
    109           BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
    110           //生产者
    111           Provider p1 = new Provider(queue);
    112           Provider p2 = new Provider(queue);
    113           Provider p3 = new Provider(queue);
    114           //消费者
    115           Consumer c1 = new Consumer(queue);
    116           Consumer c2 = new Consumer(queue);
    117           Consumer c3 = new Consumer(queue);
    118           //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
    119 
    120           ExecutorService cachePool = Executors.newCachedThreadPool();
    121           cachePool.execute(p1);
    122           cachePool.execute(p2);
    123           cachePool.execute(p3);
    124           cachePool.execute(c1);
    125           cachePool.execute(c2);
    126           cachePool.execute(c3);
    127           try {
    128               Thread.sleep(3000);
    129           } catch (InterruptedException e) {
    130               e.printStackTrace();
    131           }
    132           p1.stop();
    133           p2.stop();
    134           p3.stop();
    135           try {
    136               Thread.sleep(2000);
    137           } catch (InterruptedException e) {
    138               e.printStackTrace();
    139           }       
    140   //      cachePool.shutdown();
    141   //      cachePool.shutdownNow();
    142 
    143       }
    144   }
  • 相关阅读:
    mysql自动备份shell
    程序员,架构师有话对你说
    Chief Technology Officer
    读《对软件开发的一点心得体会》有感
    shell编程值之shell流程控制(7)
    shell编程值之正则表达式与字符截取(6)
    shell编程之环境变量配置文件(4)
    shell编程之运算符(3)
    shell编程之BASH变量(2)
    shell编程之SHELL基础(1)
  • 原文地址:https://www.cnblogs.com/Mao-admin/p/9989164.html
Copyright © 2011-2022 走看看