zoukankan      html  css  js  c++  java
  • Java并发编程之阻塞队列

    1、什么是阻塞队列?

    队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略。使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞。

    2、主要的阻塞队列及其方法

    java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个:

    ArrayBlockingQueue:基于数组实现的阻塞队列,在创建ArrayBlockingQueue对象时必须指定其容量大小,还可以指定访问策略,默认情况下为非公平的,即不保证等待时间最长的线程最优先能够访问队列。
    LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
    以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
    DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    阻塞队列包括了非阻塞队列中的大部分方法,还提供另外若干非常有用的方法:

    put方法用来向队尾存入元素,如果队列满,则等待;
    take方法用来从队首取元素,如果队列为空,则等待;
    offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
    poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

    下面看一段代码:

    import java.util.concurrent.ArrayBlockingQueue;
    
    /** 
     * @author 作者:徐剑   E-mail:anxu_2013@163.com 
     * @version 创建时间:2016年3月20日 下午12:52:53 
     * 类说明 
     */
    public class BlockingQueue
    {
        public static void main(String[] args) throws InterruptedException
        {
            java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
            for (int i = 0; i < 10; i++)
            {
                // 将指定元素添加到此队列中
                blockingQueue.put("加入元素" + i);
                System.out.println("向阻塞队列中添加了元素:" + i);
            }
            System.out.println("程序到此运行结束,即将退出----");
        }
    }

    当限制阻塞队列数量为5时,添加了5个元素之后,继续添加将会队列外阻塞等待,此时程序并未终止。

    当队列满了之后,我们将队首元素移除,则可以继续向阻塞队列中添加元素,代码如下:

    public class BlockingQueue
    {
        public static void main(String[] args) throws InterruptedException
        {
            java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
            for (int i = 0; i < 10; i++)
            {
                // 将指定元素添加到此队列中
                blockingQueue.put("加入元素" + i);
                System.out.println("向阻塞队列中添加了元素:" + i);
                if(i>=4)
                    System.out.println("移除队首元素"+blockingQueue.take());
            }
            System.out.println("程序到此运行结束,即将退出----");
        }
    }

    执行结果如下:

    3、阻塞队列的实现原理

    下面主要看一下ArrayBlockingQueue的实现原理。

    首先看一下ArrayBlockingQueue类的成员变量:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        /** 底层存储结构-数组 */
        final Object[] items;
    
        /** 队首元素下标 */
        int takeIndex;
    
        /** 队尾元素下标 */
        int putIndex;
    
        /**队列元素总数 */
        int count;
    
        /** 重入锁 */
        final ReentrantLock lock;
    
        /** notEmpty等待条件 */
        private final Condition notEmpty;
        /** notFull等待条件 */
        private final Condition notFull;
        /**
         * Shared state for currently active iterators, or null if there
         * are known not to be any.  Allows queue operations to update
         * iterator state.
         */
        transient Itrs itrs = null;

    可以看到,ArrayBlockingQueue用来存储元素的实际上是一个数组。

    再看下ArrayBlockingQueue两个重要方法的实现,put()和take():

    public void put(E e) throws InterruptedException
        {
            //先检查e是否为空
            checkNotNull(e);
            //获取锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try
            {
                //当队列已满,进入条件等待
                while (count == items.length)
                    notFull.await();
                //队列不满,进行入队列操作
                enqueue(e);
            } 
            finally
            {
                //释放锁
                lock.unlock();
            }
        }

    再看下具体的入队操作:

    private void enqueue(E x)
        {
            final Object[] items = this.items;
            //队尾入队
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            //队列总数+1
            count++;
            //notempty条件的等待集中随机选择一个线程,解除其阻塞状态
            notEmpty.signal();
        }

    下面是take()方法的源代码:

    public E take() throws InterruptedException
        {
            //获取锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try
            {
                //队列为空
                while (count == 0)
                    //线程加入notEmpty条件等待集
                    notEmpty.await();
                //非空,出队列
                return dequeue();
            } finally
            {
                //释放锁
                lock.unlock();
            }
        }

    4、阻塞队列的应用:实现消费者-生产者模式

    /** 
     * @author 作者:徐剑   E-mail:anxu_2013@163.com 
     * @version 创建时间:2016年3月20日 下午2:21:55 
     * 类说明:阻塞队列实现的消费者-生产者模式
     */
    public class Test
    {
        private int queueSize = 10;
        private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
        public static void main(String[] args)
        {
            Test test = new Test();
            Producer producer = test.new Producer();
            Consumer consumer = test.new Consumer();
            producer.start();
            consumer.start();
        }
        class Consumer extends Thread
        {
            @Override
            public void run()
            {
                consume();
            }
    
            private void consume()
            {
                while (true)
                {
                    try
                    {
                        queue.take();
                        System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
                    } catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
        class Producer extends Thread
        {
            @Override
            public void run()
            {
                produce();
            }
            private void produce()
            {
                while (true)
                {
                    try
                    {
                        queue.put(1);
                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+ (queueSize - queue.size()));
                    } catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    5、参考资料

    http://www.cnblogs.com/dolphin0520/p/3932906.html

  • 相关阅读:
    Leetcode 238. Product of Array Except Self
    Leetcode 103. Binary Tree Zigzag Level Order Traversal
    Leetcode 290. Word Pattern
    Leetcode 205. Isomorphic Strings
    Leetcode 107. Binary Tree Level Order Traversal II
    Leetcode 102. Binary Tree Level Order Traversal
    三目运算符
    简单判断案例— 分支结构的应用
    用switch判断月份的练习
    java基本打印练习《我行我素购物系统》
  • 原文地址:https://www.cnblogs.com/wxd0108/p/5309787.html
Copyright © 2011-2022 走看看