zoukankan      html  css  js  c++  java
  • Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

    生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:

    1. wait()/notify()

    2. lock & condition

    3. BlockingQueue

    下面来逐一分析。

    1. wait()/notify()

    第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。

     1 public class WaitNotifyBroker<T> implements Broker<T> {
     2 
     3     private final Object[] items;
     4 
     5     private int takeIndex;
     6     private int putIndex;
     7     private int count;
     8 
     9     public WaitNotifyBroker(int capacity) {
    10         this.items = new Object[capacity];
    11     }
    12 
    13     @SuppressWarnings("unchecked")
    14     @Override
    15     public T take() {
    16         T tmpObj = null;
    17         try {
    18             synchronized (items) {
    19                 while (0 == count) {
    20                     items.wait();
    21                 }
    22                 tmpObj = (T) items[takeIndex];
    23                 if (++takeIndex == items.length) {
    24                     takeIndex = 0;
    25                 }
    26                 count--;
    27                 items.notify();
    28             }
    29         } catch (InterruptedException e) {
    30             e.printStackTrace();
    31         }
    32 
    33         return tmpObj;
    34     }
    35 
    36     @Override
    37     public void put(T obj) {
    38         try {
    39             synchronized (items) {
    40                 while (items.length == count) {
    41                     items.wait();
    42                 }
    43 
    44                 items[putIndex] = obj;
    45                 if (++putIndex == items.length) {
    46                     putIndex = 0;
    47                 }
    48                 count++;
    49                 items.notify();
    50             }
    51         } catch (InterruptedException e) {
    52             e.printStackTrace();
    53         }
    54 
    55     }
    56 
    57 }

    这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。

    如果利用LinkedList来代替Array,相对来说会稍微简单些。

    LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。

    2. lock & condition

    lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。

    在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:

     1 public class LockConditionBroker<T> implements Broker<T> {
     2 
     3     private final ReentrantLock lock;
     4     private final Condition notFull;
     5     private final Condition notEmpty;
     6     private final int capacity;
     7     private LinkedList<T> items;
     8 
     9     public LockConditionBroker(int capacity) {
    10         this.lock = new ReentrantLock();
    11         this.notFull = lock.newCondition();
    12         this.notEmpty = lock.newCondition();
    13         this.capacity = capacity;
    14 
    15         items = new LinkedList<T>();
    16     }
    17 
    18     @Override
    19     public T take() {
    20         T tmpObj = null;
    21         lock.lock();
    22         try {
    23             while (items.size() == 0) {
    24                 notEmpty.await();
    25             }
    26 
    27             tmpObj = items.poll();
    28             notFull.signalAll();
    29 
    30         } catch (InterruptedException e) {
    31             e.printStackTrace();
    32         } finally {
    33             lock.unlock();
    34         }
    35         return tmpObj;
    36     }
    37 
    38     @Override
    39     public void put(T obj) {
    40         lock.lock();
    41         try {
    42             while (items.size() == capacity) {
    43                 notFull.await();
    44             }
    45 
    46             items.offer(obj);
    47             notEmpty.signalAll();
    48 
    49         } catch (InterruptedException e) {
    50             e.printStackTrace();
    51         } finally {
    52             lock.unlock();
    53         }
    54 
    55     }
    56 }

    3. BlockingQueue

    最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。

    实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。

     1 public class BlockingQueueBroker<T> implements Broker<T> {
     2 
     3     private final BlockingQueue<T> queue;
     4 
     5     public BlockingQueueBroker() {
     6         this.queue = new LinkedBlockingQueue<T>();
     7     }
     8 
     9     @Override
    10     public T take() {
    11         try {
    12             return queue.take();
    13         } catch (InterruptedException e) {
    14             e.printStackTrace();
    15         }
    16 
    17         return null;
    18     }
    19 
    20     @Override
    21     public void put(T obj) {
    22         try {
    23             queue.put(obj);
    24         } catch (InterruptedException e) {
    25             e.printStackTrace();
    26         }
    27     }
    28 
    29 }

    我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。

    接下来,就是一个1P2C的例子:

     1 public interface Broker<T> {
     2 
     3     T take();
     4 
     5     void put(T obj);
     6 
     7 }
     8 
     9 
    10 public class Producer implements Runnable {
    11 
    12     private final Broker<Integer> broker;
    13     private final String name;
    14 
    15     public Producer(Broker<Integer> broker, String name) {
    16         this.broker = broker;
    17         this.name = name;
    18     }
    19 
    20     @Override
    21     public void run() {
    22         try {
    23             for (int i = 0; i < 5; i++) {
    24                 broker.put(i);
    25                 System.out.format("%s produced: %s%n", name, i);
    26                 Thread.sleep(1000);
    27             }
    28             broker.put(-1);
    29             System.out.println("produced termination signal");
    30         } catch (InterruptedException e) {
    31             e.printStackTrace();
    32             return;
    33         }
    34 
    35     }
    36 
    37 }
    38 
    39 
    40 public class Consumer implements Runnable {
    41 
    42     private final Broker<Integer> broker;
    43     private final String name;
    44 
    45     public Consumer(Broker<Integer> broker, String name) {
    46         this.broker = broker;
    47         this.name = name;
    48     }
    49 
    50     @Override
    51     public void run() {
    52         try {
    53             for (Integer message = broker.take(); message != -1; message = broker.take()) {
    54                 System.out.format("%s consumed: %s%n", name, message);
    55                 Thread.sleep(1000);
    56             }
    57             System.out.println("received termination signal");
    58         } catch (InterruptedException e) {
    59             e.printStackTrace();
    60             return;
    61         }
    62 
    63     }
    64 
    65 }
    66 
    67 
    68 public class Main {
    69 
    70     public static void main(String[] args) {
    71         Broker<Integer> broker = new WaitNotifyBroker<Integer>(5);
    72 //         Broker<Integer> broker = new LockConditionBroker<Integer>(5);
    73 //         Broker<Integer> broker = new BlockingQueueBroker<Integer>();
    74 
    75         new Thread(new Producer(broker, "prod 1")).start();
    76         new Thread(new Consumer(broker, "cons 1")).start();
    77         new Thread(new Consumer(broker, "cons 2")).start();
    78 
    79     }
    80 
    81 }

    除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等

    本文完。

    参考:

    《Java 7 Concurrency Cookbook》

  • 相关阅读:
    Windows Phone App的dump 文件分析
    博客园客户端UAP开发随笔 -- App的心动杀手锏:动画
    博客园客户端UAP开发随笔--自定义控件的左膀右臂
    新年快乐
    博客园客户端(Universal App)开发随笔
    博客园 UAP 的部分反馈回复
    博客园客户端(Universal App)开发随笔 -- 样式管理与夜间模式
    Hadoop专业解决方案-第5章 开发可靠的MapReduce应用
    胖子哥的大数据之路(6)- NoSQL生态圈全景介绍
    NoSQL高级培训课程-HBase&&MongoDB(两天版)
  • 原文地址:https://www.cnblogs.com/techyc/p/3357194.html
Copyright © 2011-2022 走看看