zoukankan      html  css  js  c++  java
  • 生产者-消费者模式的三种实现方式

    1、背景                                                                    

    生产者生产数据到缓冲区中,消费者从缓冲区中取数据。

    如果缓冲区已经满了,则生产者线程阻塞;

    如果缓冲区为空,那么消费者线程阻塞。

    2、方式一:synchronized、wait和notify                

    复制代码
      1 package producerConsumer;
      2 //wait 和 notify
      3 public class ProducerConsumerWithWaitNofity {
      4     public static void main(String[] args) {
      5         Resource resource = new Resource();
      6         //生产者线程
      7         ProducerThread p1 = new ProducerThread(resource);
      8         ProducerThread p2 = new ProducerThread(resource);
      9         ProducerThread p3 = new ProducerThread(resource);
     10         //消费者线程
     11         ConsumerThread c1 = new ConsumerThread(resource);
     12         //ConsumerThread c2 = new ConsumerThread(resource);
     13         //ConsumerThread c3 = new ConsumerThread(resource);
     14     
     15         p1.start();
     16         p2.start();
     17         p3.start();
     18         c1.start();
     19         //c2.start();
     20         //c3.start();
     21     }
     22     
     23     
     24     
     25 }
     26 /**
     27  * 公共资源类
     28  * @author 
     29  *
     30  */
     31 class Resource{//重要
     32     //当前资源数量
     33     private int num = 0;
     34     //资源池中允许存放的资源数目
     35     private int size = 10;
     36 
     37     /**
     38      * 从资源池中取走资源
     39      */
     40     public synchronized void remove(){
     41         if(num > 0){
     42             num--;
     43             System.out.println("消费者" + Thread.currentThread().getName() +
     44                     "消耗一件资源," + "当前线程池有" + num + "个");
     45             notifyAll();//通知生产者生产资源
     46         }else{
     47             try {
     48                 //如果没有资源,则消费者进入等待状态
     49                 wait();
     50                 System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
     51             } catch (InterruptedException e) {
     52                 e.printStackTrace();
     53             }
     54         }
     55     }
     56     /**
     57      * 向资源池中添加资源
     58      */
     59     public synchronized void add(){
     60         if(num < size){
     61             num++;
     62             System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" 
     63             + num + "个");
     64             //通知等待的消费者
     65             notifyAll();
     66         }else{
     67             //如果当前资源池中有10件资源
     68             try{
     69                 wait();//生产者进入等待状态,并释放锁
     70                 System.out.println(Thread.currentThread().getName()+"线程进入等待");
     71             }catch(InterruptedException e){
     72                 e.printStackTrace();
     73             }
     74         }
     75     }
     76 }
     77 /**
     78  * 消费者线程
     79  */
     80 class ConsumerThread extends Thread{
     81     private Resource resource;
     82     public ConsumerThread(Resource resource){
     83         this.resource = resource;
     84     }
     85     @Override
     86     public void run() {
     87         while(true){
     88             try {
     89                 Thread.sleep(1000);
     90             } catch (InterruptedException e) {
     91                 e.printStackTrace();
     92             }
     93             resource.remove();
     94         }
     95     }
     96 }
     97 /**
     98  * 生产者线程
     99  */
    100 class ProducerThread extends Thread{
    101     private Resource resource;
    102     public ProducerThread(Resource resource){
    103         this.resource = resource;
    104     }
    105     @Override
    106     public void run() {
    107         //不断地生产资源
    108         while(true){
    109             try {
    110                 Thread.sleep(1000);
    111             } catch (InterruptedException e) {
    112                 e.printStackTrace();
    113             }
    114             resource.add();
    115         }
    116     }
    117     
    118 }
    复制代码

    方式二:lock和condition的await、signalAll

    复制代码
      1 package producerConsumer;
      2 
      3 import java.util.concurrent.locks.Condition;
      4 import java.util.concurrent.locks.Lock;
      5 import java.util.concurrent.locks.ReentrantLock;
      6 /**
      7  * 使用Lock 和 Condition解决生产者消费者问题
      8  * @author tangzhijing
      9  *
     10  */
     11 public class LockCondition {
     12         public static void main(String[] args) {
     13             Lock lock = new ReentrantLock();
     14             Condition producerCondition = lock.newCondition();
     15             Condition consumerCondition = lock.newCondition();
     16             Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);
     17             
     18             //生产者线程
     19             ProducerThread2 producer1 = new ProducerThread2(resource);
     20             
     21             //消费者线程
     22             ConsumerThread2 consumer1 = new ConsumerThread2(resource);
     23             ConsumerThread2 consumer2 = new ConsumerThread2(resource);
     24             ConsumerThread2 consumer3 = new ConsumerThread2(resource);
     25             
     26             producer1.start();
     27             consumer1.start();
     28             consumer2.start();
     29             consumer3.start();
     30         }
     31 }
     32 /**
     33  * 消费者线程
     34  */
     35 class ConsumerThread2 extends Thread{
     36     private Resource2 resource;
     37     public ConsumerThread2(Resource2 resource){
     38         this.resource = resource;
     39         //setName("消费者");
     40     }
     41     public void run(){
     42         while(true){
     43             try {
     44                 Thread.sleep((long) (1000 * Math.random()));
     45             } catch (InterruptedException e) {
     46                 e.printStackTrace();
     47             }
     48             resource.remove();
     49         }
     50     }
     51 }
     52 /**
     53  * 生产者线程
     54  * @author tangzhijing
     55  *
     56  */
     57 class ProducerThread2 extends Thread{
     58     private Resource2 resource;
     59     public ProducerThread2(Resource2 resource){
     60         this.resource = resource;
     61         setName("生产者");
     62     }
     63     public void run(){
     64         while(true){
     65                 try {
     66                     Thread.sleep((long) (1000 * Math.random()));
     67                 } catch (InterruptedException e) {
     68                     e.printStackTrace();
     69                 }
     70                 resource.add();
     71         }
     72     }
     73 }
     74 /**
     75  * 公共资源类
     76  * @author tangzhijing
     77  *
     78  */
     79 class Resource2{
     80     private int num = 0;//当前资源数量
     81     private int size = 10;//资源池中允许存放的资源数目
     82     private Lock lock;
     83     private Condition producerCondition;
     84     private Condition consumerCondition;
     85     public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
     86         this.lock = lock;
     87         this.producerCondition = producerCondition;
     88         this.consumerCondition = consumerCondition;
     89  
     90     }
     91     /**
     92      * 向资源池中添加资源
     93      */
     94     public void add(){
     95         lock.lock();
     96         try{
     97             if(num < size){
     98                 num++;
     99                 System.out.println(Thread.currentThread().getName() + 
    100                         "生产一件资源,当前资源池有" + num + "个");
    101                 //唤醒等待的消费者
    102                 consumerCondition.signalAll();
    103             }else{
    104                 //让生产者线程等待
    105                 try {
    106                     producerCondition.await();
    107                     System.out.println(Thread.currentThread().getName() + "线程进入等待");
    108                 } catch (InterruptedException e) {
    109                     e.printStackTrace();
    110                 }
    111             }
    112         }finally{
    113             lock.unlock();
    114         }
    115     }
    116     /**
    117      * 从资源池中取走资源
    118      */
    119     public void remove(){
    120         lock.lock();
    121         try{
    122             if(num > 0){
    123                 num--;
    124                 System.out.println("消费者" + Thread.currentThread().getName() 
    125                         + "消耗一件资源," + "当前资源池有" + num + "个");
    126                 producerCondition.signalAll();//唤醒等待的生产者
    127             }else{
    128                 try {
    129                     consumerCondition.await();
    130                     System.out.println(Thread.currentThread().getName() + "线程进入等待");
    131                 } catch (InterruptedException e) {
    132                     e.printStackTrace();
    133                 }//让消费者等待
    134             }
    135         }finally{
    136             lock.unlock();
    137         }
    138     }
    139     
    140 }
    复制代码

    方式三:BlockingQueue

    复制代码
     1 package producerConsumer;
     2 
     3 import java.util.concurrent.BlockingQueue;
     4 import java.util.concurrent.LinkedBlockingQueue;
     5 
     6 //使用阻塞队列BlockingQueue解决生产者消费者
     7 public class BlockingQueueConsumerProducer {
     8     public static void main(String[] args) {
     9         Resource3 resource = new Resource3();
    10         //生产者线程
    11         ProducerThread3 p = new ProducerThread3(resource);
    12         //多个消费者
    13         ConsumerThread3 c1 = new ConsumerThread3(resource);
    14         ConsumerThread3 c2 = new ConsumerThread3(resource);
    15         ConsumerThread3 c3 = new ConsumerThread3(resource);
    16  
    17         p.start();
    18         c1.start();
    19         c2.start();
    20         c3.start();
    21     }
    22 }
    23 /**
    24  * 消费者线程
    25  * @author tangzhijing
    26  *
    27  */
    28 class ConsumerThread3 extends Thread {
    29     private Resource3 resource3;
    30  
    31     public ConsumerThread3(Resource3 resource) {
    32         this.resource3 = resource;
    33         //setName("消费者");
    34     }
    35  
    36     public void run() {
    37         while (true) {
    38             try {
    39                 Thread.sleep((long) (1000 * Math.random()));
    40             } catch (InterruptedException e) {
    41                 e.printStackTrace();
    42             }
    43             resource3.remove();
    44         }
    45     }
    46 }
    47 /**
    48  * 生产者线程
    49  * @author tangzhijing
    50  *
    51  */
    52 class ProducerThread3 extends Thread{
    53     private Resource3 resource3;
    54     public ProducerThread3(Resource3 resource) {
    55         this.resource3 = resource;
    56         //setName("生产者");
    57     }
    58  
    59     public void run() {
    60         while (true) {
    61             try {
    62                 Thread.sleep((long) (1000 * Math.random()));
    63             } catch (InterruptedException e) {
    64                 e.printStackTrace();
    65             }
    66             resource3.add();
    67         }
    68     }
    69 }
    70 class Resource3{
    71     private BlockingQueue resourceQueue = new LinkedBlockingQueue(10);
    72     /**
    73      * 向资源池中添加资源
    74      */
    75     public void add(){
    76         try {
    77             resourceQueue.put(1);
    78             System.out.println("生产者" + Thread.currentThread().getName()
    79                     + "生产一件资源," + "当前资源池有" + resourceQueue.size() + 
    80                     "个资源");
    81         } catch (InterruptedException e) {
    82             e.printStackTrace();
    83         }
    84     }
    85     /**
    86      * 向资源池中移除资源
    87      */
    88     public void remove(){
    89         try {
    90             resourceQueue.take();
    91             System.out.println("消费者" + Thread.currentThread().getName() + 
    92                     "消耗一件资源," + "当前资源池有" + resourceQueue.size() 
    93                     + "个资源");
    94         } catch (InterruptedException e) {
    95             e.printStackTrace();
    96         }
    97     }
    98 }
    复制代码
  • 相关阅读:
    use paramiko to connect remote server and execute command
    protect golang source code
    adjust jedi vim to python2 and python3
    install vim plugin local file offline
    add swap file if you only have 1G RAM
    datatables hyperlink in td
    django rest framework custom json format
    【JAVA基础】网络编程
    【JAVA基础】多线程
    【JAVA基础】String类的概述和使用
  • 原文地址:https://www.cnblogs.com/xiaowenboke/p/10469125.html
Copyright © 2011-2022 走看看