zoukankan      html  css  js  c++  java
  • Java源码解析——集合框架(二)——ArrayBlockingQueue

    ArrayBlockingQueue源码解析

    ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据(实际上可看作一个循环数组)。常用的操作包括 add ,offer,put,remove,poll,take,peek。

    一、类声明

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable

    1)AbstractQueue提供了Queue接口的默认实现。

    2)BlockingQueue接口定义了阻塞队列必须实现的方法。

    3)通过实现 java.io.Serializable 接口以启用其序列化功能。未实现此接口的类将无法使其任何状态序列化或反序列化。序列化接口没有方法或字段,仅用于标识可序列化的语义。

     

    二、成员变量

    private final E[] items;//底层数据结构
    
    private int takeIndex;//用来为下一个take/poll/remove的索引(出队)
    
    private int putIndex;//用来为下一个put/offer/add的索引(入队)
    
    private int count;//队列中元素的个数
    
    private final ReentrantLock lock;//
    
    private final Condition notEmpty;//等待出队的条件
    
    private final Condition notFull;//等待入队的条件

    三、构造方法

    ArrayBlockingQueue提供了两个构造方法:

    /**
    * 创造一个队列,指定队列容量,指定模式
    * @param fair
    * true:先来的线程先操作
    * false:顺序随机
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];//初始化类变量数组items
        lock = new ReentrantLock(fair);//初始化类变量锁lock
        notEmpty = lock.newCondition();//初始化类变量notEmpty Condition
        notFull = lock.newCondition();//初始化类变量notFull Condition
    }
    
    /**
    * 创造一个队列,指定队列容量,默认模式为非公平模式
    * @param capacity <1会抛异常
    */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    ArrayBlockingQueue的组成:一个对象数组+1把锁ReentrantLock+2个条件Condition

    三、成员方法

    • 入队方法

      ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

      add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

      offer方法如果队列满了,返回false,否则返回true

      add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

      这3个方法内部都会使用可重入锁保证原子性。

    1)add方法:

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    2)offer方法:

    在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false。因为使用的是ReentrantLock重入锁,所以需要显式地加锁和释放锁。

    public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)//数组满了
                    return false;
                else {//数组没满
                    insert(e);//插入一个元素
                    return true;
                }
            } finally {
                lock.unlock();
            }
    }

    在插入元素结束后,唤醒等待notEmpty条件(即获取元素)的线程。

    /**
    * 在队尾插入一个元素,并设置了超时等待的时间
    * 如果数组已满,则进入等待,直到出现以下三种情况:
    * 1、被唤醒
    * 2、等待时间超时
    * 3、当前线程被中断
    */
    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
      if (e == null)
        throw new NullPointerException();
      long nanos = unit.toNanos(timeout);//将超时时间转换为纳秒
      final ReentrantLock lock = this.lock;
            /*
             * lockInterruptibly():
             * 1、 在当前线程没有被中断的情况下获取锁。
             * 2、如果获取成功,方法结束。
             * 3、如果锁无法获取,当前线程被阻塞,直到下面情况发生:
             * 1)当前线程(被唤醒后)成功获取锁
             * 2)当前线程被其他线程中断
             * 
             * lock()
             * 获取锁,如果锁无法获取,当前线程被阻塞,直到锁可以获取并获取成功为止。
             */
      lock.lockInterruptibly();//加可中断的锁
      try {
        for (;;) {
          if (count != items.length) {//队列未满
            insert(e);
            return true;
          }
          if (nanos <= 0)//已超时
            return false;
          try {
            /*
            * 进行等待:
            * 在这个过程中可能发生三件事:
            * 1、被唤醒-->继续当前这个for(;;)循环
            * 2、超时-->继续当前这个for(;;)循环
            * 3、被中断-->之后直接执行catch部分的代码
            */
            nanos = notFull.awaitNanos(nanos);//进行等待(在此过程中,时间会流失,在此过程中,线程也可能被唤醒)
          } catch (InterruptedException ie) {//在等待的过程中线程被中断
            notFull.signal(); // 唤醒其他未被中断的线程
            throw ie;
          }
        }
      } finally {
        lock.unlock();
      }
    }

    无论是第一个offer方法还是第二个offer方法都调用了insert方法,insert方法的步骤是首先添加元素,然后利用inc函数进行索引的添加,最后会唤醒因为队列中没有数据而等待被阻塞的获取数据的方法。

    private void insert(E x) {
        items[putIndex] = x; // 元素添加到数组里
        putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
        ++count; // 元素个数+1
        notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
    }

    其中inc函数来改变索引的增加:

    final int inc(int i) {
        return (++i == items.length) ? 0 : I;
    }

    3)put方法

    /**
    * 在队尾插入一个元素
    * 如果队列满了,一直阻塞,直到数组不满了或者线程被中断
    */
    public void put(E e) throws InterruptedException {
      if (e == null)
        throw new NullPointerException();
      final E[] items = this.items;
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
        try {
          while (count == items.length)//队列满了,一直阻塞在这里
            /*
            * 一直等待条件notFull,即被其他线程唤醒
            * (唤醒其实就是,有线程将一个元素出队了,然后调用notFull.signal()唤醒其他等待这个条件的线程,同时队列也不慢了)
             */
            notFull.await();
        } catch (InterruptedException ie) {//如果被中断
          notFull.signal(); // 唤醒其他等待该条件(notFull,即入队)的线程
          throw ie;
        }
        insert(e);
      } finally {
        lock.unlock();
      }
    }
    • 出队方法

    ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

    ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

    poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

    remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

    poll方法和remove方法不会阻塞线程。

    take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

    这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

    1)poll方法

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程
        try {
            return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法
        } finally {
            lock.unlock(); // 释放锁,让其他线程可以调用poll方法
        }
    }

    poll方法内部调用extract方法:

    private E extract() {
      final E[] items = this.items;
      E x = items[takeIndex];//获取出队元素
      items[takeIndex] = null;//将出队元素位置置空
      /*
      * 第一次出队的元素takeIndex==0,第二次出队的元素takeIndex==1
      * (注意:这里出队之后,并没有将后面的数组元素向前移)
      */
      takeIndex = inc(takeIndex);
      --count;//数组元素个数-1
      notFull.signal();//数组已经不满了,唤醒其他等待notFull条件的线程
      return x;//返回出队的元素
    }

    同样地notfull标志表示数组已经不满,可以执行被阻塞的入队操作。

    2)take方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
        try {
            while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
                notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
            return extract(); // 调用extract方法
        } finally {
            lock.unlock(); // 释放锁,让其他线程可以调用take方法
        }
    }

    3)remove方法

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程
        try {
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素
                if (o.equals(items[i])) { // 两个对象相等的话
                    removeAt(i); // 调用removeAt方法
                    return true; // 删除成功,返回true
                }
            }
            return false; // 删除成功,返回false
        } finally {
            lock.unlock(); // 释放锁,让其他线程可以调用remove方法
        }
    }

    以及

    void removeAt(int i) {
        final Object[] items = this.items;
        if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count; // 元素个数-1
        notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知 
    }

     

     

  • 相关阅读:
    开源Jabber(XMPP) IM服务器介绍
    ejabberd、jabber、jabberd、xmpp辨析
    分布式与集群的区别
    浅谈Javascript事件模拟
    理清javascript的相关概念 DOM和BOM
    js基础学习第一天(关于DOM和BOM)一
    处理机调度和死锁
    C++11 之 " = delete "
    小数的二进制表示
    二进制数的插入
  • 原文地址:https://www.cnblogs.com/winterfells/p/8870104.html
Copyright © 2011-2022 走看看