zoukankan      html  css  js  c++  java
  • 并发设计模式之生产者消费者设计模式

    主函数:

     1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
     2 
     3 import java.util.concurrent.BlockingQueue;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.LinkedBlockingQueue;
     7 
     8 /**
     9  * Created by Administrator on 2017/5/17.
    10  */
    11 public class Main {
    12     public static void main(String[] args) throws Exception {
    13         //内存缓冲区
    14         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
    15         //生产者
    16         Producer p1 = new Producer(queue);
    17         Producer p2 = new Producer(queue);
    18         Producer p3 = new Producer(queue);
    19         //消费者
    20         Consumer c1 = new Consumer(queue);
    21         Consumer c2 = new Consumer(queue);
    22         Consumer c3 = new Consumer(queue);
    23 
    24         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
    25         ExecutorService cachePool = Executors.newCachedThreadPool();
    26         cachePool.execute(p1);
    27         cachePool.execute(p2);
    28         cachePool.execute(p3);
    29         cachePool.execute(c1);
    30         cachePool.execute(c2);
    31         cachePool.execute(c3);
    32 
    33         try {
    34             Thread.sleep(3000);
    35         } catch (InterruptedException e) {
    36             e.printStackTrace();
    37         }
    38         p1.stop();
    39         p2.stop();
    40         p3.stop();
    41         try {
    42             Thread.sleep(2000);
    43         } catch (InterruptedException e) {
    44             e.printStackTrace();
    45         }
    46 //        cachePool.shutdown();
    47 //        cachePool.shutdownNow();
    48 
    49     }
    50 }

    Producer类:

     1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.BlockingQueue;
     5 import java.util.concurrent.TimeUnit;
     6 import java.util.concurrent.atomic.AtomicInteger;
     7 
     8 /**
     9  * Created by Administrator on 2017/5/17.
    10  */
    11 public class Producer implements Runnable {
    12 
    13     //共享缓存区
    14     private BlockingQueue<Data> queue;
    15     //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
    16     private volatile boolean isRunning = true;
    17     //id生成器
    18     private static AtomicInteger count = new AtomicInteger();
    19     //随机对象
    20     private static Random r = new Random();
    21 
    22     public Producer(BlockingQueue queue){
    23         this.queue = queue;
    24     }
    25 
    26     @Override
    27     public void run() {
    28         while(isRunning){
    29             try {
    30                 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
    31                 Thread.sleep(r.nextInt(1000));
    32                 //获取的数据进行累计...
    33                 int id = count.incrementAndGet();
    34                 //比如通过一个getData方法获取了
    35                 Data data = new Data(Integer.toString(id), "数据" + id);
    36                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
    37                 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
    38                     System.out.println("提交缓冲区数据失败....");
    39                     //do something... 比如重新提交
    40                 }
    41             } catch (InterruptedException e) {
    42                 e.printStackTrace();
    43             }
    44         }
    45     }
    46 
    47     public void stop(){
    48         this.isRunning = false;
    49     }
    50 }

    Consumer类:

     1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.BlockingQueue;
     5 
     6 /**
     7  * Created by Administrator on 2017/5/17.
     8  */
     9 public class Consumer implements Runnable {
    10     private BlockingQueue<Data> queue;
    11 
    12     public Consumer(BlockingQueue queue){
    13         this.queue = queue;
    14     }
    15 
    16     //随机对象
    17     private static Random r = new Random();
    18 
    19     @Override
    20     public void run() {
    21         while(true){
    22             try {
    23                 //获取数据
    24                 Data data = this.queue.take();
    25                 //进行数据处理。休眠0 - 1000毫秒模拟耗时
    26                 Thread.sleep(r.nextInt(1000));
    27                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
    28             } catch (InterruptedException e) {
    29                 e.printStackTrace();
    30             }
    31         }
    32     }
    33 }

    Data类

     1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
     2 
     3 /**
     4  * Created by Administrator on 2017/5/17.
     5  */
     6 public class Data {
     7     private String id;
     8     private String name;
     9 
    10     public Data(String id, String name){
    11         this.id = id;
    12         this.name = name;
    13     }
    14 
    15     public String getId() {
    16         return id;
    17     }
    18 
    19     public void setId(String id) {
    20         this.id = id;
    21     }
    22 
    23     public String getName() {
    24         return name;
    25     }
    26 
    27     public void setName(String name) {
    28         this.name = name;
    29     }
    30 
    31     @Override
    32     public String toString(){
    33         return "{id: " + id + ", name: " + name + "}";
    34     }
    35 }
  • 相关阅读:
    批量清理java源码的target目录
    前端移动node_modules到其他位置
    oracle物化视图和视图的数据不一致
    百词斩英语单词素材提取、听力练习
    2048自动游戏AI, 最高可以玩出一二十个2048
    switcheroo: Alt+Tab的替代工具、窗口搜索
    为知笔记wiz.editor.md增强
    腾讯北极星 Polaris 试用
    [分布式] 分布式事务、seata
    Mysql查询所有的表名和查询表中所有的字段名
  • 原文地址:https://www.cnblogs.com/Dylansuns/p/6869869.html
Copyright © 2011-2022 走看看