zoukankan      html  css  js  c++  java
  • 抽象队列同步器

    一 BlockingQueue

      是java.util.concurrent包提供的用于解决并发生产者-消费者最有用的类,它的特性是在任意时刻只有一个线程可以进行take或put操作,并且blockingqueue提供了超时return null的机制,在很多生产场景里可以看到这个工具的身影.

      1. 队列类型

        无限队列 -- 几乎可以无限增长

        有限队列 -- 定义了最大容量

      2. 队列数据结构

      • 通常用链表或者数组实现
      • 一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
      • 主要操作: 入队(EnQueue)与出队(Dequeue)


      3. 常见的4种阻塞队列

        ArrayBlockingQueue : 队列基于数组实现,大小在创建ArrayBlockingQueue时已经定义好.

        应用场景 : 线程池中有比较多的应用,生产者消费者场景

        LinkedBlockingQueue :  是一个基于链表的无界队列, 使用无限BlockingQueue设计生产者-消费者模型最重要的是,消费者应该能够像生产者向队列添加消息一样快的消费消息,否则,内存可能会填满.

        DelayQueue : 由优先级堆支持的,基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现.

        工作原理 : 队列内部会根据时间优先级进行排序.

      4. BlockingQueue API

        add() : 插入成功返回true,否则抛出 IllegalStateException 异常

        put() : 将指定的元素插入队列, 如果队列满了,会阻塞,直到有空间插入

        offer() : 如果插入成功返回true , 否则返回false

        offer(E e, long timeout, TimeUnit unit ) : 等同于put

        take() : 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用

        poll(long timeout, TimeUnit unit) : 检索并删除队列的头部, 等待指定的时间以使元素可用,如果超时,则返回null

      5. 多线程实现生产者消费者

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadLocalRandom;
    
    public class NumbersProducer implements Runnable {
        private BlockingQueue<Integer> numbersQueue;
        private final int poisonPill;
        private final int poisonPillPerProducer;
    
        public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
            this.numbersQueue = numbersQueue;
            this.poisonPill = poisonPill;
            this.poisonPillPerProducer = poisonPillPerProducer;
        }
    
        public void run() {
            try {
                generateNumbers();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    
        private void generateNumbers() throws InterruptedException {
            for (int i = 0; i < 100; i++) {
                numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
                System.out.println("潘金莲-{" + Thread.currentThread().getId() + "}号,给武大郎的泡药!");
            }
            for (int j = 0; j < poisonPillPerProducer; j++) {
                numbersQueue.put(poisonPill);
                System.out.println("潘金莲-{" + Thread.currentThread().getId() + "}号,往武大郎的药里放入第{" + (j + 1) + "}颗毒丸!");
            }
        }
    }
    
    
    
    import java.util.concurrent.BlockingQueue;
    
    public class NumbersConsumer implements Runnable {
        // 阻塞队列
        private BlockingQueue<Integer> queue;
        private final int poisonPill;
    
        public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
            this.queue = queue;
            this.poisonPill = poisonPill;
        }
    
        public void run() {
            try {
                while (true) {
                    Integer number = queue.take();
                    if (number.equals(poisonPill)) {
                        System.out.println("武大郎-{" + Thread.currentThread().getId() + "}号GG,喝药-编号:{" + number + "}");
                        return;
                    }
                    System.out.println("武大郎-{" + Thread.currentThread().getId() + "}号,喝药-编号:{" + number + "}");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Main {
    
        public static void main(String[] args) {
            // 阻塞队列最大值
            int BOUND = 10;
            // 16个生产者线程
            int N_PRODUCERS = 16;
            // 获取虚拟机可用的处理器数量
            int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
            // 获取最大值
            int poisonPill = Integer.MAX_VALUE;
            int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
            int mod = N_CONSUMERS % N_PRODUCERS;
    
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
            //潘金莲给武大郎熬药
            for (int i = 1; i < N_PRODUCERS; i++) {
                new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
            }
            //武大郎开始喝药
            for (int j = 0; j < N_CONSUMERS; j++) {
                new Thread(new NumbersConsumer(queue, poisonPill)).start();
            }
            //潘金莲开始投毒,武大郎喝完毒药GG
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
        }
    
    }
    岁月本长而忙者自促;天地本宽而卑者自隘;风花雪月本闲,而劳忧者自冗;天行健,君子以自强不息;地势坤,君子以厚德载物;宠辱不惊,闲看庭前花开花落;去留无意,漫随天外云卷云舒.不妄取,不妄予,不妄想,不妄求,与人方便,随遇而安
  • 相关阅读:
    如何通过npm编译Typescript代码
    TypeScript 中的':' 和'?:'的区别
    无法读取本地服务器JSON文件, 返回404错误
    Nodejs , npn 注册 包上传,更新,下载
    Java 字符流
    Java 字节流
    Java中的File类,递归是什么?
    JDBC工具类—如何封装JDBC
    JDBC的开发步骤
    vFor和vIf不要一起使用
  • 原文地址:https://www.cnblogs.com/vvning/p/13932179.html
Copyright © 2011-2022 走看看