zoukankan      html  css  js  c++  java
  • 线程学习三:生产者/消费者模式

    原理:

      生产者/消费者模式就是将生产和消费分成多个线程,他们并用一个仓库。当仓库满的时候不能再生产,只能等消费了以后才能继续生产;当仓库空的时候不能再消费,必须等到生产了之后才能消费。

      这个实现的手段有三种:使用wait() / notify()方法;使用await() / signal()方法;使用BlockingQueue阻塞队列方法

    实现:

      一、使用wait() / notify()方法

      wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。

      wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。

      notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

     1 public class ProConTest {
     2     public final int MAX_SIZE = 10;
     3     public LinkedList<Object> list = new LinkedList<Object>();
     4 
     5     public static void main(String[] args) {
     6         ProConTest pct = new ProConTest();
     7         Producer producer = pct.new Producer();
     8         Consumer consumer = pct.new Consumer();
     9         producer.start();
    10         consumer.start();
    11     }
    12 
    13     class Producer extends Thread {
    14 
    15         @Override
    16         public void run() {
    17             Producer();
    18         }
    19 
    20         public void Producer() {
    21             while (true) {
    22                 synchronized (list) {
    23                     if (list.size() == MAX_SIZE) {
    24                         System.out.println("已经满仓,不能再生产了");
    25                         try {
    26                             list.wait();
    27                         } catch (InterruptedException e) {
    28                             e.printStackTrace();
    29                         }
    30                     }
    31                     list.add(1);
    32                     System.out.println("现在仓库剩余" + list.size());
    33                     list.notifyAll();
    34                 }
    35             }
    36         }
    37     }
    38 
    39     class Consumer extends Thread {
    40 
    41         @Override
    42         public void run() {
    43             Consumer();
    44         }
    45 
    46         public void Consumer() {
    47             while (true) {
    48                 synchronized (list) {
    49                     if (list.size() == 0) {
    50                         System.out.println("仓库已经卖完,不能再消费了");
    51                         try {
    52                             list.wait();
    53                         } catch (InterruptedException e) {
    54                             e.printStackTrace();
    55                         }
    56                     }
    57                     list.remove();
    58                     System.out.println("现在仓库剩余" + list.size());
    59                     list.notifyAll();
    60                 }
    61             }
    62         }
    63     }
    64 
    65 }

      二、使用await() / signal()方法

      在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。await()和signal()就是其中用来做同步的两种方法,它们的功能基本上和wait() / nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

     1 public class ProConTest2 {
     2     public final int MAX_SIZE = 10;
     3     public LinkedList<Object> list = new LinkedList<Object>();
     4     private final Lock lock = new ReentrantLock();
     5     private final Condition full = lock.newCondition();
     6     private final Condition empty = lock.newCondition();
     7 
     8     public static void main(String[] args) {
     9         ProConTest2 pct = new ProConTest2();
    10         Producer producer = pct.new Producer();
    11         Consumer consumer = pct.new Consumer();
    12         producer.start();
    13         consumer.start();
    14     }
    15 
    16     class Producer extends Thread {
    17 
    18         @Override
    19         public void run() {
    20             Producer();
    21         }
    22 
    23         public void Producer() {
    24             while (true) {
    25                 lock.lock();
    26                 if (list.size() == MAX_SIZE) {
    27                     System.out.println("已经满仓,不能再生产了");
    28                     try {
    29                         full.await();
    30                     } catch (InterruptedException e) {
    31                         e.printStackTrace();
    32                     }
    33                 }
    34                 list.add(1);
    35                 System.out.println("现在仓库剩余" + list.size());
    36                 full.signalAll();
    37                 empty.signalAll();
    38                 lock.unlock();
    39 
    40             }
    41         }
    42     }
    43 
    44     class Consumer extends Thread {
    45 
    46         @Override
    47         public void run() {
    48             Consumer();
    49         }
    50 
    51         public void Consumer() {
    52             while (true) {
    53                 lock.lock();
    54                 if (list.size() == 0) {
    55                     System.out.println("仓库已经卖完,不能再消费了");
    56                     try {
    57                         empty.await();
    58                     } catch (InterruptedException e) {
    59                         e.printStackTrace();
    60                     }
    61                 }
    62                 list.remove();
    63                 System.out.println("现在仓库剩余" + list.size());
    64                 full.signalAll();
    65                 empty.signalAll();
    66                 lock.unlock();
    67 
    68             }
    69         }
    70     }
    71 }

      三、使用BlockingQueue阻塞队列方法

      BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。

      put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。

      take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

     1 public class ProConTest3 {
     2     public final int MAX_SIZE = 10;
     3     private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(MAX_SIZE);
     4 
     5     public static void main(String[] args) {
     6         ProConTest3 pct = new ProConTest3();
     7         Producer producer = pct.new Producer();
     8         Consumer consumer = pct.new Consumer();
     9         producer.start();
    10         consumer.start();
    11     }
    12 
    13     class Producer extends Thread {
    14 
    15         @Override
    16         public void run() {
    17             Producer();
    18         }
    19 
    20         public void Producer() {
    21             while (true) {
    22                 if (queue.size() == MAX_SIZE) {
    23                     System.out.println("已经满仓,不能再生产了");
    24                 }
    25                 try {
    26                     queue.put(1);
    27                 } catch (InterruptedException e) {
    28                     e.printStackTrace();
    29                 }
    30                 System.out.println("现在仓库剩余" + queue.size());
    31             }
    32         }
    33     }
    34 
    35     class Consumer extends Thread {
    36 
    37         @Override
    38         public void run() {
    39             Consumer();
    40         }
    41 
    42         public void Consumer() {
    43             while (true) {                
    44                 if (queue.size() == 0) {
    45                     System.out.println("仓库已经卖完,不能再消费了");
    46                 }
    47                 try {
    48                     queue.take();
    49                 } catch (InterruptedException e) {
    50                     e.printStackTrace();
    51                 }
    52                 System.out.println("现在仓库剩余" + queue.size());
    53             }
    54         }
    55     }
    56 
    57 }

    参考

    JAVA编程思想

    http://blog.csdn.net/monkey_d_meng/article/details/6251879

  • 相关阅读:
    CentOS 7 rpm包部署kubernetes 1.20
    基于ipset对大量IP进行封禁(Iptables)
    RPM打包指南
    MySQL主从一致性检查
    基于Docker+Jenkins+Git的发布环境
    MySQL管理工具集MySQL Utilities | 利用frm和ibd文件恢复表数据
    MySQL日志解析工具资料汇总
    MySQL之—分库分表的技巧
    MySQL之查询重复记录、删除重复记录方法大全
    一个爬虫
  • 原文地址:https://www.cnblogs.com/gforce/p/5897105.html
Copyright © 2011-2022 走看看