zoukankan      html  css  js  c++  java
  • BlockingQueue原理

    概念

    BlockingQueue 翻译成中文阻塞队列,顾名思义就是线程使用队列时会阻塞当前线程;

    BlockingQueue 继承了Collection,具有一般集合所具有的数据存取功能

    BlockingQueue 是线程安全的队列,多线程访问时不会出现同一个数据集中的数据被多次取出,或者覆盖存放的事件

    使用场景

    可用于一个快速反馈的消息队列,无消息时阻塞线程让出CPU,有数据存入时通知线程取出数据,取完后继续阻塞,

    比如用户下单后立刻在大屏上显示有客户下单,比较简单的做法是开启一个定时任务,定期扫订单表;或者接入消息中间件,下单时发送消息,大屏服务监听消息;或者借用reddis队列 解决方式有很多种不一一列举

    示例模拟数据的存取 设置队列的容量为1 是为了更好展示 存取的阻塞特性

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(1);
    
        //模拟存入数据线程
        new Thread(()->{
            int i=0;
            while (true){
                try {
                    //每次循环+1
                    i++;
                    queue.put(i);
                    System.out.println("存入数据"+i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "存入数据线程").start();
    
        //模拟取出数据线程 1秒钟取一个
        new Thread(()->{
            while (true){
                try {
                    //一秒钟取一个数据
                    Thread.sleep(1000);
                    Integer result = queue.take();
                    System.out.println("取出数据"+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "取数据线程").start();
    
    }
    
    打印结果:
    存入数据1
    取出数据1
    存入数据2
    取出数据2
    存入数据3
    取出数据3
    存入数据4
    取出数据4
    存入数据5
    取出数据5
    存入数据6
    取出数据6
    存入数据7
    取出数据7

    方法示例

    阻塞队列的使用非常简单,基本上和普通集合一样对数据进行存和取

    public static void main(String[] args) throws InterruptedException {
            ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(2);
    
            //存入一个数据 如果队列满了则一直阻塞到有数据取出
            queue.put(1);
            //取出一个数据 如果队列空了则一直阻塞到有数据存入
            queue.take();
    
            //存入一个数据 如果队列满了则阻塞若干时长(示例为10秒),超时则返回offerResult=false
            boolean offerResult = queue.offer(1, 10, TimeUnit.SECONDS);
            //取出一个数据 如果队列空了则阻塞若干时长(示例为10秒),超时则返回pollResult=null
            Integer pollResult = queue.poll(10, TimeUnit.SECONDS);
    }

    源码分析

    1、接口继承结构 

    2、接口代码

    public interface BlockingQueue<E> extends Queue<E> {
        //向队列中添加元素, 若超过给定队列长度抛出异常
        boolean add(E e);
    
        //向队列中添加元素, 若超过给定队列长度抛出异常
        boolean offer(E e);
    
        //向队列中添加元素, 若超过队列长度则等待队列有剩余容量再加入元素
        void put(E e) throws InterruptedException;
    
        //向队列中添加元素, 若超过给定队列长度则等待给定时长
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //获取队列头部元素,并从队列头部移除,若队列为空,则阻塞当前获取线程,并等待新元素加入
        E take() throws InterruptedException;
    
        //获取队列头部元素,并从队列头部移除,若队列为空,则阻塞当前获取线程,并等待元素给定时长
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //返回队列剩余容量
        int remainingCapacity();
    
        //移除指定元素
        boolean remove(Object o);
    
        //返回是否存在指定元素
        public boolean contains(Object o);
    
        
        //将队列中的元素全部移除到给定的集合c中
        int drainTo(Collection<? super E> c);
    
        //将队列中的元素全部移除到给定的集合c中(最多不超过maxElements个)
        int drainTo(Collection<? super E> c, int maxElements);
    }

    3、实现类 ArrayBlockingQueue 分析

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        //数据集 用于存放元素 初始化固定数组长度 不再扩容
        final Object[] items;
    
        //数据集下一次取数据的下标
        //具体操作为 每次take加1 若take+1==items.length即take最后一个元素
        //则takeIndex重置为0 如此往复
        int takeIndex;
    
        //数据集下一次存数据下标
        int putIndex;
    
        //数据集中 存放元素的个数 即items[i]!=null的个数
        int count;
    
        //重入锁 可选公平与非公平 非本文重点
        final ReentrantLock lock;
    
        //Condition 
        //使用流程 1取数据为空(count==0) 则阻塞等待数据集存入数据 执行等待notEmpty.await 
        //         2存数据数据集肯定不为空(count!=0), 则通知取数据线程继续取数据 执行通知notEmpty.signal
        private final Condition notEmpty;
    
        //Condition 
        //使用流程 1存数据数据集存满(count==items.length)则等待消耗后重新存入 执行等待notFull.await 
        //         2取数据后则数据集未满肯定不满(count<items.length) 则通知存入数据 执行通知notFull.signal 
        private final Condition notFull;
    
        //用户维护ArrayBlockingQueue 作为集合的迭代(Iterator)功能
        //调用ArrayBlockingQueue.iterator()是初始化此属性 非本文重点
        transient Itrs itrs = null;
    
    
    
        //--------------------重点方法------------------------
        
    
        /**
         * 从队列中取一个元素
         * @return [description]
         * @throws InterruptedException [description]
         */
        public E take() throws InterruptedException {
            //对操作进行加锁 多线程时轮流取元素
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //如果队列中没有对象 则阻塞线程等待
                while (count == 0)
                    //重点:等待存数据的线程通知
                    notEmpty.await();
                //代码运行到此处说明count!=null 执行从队列中取元素
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        private E dequeue() {
            final Object[] items = this.items;
            //从数据集数组items 取出下标takeIndex的数据
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            //取完数据之后 将数组对应下标应用置为空(GC对象)
            items[takeIndex] = null;
            //takeIndex+1等于数组长度表示当前下标为数组最后一个对象
            //则takeIndex重新归0
            if (++takeIndex == items.length)
                takeIndex = 0;
            //每次取数据 数据总量减1
            count--;
            //迭代器维护操作
            if (itrs != null)
                itrs.elementDequeued();
            //重点:通知存数据的线程 可以执行数据存放
            notFull.signal();
            return x;
        }
    
    
        /**
         * 存入一个数据
         * @param  e                    [description]
         * @throws InterruptedException [description]
         */
        public void put(E e) throws InterruptedException {
            //校验数据非空
            checkNotNull(e);
            //加锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //若数据集数组items满了 则阻塞线程等待
                while (count == items.length)
                    //重点:等待取出数据的线程通知
                    notFull.await();
                //存入数据
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
        private void enqueue(E x) {
            final Object[] items = this.items;
            //存入数据到下标putIndex
            items[putIndex] = x;
            //如果存数据的下标已经到数据最后一个下标 则putIndex重新归0
            if (++putIndex == items.length)
                putIndex = 0;
            //数据总量加1
            count++;
            //重点:存入数据后通知等待取数据的线程
            notEmpty.signal();
        }
    
    
    
    
    }

     总结:

    BlockingQueue 重点关注

    1、阻塞方式

    Condition notFull 和 Condition notEmpty 的使用,存通知取,取通知存;

    从而达到存满阻塞,取完阻塞,存入通知取,取出通知存的功能

    2、存取游标

    takeIndex 和 putIndex的使用,每次取数据takeIndex加1,到了数据末尾则重新回到数组开始下标0,存数据原理相似逐次加1,到末尾归0

    对于LinkedBlockingQueue实现方式则略有不同,链表式集合多线程取数据时只需要排队从头部节点获取,从末尾存数据,有个小优化,创建LinkedBlockingQueue

    时创建一个虚拟头部节点,不做深究

  • 相关阅读:
    HDU.2087 剪花布条
    一个acm过来人的心得
    一个acm过来人的心得
    HDU.2190 悼念512汶川大地震遇难同胞——重建希望小学
    HDOJ.2501 Tiling_easy version
    HDOJ.2501 Tiling_easy version
    HDU
    poj3216 Prime Path(BFS)
    poj1426 Find The Multiple (DFS)
    Rikka with Nickname (简单题)
  • 原文地址:https://www.cnblogs.com/xieyanke/p/13441318.html
Copyright © 2011-2022 走看看