zoukankan      html  css  js  c++  java
  • 生产者-消费者

    生产者消费者也是一个非常经典的多线程模式,我们在实际开发中应用非常广泛的思想理念。在生产者-消费模式中:通常由两类线程,即若干个生产者的线程和若干个消费者的线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

    MQ:messageQueue消息队列,是一个中间件

    代码实现:

    Provide:

    package com.java.day04_mode_pro_custom;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Provider implements Runnable{
        
        //共享缓存区
        private BlockingQueue<Data> queue;
        
        //多线程间启动自动变量,有强制从主内存中刷新的功能,即时返回线程的状态
        private volatile boolean isRunning  = true;
        
        //id生成器
        private static AtomicInteger count = new AtomicInteger();
        
        //随即对象
        private static Random r = new Random();
        
        public Provider(BlockingQueue<Data> queue){
            this.queue=queue;
        }
    
        @Override
        public void run() {
    
            while(isRunning){
                System.out.println("*******开始生产数据*********");
                try {
                    //随机休眠0-1000毫秒 表示获取数据(产生数据的耗时)
                    Thread.sleep(r.nextInt(1000));
                    //获取的数据进行累计
                    int id = count.incrementAndGet();
                    //比如通过一个getData方法获取了
                    Data data = new Data(id,"数据"+id);
                    System.out.println("当前线程:"+Thread.currentThread().getName()+"获取了数据,id为:"+id+",进行装载到公共缓冲区中。。。。");
                    //有可能缓冲区数据满了的情况,此时会提交失败
                    if(!this.queue.offer(data,2,TimeUnit.SECONDS)){
                        System.out.println("数据装载到缓冲区失败。。。");
                        //do something 比如重新提交。。。。
                    }
                    System.out.println("*******一次生产完成*********");
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            
        }
        
        
        public void stop(){
            this.isRunning=false;
            
        }
        
        
    }

    Consumer:

     1 package com.java.day04_mode_pro_custom;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.BlockingQueue;
     5 
     6 public class Consumer implements Runnable{
     7 
     8     //数据缓冲区
     9     private BlockingQueue <Data>queue;
    10     
    11     //随机对象
    12     private static Random r = new Random();
    13     
    14     public Consumer(BlockingQueue<Data> queue){
    15         this.queue=queue;
    16     }
    17 
    18     @Override
    19     public void run() {
    20         //线程不停的在处理数据
    21         while(true){
    22             try {
    23                 Data data = this.queue.take();
    24                 //随机休眠时间为0-1000  表示正在处理数据
    25                 Thread.sleep(r.nextInt(1000));
    26             
    27                 System.out.println("当前消费线程:"+Thread.currentThread().getName()+"消费成功,消费数据为:"+data.getName());
    28                 
    29             
    30             } catch (InterruptedException e) {
    31                 e.printStackTrace();
    32             }
    33             
    34             
    35         }
    36     }
    37     
    38     
    39     
    40     
    41 }

    Main:

     1 package com.java.day04_mode_pro_custom;
     2 
     3 import java.util.concurrent.BlockingQueue;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.LinkedBlockingQueue;
     7 
     8 public class Main {
     9 
    10     
    11     public static void main(String[] args) {
    12         //内存缓冲区
    13         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
    14         
    15         //生产者
    16         Provider p1 = new Provider(queue);
    17         Provider p2 = new Provider(queue);
    18         Provider p3 = new Provider(queue);
    19         
    20         //消费者
    21         Consumer c1 = new Consumer(queue);
    22 
    23         Consumer c2 = new Consumer(queue);
    24         Consumer c3 = new Consumer(queue);
    25         
    26         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程,空闲线程的存活时间为60s
    27         ExecutorService cachePool = Executors.newCachedThreadPool();
    28         //传的对象要实现runnable接口
    29         cachePool.execute(p1);
    30         cachePool.execute(p2);
    31         cachePool.execute(p3);
    32         cachePool.execute(c1);
    33         cachePool.execute(c2);
    34         cachePool.execute(c3);
    35         
    36         try {
    37             Thread.sleep(3000);
    38         } catch (InterruptedException e) {
    39             e.printStackTrace();
    40         }
    41 
    42         //不再产生数据
    43         p1.stop();
    44         p2.stop();
    45         p3.stop();
    46         
    47         System.out.println("停止生产数据。。。");
    48         try {
    49             Thread.sleep(2000);
    50         } catch (InterruptedException e) {
    51             e.printStackTrace();
    52         }
    53 
    54         
    55         
    56         
    57     }
    58     
    59     
    60     
    61     
    62     
    63     
    64     
    65     
    66 }

    Data:

     1 package com.java.day04_mode_pro_custom;
     2 
     3 public class Data {
     4 
     5     private int id;
     6     private String name;
     7     
     8     public Data(int id,String name){
     9         this.id=id;
    10         this.name=name;
    11     }
    12 
    13     public int getId() {
    14         return id;
    15     }
    16 
    17     public void setId(int id) {
    18         this.id = id;
    19     }
    20 
    21     public String getName() {
    22         return name;
    23     }
    24 
    25     public void setName(String name) {
    26         this.name = name;
    27     }
    28 
    29     @Override
    30     public String toString() {
    31         return "Data [id=" + id + ", name=" + name + "]";
    32     }
    33     
    34     
    35     
    36 }

    运行结果:

     1 *******开始生产数据*********
     2 *******开始生产数据*********
     3 *******开始生产数据*********
     4 当前线程:pool-1-thread-2获取了数据,id为:1,进行装载到公共缓冲区中。。。。
     5 *******一次生产完成*********
     6 *******开始生产数据*********
     7 当前线程:pool-1-thread-1获取了数据,id为:2,进行装载到公共缓冲区中。。。。
     8 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据1
     9 *******一次生产完成*********
    10 *******开始生产数据*********
    11 当前线程:pool-1-thread-3获取了数据,id为:3,进行装载到公共缓冲区中。。。。
    12 *******一次生产完成*********
    13 *******开始生产数据*********
    14 当前线程:pool-1-thread-2获取了数据,id为:4,进行装载到公共缓冲区中。。。。
    15 *******一次生产完成*********
    16 *******开始生产数据*********
    17 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据2
    18 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据4
    19 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据3
    20 当前线程:pool-1-thread-3获取了数据,id为:5,进行装载到公共缓冲区中。。。。
    21 *******一次生产完成*********
    22 *******开始生产数据*********
    23 当前线程:pool-1-thread-1获取了数据,id为:6,进行装载到公共缓冲区中。。。。
    24 *******一次生产完成*********
    25 *******开始生产数据*********
    26 当前线程:pool-1-thread-2获取了数据,id为:7,进行装载到公共缓冲区中。。。。
    27 *******一次生产完成*********
    28 *******开始生产数据*********
    29 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据5
    30 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据7
    31 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据6
    32 当前线程:pool-1-thread-2获取了数据,id为:8,进行装载到公共缓冲区中。。。。
    33 *******一次生产完成*********
    34 *******开始生产数据*********
    35 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据8
    36 当前线程:pool-1-thread-3获取了数据,id为:9,进行装载到公共缓冲区中。。。。
    37 *******一次生产完成*********
    38 *******开始生产数据*********
    39 当前线程:pool-1-thread-3获取了数据,id为:10,进行装载到公共缓冲区中。。。。
    40 *******一次生产完成*********
    41 *******开始生产数据*********
    42 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据10
    43 当前线程:pool-1-thread-3获取了数据,id为:11,进行装载到公共缓冲区中。。。。
    44 *******一次生产完成*********
    45 *******开始生产数据*********
    46 当前线程:pool-1-thread-1获取了数据,id为:12,进行装载到公共缓冲区中。。。。
    47 *******一次生产完成*********
    48 *******开始生产数据*********
    49 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据12
    50 当前线程:pool-1-thread-3获取了数据,id为:13,进行装载到公共缓冲区中。。。。
    51 *******一次生产完成*********
    52 *******开始生产数据*********
    53 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据9
    54 当前线程:pool-1-thread-1获取了数据,id为:14,进行装载到公共缓冲区中。。。。
    55 *******一次生产完成*********
    56 *******开始生产数据*********
    57 当前线程:pool-1-thread-2获取了数据,id为:15,进行装载到公共缓冲区中。。。。
    58 *******一次生产完成*********
    59 *******开始生产数据*********
    60 当前线程:pool-1-thread-3获取了数据,id为:16,进行装载到公共缓冲区中。。。。
    61 *******一次生产完成*********
    62 *******开始生产数据*********
    63 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据11
    64 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据13
    65 停止生产数据。。。
    66 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据15
    67 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据14
    68 当前线程:pool-1-thread-3获取了数据,id为:17,进行装载到公共缓冲区中。。。。
    69 *******一次生产完成*********
    70 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据16
    71 当前线程:pool-1-thread-2获取了数据,id为:18,进行装载到公共缓冲区中。。。。
    72 *******一次生产完成*********
    73 当前线程:pool-1-thread-1获取了数据,id为:19,进行装载到公共缓冲区中。。。。
    74 *******一次生产完成*********
    75 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据17
    76 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据19
    77 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据18
  • 相关阅读:
    xss漏洞
    web日志分析(待)
    linux命令学习摘记
    浏览器的MIME映射(程序映射)
    文件上传靶场-Upload-Labs
    目录遍历用字典
    cmd、bat分割单行字符串
    iptables使用
    Spring AOP 学习(五)
    Spring 使用注解注入 学习(四)
  • 原文地址:https://www.cnblogs.com/syousetu/p/6757265.html
Copyright © 2011-2022 走看看