zoukankan      html  css  js  c++  java
  • JUC之阻塞队列BlockingQueue的实现原理

    1. 阻塞队列首先它是一个队列,是队列就会遵循先进先出(FIFO)的原则,又因为它是阻塞的,故与普通的队列有两点区别:

      A. 当一个线程向队列里面添加数据时,如果队列是满的,那么将阻塞该线程,暂停添加数据。

      B. 当一个线程从队列里面取出数据时,如果队列是空的,那么将阻塞该线程,暂停取出数据。

    2. JUC中实现一个阻塞队列一般都会实现BlockingQueue接口,主要方法说明:

    方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
    插入数据方法 add(e) offer(e) put(e) offer(e, timeout, unit)
    移除数据方法 remove() poll() take() pull(timeout, unit)
    检查方法 element() peek() 不可用 不可用

    注:"返回特殊值"的意思是说当向队列插入(offer)数据时,会返回数据是否插入成功,成功返回true。如果是移除方法(poll),则是从队列里面取出一个数据,如果没有就返回null。

    3. jdk里提供的阻塞队列

       1). ArrayBlockingQueue

      基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过ReentrantLock来完成的,数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行。而且在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

      2). LinkedBlockingQueue

      基于单向链表的阻塞队列实现,在初始化LinkedBlockingQueue的时候可以指定对立的大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。

      3). PriorityBlockingQueue  一个支持优先级排序的无界阻塞队列

      4). SynchronousBlockingQueue  一个不存储原色的阻塞队列

      5). DelayQueue  一个使用优先级队列队列实现的无界阻塞队列

    4. 实现一个简单的阻塞队列,基于List实现,生产、消费共用同一个锁,而且是一个有界的队列

     1). 实现阻塞队:MyBlockingQueue

     1 package com.winter.juc;
     2 
     3 import java.util.*;
     4 import java.util.concurrent.locks.Condition;
     5 import java.util.concurrent.locks.Lock;
     6 import java.util.concurrent.locks.ReentrantLock;
     7 
     8 public class MyBlockingQueue<E> {
     9 
    10     private int count;
    11     private int capacity;
    12     private final List<E> containor;
    13 
    14     private final Lock lock = new ReentrantLock();
    15     private final Condition notFull = lock.newCondition();
    16     private final Condition notEmpty = lock.newCondition();
    17 
    18     public MyBlockingQueue(int capacity) {
    19         containor = new ArrayList<E>(capacity);
    20         this.count = 0;
    21         this.capacity = capacity;
    22     }
    23 
    24     public void put(E data) throws InterruptedException {
    25         lock.lockInterruptibly();
    26         try {
    27             while (count == capacity) {
    28                 notFull.await();
    29             }
    30             containor.add(data);
    31             this.count++;
    32             notEmpty.signal();
    33         } finally {
    34             lock.unlock();
    35         }
    36     }
    37 
    38     public E take() throws InterruptedException {
    39         lock.lockInterruptibly();
    40         try {
    41             while (count == 0) {
    42                 notEmpty.await();
    43             }
    44             this.count--;
    45             notFull.signal();
    46             E node = containor.get(0);
    47             containor.remove(0);
    48             return node;
    49         } finally {
    50             lock.unlock();
    51         }
    52     }
    53 }

    2).接下来使用该队列

    生产者:

     1 package com.winter.juc;
     2 
     3 import java.util.concurrent.TimeUnit;
     4 import org.apache.commons.lang3.RandomUtils;
     5 
     6 public class Productor implements Runnable {
     7    private MyBlockingQueue<NodeItem> queue;
     8 
     9    public Productor(MyBlockingQueue<NodeItem> queue) {
    10       this.queue = queue;
    11    }
    12 
    13    public void run() {
    14       while (true) {
    15          try {
    16             TimeUnit.SECONDS.sleep(RandomUtils.nextInt(0, 5));
    17             NodeItem node = new NodeItem();
    18             node.setKey(GlobalKey.get());
    19             System.out.println("produce a node" + node);
    20             queue.put(node);
    21          } catch (InterruptedException e) {
    22             e.printStackTrace();
    23          }
    24       }
    25    }
    26 }

    消费者:

     1 package com.winter.juc;
     2 
     3 import java.util.concurrent.TimeUnit;
     4 import org.apache.commons.lang3.RandomUtils;
     5 
     6 public class Customer implements Runnable {
     7     private MyBlockingQueue<NodeItem> queue;
     8     public Customer(MyBlockingQueue<NodeItem> queue) {
     9         this.queue = queue;
    10     }
    11 
    12     public void run() {
    13         while (true) {
    14             try {
    15                 TimeUnit.SECONDS.sleep(RandomUtils.nextInt(0, 5));
    16                 NodeItem node = queue.take();
    17                 System.out.println("consume a node" + node);
    18             } catch (InterruptedException e) {
    19                 e.printStackTrace();
    20             }
    21         }
    22 
    23     }
    24 }

    元素实体:

     1 package com.winter.juc;
     2 public class NodeItem {
     3 
     4     private int key;
     5 
     6     public int getKey() {
     7         return key;
     8     }
     9 
    10     public void setKey(int key) {
    11         this.key = key;
    12     }
    13 
    14     @Override
    15     public String toString() {
    16         return "NodeItem{" +
    17                 "key=" + key +
    18                 '}';
    19     }
    20 }

    全局键值:

     1 package com.winter.juc;
     2 import java.util.concurrent.atomic.AtomicInteger;
     3 public class GlobalKey {
     4 
     5     private static AtomicInteger key = new AtomicInteger(1);
     6 
     7     public static int get() {
     8         return key.getAndIncrement();
     9     }
    10 }

    main方法:

     1 package com.winter.juc;
     2 
     3 import java.util.concurrent.Executors;
     4 import java.util.concurrent.ExecutorService;
     5 
     6 public class ProviderService {
     7     public static void main(String[] args) {
     8 
     9         final MyBlockingQueue blockingQueue = new MyBlockingQueue(3);
    10 
    11         ExecutorService exec = Executors.newCachedThreadPool();
    12 
    13         exec.submit(new Productor(blockingQueue));
    14         exec.submit(new Customer(blockingQueue));
    15     }
    16 }

    执行结果:

    product a nodeNodeItem{key=1}
    custom a nodeNodeItem{key=1}
    product a nodeNodeItem{key=2}
    custom a nodeNodeItem{key=2}
    product a nodeNodeItem{key=3}
    custom a nodeNodeItem{key=3}
    product a nodeNodeItem{key=4}
    custom a nodeNodeItem{key=4}
    product a nodeNodeItem{key=5}
    product a nodeNodeItem{key=6}
    custom a nodeNodeItem{key=5}
    product a nodeNodeItem{key=7}
    custom a nodeNodeItem{key=6}
    custom a nodeNodeItem{key=7}
    product a nodeNodeItem{key=8}
    custom a nodeNodeItem{key=8}
    product a nodeNodeItem{key=9}
    custom a nodeNodeItem{key=9}
    product a nodeNodeItem{key=10}
    product a nodeNodeItem{key=11}
    product a nodeNodeItem{key=12}
    custom a nodeNodeItem{key=10}
    custom a nodeNodeItem{key=11}
    product a nodeNodeItem{key=13}
    product a nodeNodeItem{key=14}
    custom a nodeNodeItem{key=12}
    product a nodeNodeItem{key=15}
    custom a nodeNodeItem{key=13}
    product a nodeNodeItem{key=16}
    product a nodeNodeItem{key=17}
    custom a nodeNodeItem{key=14}
    product a nodeNodeItem{key=18}
    custom a nodeNodeItem{key=15}
    product a nodeNodeItem{key=19}
    custom a nodeNodeItem{key=16}
    custom a nodeNodeItem{key=17}
    custom a nodeNodeItem{key=18}
    product a nodeNodeItem{key=20}
    custom a nodeNodeItem{key=19}
    custom a nodeNodeItem{key=20}
    product a nodeNodeItem{key=21}
    custom a nodeNodeItem{key=21}
    product a nodeNodeItem{key=22}
    custom a nodeNodeItem{key=22}

    可以看到product和custom是对应执行的。

    后续会继续实现一个类似LinkedBlockingQueue的阻塞队列,支持锁分离。

    注:本人水平有限,如有问题,欢迎交流指出。 

  • 相关阅读:
    XAF 有条件的对象访问权限
    XAF 顯示 UnInplace Report(設置自定義條件顯示報表,不是根據選擇ListView記錄條件顯示報表)
    XAF 如何自定义PivotGrid单元格显示文本?
    XAF 如何布局详细视图上的按钮
    XAF How to set size of a popup detail view
    XAF Delta Replication Module for Devexpress eXpressApp Framework
    XAF 帮助文档翻译 EasyTest Basics(基础)
    XAF 用户双击ListView记录时禁止显示DetailView
    XAF How to enable LayoutView mode in the GridControl in List Views
    XAF 如何实现ListView单元格批量更改?
  • 原文地址:https://www.cnblogs.com/it-science/p/5721104.html
Copyright © 2011-2022 走看看