zoukankan      html  css  js  c++  java
  • 阻塞队列概念及其简单使用

    什么是阻塞队列

      概念

        当队列满的时候,插入元素的线程被阻塞,直到队列不满

        队列为空的时候,获取元素的线程被阻塞,直到队列不为空

        生产者消费者模式也是阻塞队列的一种体现

      常用阻塞队列

        ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列

        LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列

        PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列

        DelayQueue:一个使用优先级队列实现的无界阻塞队列

        SynchronousQueue:一个不存储元素的阻塞队列

        LinkedTransferQueue:一个由链表结构组成的无界阻塞队列

        LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

      常用方法

    方法 抛出异常 返回值 一直阻塞 超时退出
    插入方法 add offer put offer(timeout)
    移除方法 remove poll take poll(timeout)
    检查方法 element peek N/A N/A

      介绍:

        ArrayBlockingQueue:按照先进先出的原则,初始化需要设置大小

        LinkedBlockingQueue:按照先进先出的原则,可以不设置初始大小,不设置,默认就会Integer.MAX_VALUE

        区别:

          锁:ArrayBlockingQueue,只使用了一把锁,而LinkedBlockingQueue使用了2把

          实现:ArrayBlockingQueue直接插入元素,LinkedBlockingQueue需要转换

          初始化:ArrayBlockingQueue必须指定初始化大小,LinkedBlockingQueue可以不指定

        PriorityBlockingQueue:默认采用自然顺序排序,就1<2,A>B... ,如果想自定义顺序有要么实现CompateTo方法,要么指定构造参数Comparator,如果一致,PriorityBlockingQueue不保证优先级顺序

        DelayQueue:支持延时获取元素的阻塞队列,内部使用PriorityBlockingQueue,元素必须实现Delayed接口才允许放入

        SynchronousQueue:每一个put操作,都要等待一个take操作,类似于等待唤醒机制

        LinkedTransferQueue: 相比于LinkedBlockingQueue多了两个方法,transfer和tryTransfer,transfer在往队列里面插入元素之前,先看一下有没有消费者在等着,如果有,直接把元素交给消费者,省去了插入和,取出的步骤,tryTransfer尝试把元素给消费者,无论消费者是否接收,都会立即返回,transfer必须要消费者消费之后,才会返回

        LinkedBlockingDeque:可以从队列的头部和尾部都可以插入和移除元素,可以在有竞争的时候从两侧获取元素,减少一半的时间,在ForkJoin中的工作密取机制就是采用的LinkedBlockingDeque实现的,凡是方法名带了First的都是从头去拿,带了Last都是从尾部拿,不加的话,默认add等于addLast,remove等于removeFirst,take方法等于takeFirst

        建议:尽量采用有界阻塞队列, 因为在流量高峰的时候,无界阻塞队列会不断的增加占用资源,可能导致服务器宕机

      案例:

        使用DelayQueue实现延时订单功能

    定义元素容器类

    package org.dance.day5.bq;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 存放到队列的元素
     * @author ZYGisComputer
     */
    public class ItemVo<T> implements Delayed {
    
        /**
         * 延时时间 单位为毫秒
         */
        private long activeTime;
    
        private T data;
    
        public ItemVo(long activeTime, T data) {
            // 将时间转换为纳秒 并 + 当前纳秒 = 将超时时长转换为超时时刻
            this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,TimeUnit.MILLISECONDS) + System.nanoTime();
            this.data = data;
        }
    
        /**
         * 返回元素的剩余时间
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    
        /**
         * 按照剩余时间排序
         * @param o
         * @return
         */
        @Override
        public int compareTo(Delayed o) {
            long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return (d==0)?0:(d>0)?1:-1;
        }
    
        public long getActiveTime() {
            return activeTime;
        }
    
        public T getData() {
            return data;
        }
    }

    定义订单实体类

    package org.dance.day5.bq;
    
    /**
     * 订单实体类
     * @author ZYGisComputer
     */
    public class Order {
    
        private final String orderNo;
    
        private final double orderMoney;
    
        public Order(String orderNo, double orderMoney) {
            this.orderNo = orderNo;
            this.orderMoney = orderMoney;
        }
    
        public String getOrderNo() {
            return orderNo;
        }
    
        public double getOrderMoney() {
            return orderMoney;
        }
    }

    定义订单放入线程

    package org.dance.day5.bq;
    
    import java.util.concurrent.DelayQueue;
    
    /**
     * 将订单放入队列
     * @author ZYGisComputer
     */
    public class PutOrder implements Runnable{
    
        private DelayQueue<ItemVo<Order>> queue;
    
        public PutOrder(DelayQueue<ItemVo<Order>> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            // 5秒到期
            Order order = new Order("1",16.00d);
            // 包装成ItemVo
            ItemVo<Order> itemVo = new ItemVo<>(5000,order);
            queue.offer(itemVo);
            System.out.println("订单5秒后到期:"+order.getOrderNo());
    
            // 5秒到期
            order = new Order("2",18.00d);
            // 包装成ItemVo
            itemVo = new ItemVo<>(8000,order);
            queue.offer(itemVo);
            System.out.println("订单8秒后到期:"+order.getOrderNo());
        }
    }

    定义订单消费线程

    package org.dance.day5.bq;
    
    import java.util.concurrent.DelayQueue;
    
    /**
     * 获取订单
     *
     * @author ZYGisComputer
     */
    public class FetchOrder implements Runnable {
    
        private DelayQueue<ItemVo<Order>> queue;
    
        public FetchOrder(DelayQueue<ItemVo<Order>> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                ItemVo<Order> poll = null;
                try {
                    poll = queue.take();
                    Order data = poll.getData();
                    String orderNo = data.getOrderNo();
                    System.out.println(orderNo + "已经消费");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }

    测试类

    package org.dance.day5.bq;
    
    import java.util.Queue;
    import java.util.concurrent.DelayQueue;
    
    /**
     * @author ZYGisComputer
     */
    public class Test {
    
        public static void main(String[] args) throws InterruptedException {
    
            DelayQueue<ItemVo<Order>> queue = new DelayQueue<>();
            new Thread(new PutOrder(queue)).start();
            new Thread(new FetchOrder(queue)).start();
            for (int i = 0; i < 15; i++) {
                Thread.sleep(500);
                System.out.println(i*500);
            }
    
        }
    
    }

    执行结果

    订单5秒后到期:1
    订单8秒后到期:2
    0
    500
    1000
    1500
    2000
    2500
    3000
    3500
    4000
    4500
    1已经消费
    5000
    5500
    6000
    6500
    7000
    2已经消费

    通过执行结果分析,可以得知,两个放入阻塞队列的元素已经成功被消费掉了,当然这种阻塞队列也可以用于别的场景,比如实现本地延时缓存,限时缴费....

    作者:彼岸舞

    时间:2021111

    内容关于:并发编程

    本文来源于网络,只做技术分享,一概不负任何责任

  • 相关阅读:
    东方国信 - 软件开发人员面试问卷(ver1.001.002)
    Traceback (most recent call last): File "setup.py", line 22, in <module> execfile(join(CURDIR, 'src', 'SSHLibrary', 'version.py')) NameError: name 'execfile' is not defined
    python学习笔记一
    A strange lift
    A strange lift
    Tempter of the Bone
    Tempter of the Bone
    Rescue
    Rescue
    Red and Black
  • 原文地址:https://www.cnblogs.com/flower-dance/p/14261864.html
Copyright © 2011-2022 走看看