zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue源码剖析

    生产者-消费者
    ArrayBlockingQueue是一个实现了BlockingQueue接口的类,其可以很方便的实现生产者-消费者模式。用法如下:


    class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
    try {
    while (true) { queue.put(produce()); }
    } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
    }

    class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
    try {
    while (true) { consume(queue.take()); }
    } catch (InterruptedException ex) { ... handle ...}
    }
    void consume(Object x) { ... }
    }

    class Setup {
    void main() {
    BlockingQueue q = new SomeQueueImplementation();
    Producer p = new Producer(q);
    Consumer c1 = new Consumer(q);
    Consumer c2 = new Consumer(q);
    new Thread(p).start();
    new Thread(c1).start();
    new Thread(c2).start();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    two-condition算法来进行并发控制
    在ArrayBlockingQueue中有如下三个变量声明(定义):


    /*
    * Concurrency control uses the classic two-condition algorithm
    * found in any textbook.
    */

    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    实现生产者-消费者的并发控制很简单,一把锁,两个条件!再来看ArrayBlockingQueue的构造函数代码:


    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    在初始化的时候,ArrayBlockingQueue对lock、notEmpty、notFull进行了初始化。

    生产者进行生产
    首先查看生产者生产时候需要调用的put(E e)方法:


    public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    while (count == items.length)
    notFull.await();
    insert(e);
    } finally {
    lock.unlock();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    首先通过ReentrantLock的lockInterruptibly()方法来尝试获得锁,该方法在获取锁之后,可以继续响应线程的interrupt操作,注意lock.unlock()一定要写在finally块中,不然在出现异常之后,有可能永远也释放不了锁了!

    当发现当前数量已经满的时候:while(count == items.length),那么将会让生产者(当前线程)进行等待:notFull.await(),否则进行insert(e)操作。

    继续跟踪insert(e)操作不难想到,在插入成功之后,会通知notEmpty来唤醒消费者(某一个正在等待notEmpty条件的线程),告知有了新的产品可消费了!


    private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    如上可知:如果队列已满(full),那么notFull进行等待,否则插入成功之后,唤醒notEmpty告知不用等待了。同理:消费者进行消费的take操作也是类似的。

    消费者进行消费

    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    while (count == 0)
    notEmpty.await();
    return extract();
    } finally {
    lock.unlock();
    }
    }

    private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    概括
    整体而言,在有了ReentrantLock、Condition之后,生产者-消费者模式实现起来还是很简单的。ReentrantLock负责加锁释放锁,Condition负责等待唤醒线程。
    ————————————————
    版权声明:本文为CSDN博主「赵坤的个人网站」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/anxiaoyi520/article/details/46670675

  • 相关阅读:
    java 多个设备,锁定先后顺序
    使用个推的时候出现Installation error: INSTALL_FAILED_DUPLICATE_PERMISSION
    android 开发-设置控件/view的水平方向翻转
    IIS8中 出现ashx 401:未授权,uploadify上传文件失败
    No Launcher activity found!
    activiti工作流之Eclipse的Eclipse BPMN 2.0 Designer无法安装或者(安装后无法重复打开*.bpmn)
    android Ant 打包
    andorid 直接解压后的xml的解密
    获取当前运行的类名或者方法
    http断点下载客户端和服务端
  • 原文地址:https://www.cnblogs.com/hanease/p/14901428.html
Copyright © 2011-2022 走看看