zoukankan      html  css  js  c++  java
  • 并发队列之ArrayBlockingQueue

      上一篇我们说了并发队列中的LinkedBlockingQueue队列,这次我们看看ArrayBlockingQueue,看看名字,我们想象一下LinkedList和ArrayList的区别,我们可以知道ArrayBlockingQueue底层肯定是基于数组实现的,这是一个有界数组;

      ArrayBlockingQueue其中的组成部分和LinkedBlockingQueue及其相似,也是有两个条件变量,维护阻塞队列,实现了生产消费者模式;

    一.简单认识ArrayBlockingQueue

      先看看几个常用属性:

    //数组用于存放队列元素
    final Object[] items;
    //出队索引
    int takeIndex;
    //入队索引
    int putIndex;
    //队列中元素数量
    int count;
    //独占锁
    final ReentrantLock lock;
    //如果数组中为空,还有线程取数据,就丢到这个条件变量中来阻塞
    private final Condition notEmpty;
    //队列满了,还有线程往数组中添加数据,就把线程丢到这里来阻塞
    private final Condition notFull;

      由于这是一个有界的数组,我们再看看构造器:

    //指定容量,默认是非公平策略
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    //指定容量和独占锁的策略
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    //可以指定容量,锁的策略,还有初始化数据
    public ArrayBlockingQueue(int capacity, boolean fair,
                                Collection<? extends E> c) {
        this(capacity, fair);
    
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

    二.offer方法

      向队列尾部添加一个元素,添加成功就返回true,队列满了就丢掉当前元素直接返回false,方法不阻塞;

    public boolean offer(E e) {
        //非空检验
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果数组中实际数量和最大容量相等,添加失败,返回false
            if (count == items.length)
                return false;
            else {
                //添加成功,方法实现在下面
                enqueue(e);
                return true;
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }
    private void enqueue(E x) {
       //拿到数组
        final Object[] items = this.items;
        //在putIndex这个位置放入数据x,然后把putIndex加一,说明这个参数表示的是下一个数据要放入的位置的索引
        items[putIndex] = x;
        //这里putIndex是先加一然后再比较是否相等,比如这里数组的最大容量是5,那么索引的最大值应该是4,而如果putIndex等于5了,说明数组
        //越界了,加把这个索引重置为0
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //添加完成之后,说明了数组中有数据了,这里会唤醒之前因为去数组中取数据而阻塞的线程
        notEmpty.signal();
    }

    三.put方法

      向队列尾部插入一个元素,队列有空闲就插入成功返回true,队列满了就阻塞当前线程到notFull的条件队列中,等有空闲之后就会被唤醒;阻塞过程中对中断会有响应的;

    public void put(E e) throws InterruptedException {
        //非空检查
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //注意该锁的获取方式
        lock.lockInterruptibly();
        try {
            //如果线程满了,就把当前线程放到notFull条件变量的阻塞队列中
            while (count == items.length)
                notFull.await();
            //没有满,就添加数据
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

    四.poll方法

      头部获取并移除一个元素,如果队列为空,就返回null,方法不阻塞;

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果队列为空,就返回null
            //如果队列不为空,就调用dequeue方法获取并删除队列头部的元素
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        //获取数组
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //获取takeIndex位置的元素,最后会将这个返回
        E x = (E) items[takeIndex];
        //然后将takeInde位置置为空
        items[takeIndex] = null;
        //如果takeIndex已经是数组的最后一个位置了,就将takeIndex重置为0
        if (++takeIndex == items.length)
            takeIndex = 0;
        //实际数量减一
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //唤醒notFull中线程
        notFull.signal();
        return x;
    }

    五.take方法

      获取并删除当前队列头部的元素,如果队列为空当前线程阻塞直到被唤醒,对中断有响应;

     public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            //可中断的方式获取锁
            lock.lockInterruptibly();
            try {
                //如果数组为空,此时就唤醒notEmpty中条件队列里的线程
                while (count == 0)
                    notEmpty.await();
                //获取并删除头节点
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    六.peek方法

      只是获取头部元素,不删除,如果队列为空就返回null,这个方法是线程不阻塞的

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    //获取到数组中索引为takeIndex中的数据
    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        return (E) items[i];
    }

    七.总结

      理解了上一篇博客中说的LinkedBlockingQueue,那么再看这一篇其实太容易了,就是操作数组嘛!用下面这个图表示:

  • 相关阅读:
    Java读写文本文件操作
    java常用的文件读写操作
    CentOS yum 源的配置与使用
    每天一个linux命令目录
    Linux的概念与体系
    linux ACL权限规划:getfacl,setfacl使用
    基于大数据的电影网站项目开发之HBase分布式安装(四)
    基于大数据的电影网站项目开发之阶段性总结(三)
    基于大数据的电影网站项目开发之Hadoop2.6.0伪分布式设置(二)
    基于大数据的电影网站项目开发之CentOS的安装(一)
  • 原文地址:https://www.cnblogs.com/wyq1995/p/12287327.html
Copyright © 2011-2022 走看看