zoukankan      html  css  js  c++  java
  • 11.并发包阻塞队列之LinkedBlockingQueue

    jdk1.7.0_79 

      在上文10.并发包阻塞队列之ArrayBlockingQueue中简要解析了ArrayBlockingQueue部分源码,在本文中同样要介绍的是Java并发包中的阻塞队列LinkedBlockingQueueArrayBlockingQueue队列是由数组实现,而LinkedBlockingQueue队列的实现则是链表(单向链表)实现,所以在LinkedBlockingQueue有一个Node内部类来表示链表的节点。  

    复制代码
    static final class Node<E> { 
      E item;//入队元素 
      Node<E> next;//指向后继节点 
      Node(E x) { 
        item = x; 
      } 
    } 
    复制代码

      同样它也有3个构造方法,与ArrayBlockingQueue略有不同。

    复制代码
     1 public LinkedBlockingQueue() { 
     2   this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列 
     3 } 
     4 public LinkedBlockingQueue(int capacity) { 
     5   if (capacity <= o) throw new IllegalArgumentException(); 
     6   this.capacity = capacity; 
     7   last = head = new Node<E>(null);//头指针和尾指针指向头节点(null) 
     8 } 
     9 public LinkedBlockingQueue(Collection<? extends E> c ) { 
    10   this(Integer.MAX_VALUE); 
    11   final ReentrantLock putLock = this.putLock; 
    12   putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作,同样也是为了保证其可见性。 
    13   try { 
    14       int n = 0; 
    15       for (E e : c) { 
    16           if (e == null) 
    17               throw new NullPointerException(); 
    18           if (n == capacity) 
    19               throw new IllegalStateException("Queue full"); 
    20           enqueue(new Node<E>(e));//入队 
    21           ++n; 
    22       } 
    23       count.set(n); 
    24   } finally { 
    25       putLock.unlock(); 
    26   } 
    27 } 
    复制代码

      在第12行中获取锁是为了保证可见性,这个的原因我认为是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。 

      在了解完LinkedBlockingQueue的构造方法后,我们回过头来看LinkedBlockingQueue的两个成员变量: 

    private final ReentrantLock takeLock = new ReentrantLock(); 
    private final ReentrantLock putLock = new ReentrantLock(); 

      可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。 

    private final Condition notEmpty = takeLock.newCondition(); 
    private final Condition notFull = putLock.newCondition(); 

      这两个成员变量则是线程等待队列,一个是出队锁上的等待队列,一个是入队锁上的等待队列。在ArrayBlockingQueue也有两个等待队列,一个是非空等待队列,另一个则是非满等待队列,在这一点上两者一致。 

    队列元素的插入 

     

    抛出异常 

    返回值(非阻塞) 

    一定时间内返回值 

    返回值(阻塞) 

    插入 

    add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue 

    offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。 

    offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true 

    put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。 

      LinkedBlockingQueue中并没有像ArrayBlockingQueue那样重写了AbstractQueueadd方法而直接调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add一样,都是直接调用其AbstractQueue

    复制代码
    //AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 
    public boolean add(E e) { 
      if (offer(e))//offer方法由Queue接口定义 
        return true; 
      else 
        throw new IllegalStateException(); 
    } 
    复制代码
    复制代码
     1 //LinkedBlockingQueue#offer 
     2 public boolean offer(E e) { 
     3   if (e == null) throw new NullPointerException(); 
     4   final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用 
     5   if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false 
     6     return false; 
     7   int c = -1; 
     8   Node<E> node = new Node(e); 
     9   final ReentrantLock putLock = this.putLock;//插入锁 
    10   putLock.lock();//获得插入锁 
    11   try { 
    12     if (count.get() < capacity) { 
    13       enqueuer(node);//入队 
    14       c = count.getAndIncrement();//队列数据总数自增+1后返回 
    15       if (c + 1 < capacity) 
    16         notFull.signal();//唤醒非满等待队列上的线程 
    17     } 
    18   } finally { 
    19     putLock.unlock(); 
    20   } 
    21   if (c == 0) 
    22     signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 
    23   return c >= 0 
    24 } 
    复制代码

      在第10行是获取插入锁,和ArrayBlockingQueue只有一个锁不同的是,LinkedBlockingQueue分为入队锁和出队锁,也就是说对于ArrayBlockingQueue同时只能有一个线程对它进行入队或者出队操作,而对于LinkedBlockingQueue来说同时能有两个线程对队列进行入队或者出队操作。 

      前两个addoffer方法都是非阻塞的,对于put方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。

    复制代码
    //LinkedBlockingQueue#put 
    public void put(E e) throws InterruptedException { 
      if (e == null) throws new NullPointerException(); 
      int c = -1; 
      Node<E> node = new Node(e); 
      final ReentrantLock putLock = this.putLock; 
      final AtomicInteger count = this.count; 
      putLock.lockInterrupted();//能被线程中断地获取锁 
      try { 
        while (count.get() == capacity) {//队列数据量等于队列容量 
          notFull.await();//休眠非满等待队列上的线程 
        } 
        enqueuer(node);//入队 
        c = count.getAndIncrement();//队列数据总数自增+1后返回 
        if (c + 1 < capacity)//还没有达到队列容量 
          notFull.signal();//唤醒非满等待队列上的线程 
      } finally { 
        putLock.unlock(); 
      } 
      if (c == 0) 
      signalNotEmpty();//唤醒非空等待队列上的线程 
    } 
    复制代码

      队列插入的最后一个方法来看上面出现的enqueue入队方法。

    private void enqueuer(Node<E> node) { 
      last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 
    } 

    队列元素的删除 

    抛出异常 

    返回值(非阻塞) 

    一定时间内返回值 

    返回值(阻塞) 

    remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue 

    poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 

    poll(time, unit)//设定等待的时间,如果在指定时间内队列还为空则返回null,不为空则返回队首值 

    take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 

    复制代码
    //AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 
    public E remove() {  
      E x = poll();//poll方法由Queue接口定义  
      if (x != null)  
        return x;  
      else  
        throw new NoSuchElementException();  
    } 
    复制代码
    复制代码
    //LinkedBlockingQueue#poll 
    public E poll() { 
      final AtomicInteger count = this.count; 
      if (count.get() == 0)  
        return null; 
      E x = null; 
      int c = -1; 
      final ReentrantLock takeLock = this.takeLock; 
      takeLock.lock();//获取出队锁 
      try { 
        if (count.get() > 0) {//队列不为空 
          x = dequeuer();//出队 
          c = count.getAndDecrement();//队列数据自减-1返回       if ( c > 1)         notEmpty.signal();//唤醒非空等待队列上的线程     }   } finally {     takeLock.unlock();   }   if (c == capacity)     signalNotFull();//唤醒非满等待队列上的线程   return x; }
    复制代码

      前两个removepoll方法都是非阻塞的,对于take方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。

    复制代码
    public E take() throws InterruptedException { 
      E x; 
      int c = -1; 
      final AtomicInteger count = this.count; 
      final ReentrantLock takeLock = this.takeLock; 
      take.lockInterruptibly();//可被线程中断返回地获取锁 
      try { 
        while (count.get() == 0) {//队列数据为空 
          notEmpty.await();//休眠非空等待队列上的线程 
        } 
        x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队 
        c = count.getAndDecrement(); 
      if (c > 1) 
        notEmpty.signal();//唤醒非空等待队列上的线程 
      } finally { 
        takeLock.unlock(); 
      } 
      if (c == capacity) 
        signalNotFull();//唤醒非满等待队列 
      return x; 
    } 
    复制代码

      队列出队的最后一个方法来看上面出现的dequeue入队方法

    复制代码
    private E dequeue() { 
      Node<E> h = head;//头节点,为空 
      Node<E> first = h.next; 
      h.next = h;//此时没有节点指向头节点,便于GC 
      head = first; 
      E x = first.item; 
      first.item = null; 
      return x; 
    } 
    复制代码

      最后一个方法size。

    public int size() { 
      return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 
    } 

    http://www.cnblogs.com/yulinfeng/p/7004383.html

  • 相关阅读:
    html书签展示(带搜索)
    PHP 无限级分类(递归)
    文件服务器的搭建
    php swoole 和 websocket的初次碰撞
    Linux 服务管理的两种方式service和systemctl
    Jquery 代码参考
    分享几个网址二维码生成api
    WordPress 缩率图学习笔记
    Linux 究级基础入门命令整理
    ltrim的思考
  • 原文地址:https://www.cnblogs.com/yuyu666/p/10072948.html
Copyright © 2011-2022 走看看