zoukankan      html  css  js  c++  java
  • 多线程编程学习六(Java 中的阻塞队列).

    介绍

    阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满;当队列空时,队列会阻塞获得元素的线程,直到队列变非空。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    当线程 插入/获取 动作由于队列 满/空 阻塞后,队列也提供了一些机制去处理,或抛出异常,或返回特殊值,或者线程一直等待...

    方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
    插入方法 add(e) offer(e) put(e) offer(e, timeout, unit)
    移除方法 remove(o) poll() take() poll(timeout, unit)
    检查方法 element() peek() — 不移除元素 不可用 不可用

    tips: 如果是无界阻塞队列,则 put 方法永远不会被阻塞;offer 方法始终返回 true。

    Java 中的阻塞队列:

    ArrayBlockingQueue

    ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序,默认情况下不保证线程公平的访问。

    通过可重入的独占锁 ReentrantLock 来控制并发,Condition 来实现阻塞。

    public class ArrayBlockingQueueTest {
    
        /**
         * 1. 由于是有界阻塞队列,需要设置初始大小
         * 2. 默认不保证阻塞线程的公平访问,可设置公平性
         */
        private static ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(2, true);
    
        public static void main(String[] args) throws InterruptedException {
    
            Thread put = new Thread(() -> {
                // 3. 尝试插入元素
                try {
                    QUEUE.put("java");
                    QUEUE.put("javaScript");
                    // 4. 元素已满,会阻塞线程
                    QUEUE.put("c++");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            put.start();
            Thread take = new Thread(() -> {
                try {
                    // 5. 获取一个元素
                    System.out.println(QUEUE.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            take.start();
            // 6 javaScript、c++
            System.out.println(QUEUE.take());
            System.out.println(QUEUE.take());
        }
    }
    

    LinkedBlockingQueue

    LinkedBlockingQueue 是一个用单向链表实现的有界阻塞队列。此队列的默认最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序,吞吐量通常要高于ArrayBlockingQueue。Executors.newFixedThreadPool() 就使用了这个队列。

    和 ArrayBlockingQueue 一样,采用 ReentrantLock 来控制并发,不同的是它使用了两个独占锁来控制消费和生产,通过 takeLock 和 putLock 两个锁来控制生产和消费,互不干扰,只要队列未满,生产线程可以一直生产;只要队列不空,消费线程可以一直消费,不会相互因为独占锁而阻塞。

    tips:因为使用了双锁,避免并发计算不准确,使用了一个 AtomicInteger 变量统计元素总量。

    LinkedBlockingDeque

    LinkedBlockingDeque 是一个由双向链表结构组成的有界阻塞队列,可以从队列的两端插入和移出元素。它实现了BlockingDeque接口,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以 First 单词结尾的方法,表示插入、获取或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。

    LinkedBlockingDeque 的 Node 实现多了指向前一个节点的变量 prev,以此实现双向队列。并发控制上和 ArrayBlockingQueue 类似,采用单个 ReentrantLock 来控制并发。因为双端队列头尾都可以消费和生产,所以使用了一个共享锁。

    双向阻塞队列可以运用在“工作窃取”模式中。

    public class LinkedBlockingDequeTest {
    
        private static LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2);
    
        public static void main(String[] args) {
            DEQUE.addFirst("java");
            DEQUE.addFirst("c++");
            // java
            System.out.println(DEQUE.peekLast());
            // java
            System.out.println(DEQUE.pollLast());
            DEQUE.addLast("php");
            // c++
            System.out.println(DEQUE.pollFirst());
        }
    }
    

    tips: take() 方法调用的是 takeFirst(),使用时候需注意。

    PriorityBlockingQueue

    PriorityBlockingQueue 是一个底层由数组实现的无界阻塞队列,并带有排序功能。由于是无界队列,所以插入永远不会被阻塞。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。

    底层同样采用 ReentrantLock 来控制并发,由于只有获取会阻塞,所以只采用一个Condition(只通知消费)来实现。

    public class PriorityBlockingQueueTest {
    
        private static PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>();
    
        public static void main(String[] args) {
            QUEUE.add("java");
            QUEUE.add("javaScript");
            QUEUE.add("c++");
            QUEUE.add("python");
            QUEUE.add("php");
            Iterator<String> it = QUEUE.iterator();
            while (it.hasNext()) {
                // c++  javaScript  java  python  php
                // 同优先级不保证排序顺序
                System.out.print(it.next() + "  ");
            }
        }
    }
    

    DelayQueue

    DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口(Delayed 接口的设计可以参考 ScheduledFutureTask 类),元素按延迟优先级排序,延迟时间短的排在前面,只有在延迟期满时才能从队列中提取元素。

    DelayQueue 中的 PriorityQueue 会对队列中的任务进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个任务的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

    和 PriorityBlockingQueue 相似,底层也是数组,采用一个 ReentrantLock 来控制并发。

    应用场景:

    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
    2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。
    public class DelayElement implements Delayed, Runnable {
    
        private static final AtomicLong SEQUENCER = new AtomicLong();
        /**
         * 标识元素先后顺序
         */
        private final long sequenceNumber;
        /**
         * 延迟时间,单位纳秒
         */
        private long time;
    
        public DelayElement(long time) {
            this.time = System.nanoTime() + time;
            this.sequenceNumber = SEQUENCER.getAndIncrement();
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.nanoTime(), NANOSECONDS);
        }
    
        @Override
        public int compareTo(Delayed other) {
            // compare zero if same object
            if (other == this) {
                return 0;
            }
            if (other instanceof DelayElement) {
                DelayElement x = (DelayElement) other;
                long diff = time - x.time;
                if (diff < 0) {
                    return -1;
                } else if (diff > 0) {
                    return 1;
                } else if (sequenceNumber < x.sequenceNumber) {
                    return -1;
                } else {
                    return 1;
                }
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    
        @Override
        public void run() {
            System.out.println("sequenceNumber" + sequenceNumber);
        }
    
        @Override
        public String toString() {
            return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ", time=" + time + '}';
        }
    }
    
    public class DelayQueueTest {
    
        private static DelayQueue<DelayElement> QUEUE = new DelayQueue<>();
    
        public static void main(String[] args) {
            // 1. 添加 10 个参数
            for (int i = 1; i < 10; i++) {
                // 2. 5 秒内随机延迟
                int nextInt = new Random().nextInt(5);
                long convert = TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS);
                QUEUE.offer(new DelayElement(convert));
            }
            // 3. 查询元素排序 —— 延迟短的排在前面
            Iterator<DelayElement> iterator = QUEUE.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
            // 4. 可观察到元素延迟输出
            while (!QUEUE.isEmpty()) {
                Thread thread = new Thread(QUEUE.poll());
                thread.start();
            }
        }
    }
    

    LinkedTransferQueue

    LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。

    并发控制上采用了大量的 CAS 操作,没有使用锁。

    相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

    1. transfer : Transfers the element to a consumer, waiting if necessary to do so. 存入的元素必须等到有消费者消费才返回。
    2. tryTransfer:Transfers the element to a waiting consumer immediately, if possible. 如果有消费者正在等待消费元素,则把传入的元素传给消费者。否则立即返回 false,不用等到消费。

    SynchronousQueue

    SynchronousQueue 是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则继续 put 操作会被阻塞。Executors.newCachedThreadPool 就使用了这个队列。

    SynchronousQueue 默认情况下线程采用非公平性策略访问队列,未使用锁,全部通过 CAS 操作来实现并发,吞吐量非常高,高于 LinkedBlockingQueue 和 ArrayBlockingQueue,非常适合用来处理一些高效的传递性场景。Executors.newCachedThreadPool() 就使用了 SynchronousQueue 进行任务传递。

    public class SynchronousQueueTest {
    
        private static class SynchronousQueueProducer implements Runnable {
    
            private BlockingQueue<String> blockingQueue;
    
            private SynchronousQueueProducer(BlockingQueue<String> queue) {
                this.blockingQueue = queue;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        String data = UUID.randomUUID().toString();
                        System.out.println(Thread.currentThread().getName() + " Put: " + data);
                        blockingQueue.put(data);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        private static class SynchronousQueueConsumer implements Runnable {
    
            private BlockingQueue<String> blockingQueue;
    
            private SynchronousQueueConsumer(BlockingQueue<String> queue) {
                this.blockingQueue = queue;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " take(): " + blockingQueue.take());
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        public static void main(String[] args) {
    
            final BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
            SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
            new Thread(queueProducer, "producer - 1").start();
            SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
            new Thread(queueConsumer1, "consumer — 1").start();
            SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
            new Thread(queueConsumer2, "consumer — 2").start();
        }
    }
    

     
     

    1. 参考书籍:《Java 并发编程的艺术》
    2. 参考博文:https://www.cnblogs.com/konck/p/9473677.html
  • 相关阅读:
    Commix命令注入漏洞利用
    漏洞扫描
    信息搜集神器
    peepingtom
    自动化安装一些渗透工具的脚本
    MITMF
    Xssf配合metaspolit使用
    Beef安装与简单使用
    Linux安全检测常用方法
    cobaltstrike3.8服务器搭建及使用
  • 原文地址:https://www.cnblogs.com/jmcui/p/11442616.html
Copyright © 2011-2022 走看看