zoukankan      html  css  js  c++  java
  • java面试-阻塞队列

    一、阻塞队列

    当阻塞队列是空,从队列中获取元素的操作会被阻塞

    当阻塞队列是满,往队列中添加元素的操作会被阻塞

    二、为什么用,有什么好处?

    我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切阻塞队列都包办了。

    三、常见的阻塞队列

    ArrayBlockingQueue由数组构成的有界阻塞队列.

    LinkedBlockingQueue由链表构成的有界阻塞队列(默认值为Integer.MAX_VALUE)

    public class BlockingQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
            /**
             * 1、抛出异常 add()/remove()
             */
    //        System.out.println(blockingQueue.add("a"));
    //        System.out.println(blockingQueue.add("b"));
    //        System.out.println(blockingQueue.add("c"));
    //        System.out.println(blockingQueue.add("d"));
    
    //        System.out.println(blockingQueue.element()); //检查队首元素
    
    //        System.out.println(blockingQueue.remove());
    //        System.out.println(blockingQueue.remove());
    //        System.out.println(blockingQueue.remove());
    //        System.out.println(blockingQueue.remove());
    
    
            /**
             * 2、返回布尔类型 offer()/pull()
             */
    //        System.out.println(blockingQueue.offer("a"));
    //        System.out.println(blockingQueue.offer("b"));
    //        System.out.println(blockingQueue.offer("c"));
    //        System.out.println(blockingQueue.offer("d"));
    //
    //        System.out.println(blockingQueue.peek()); //检查队首元素
    //
    //        System.out.println(blockingQueue.poll());
    //        System.out.println(blockingQueue.poll());
    //        System.out.println(blockingQueue.poll());
    //        System.out.println(blockingQueue.poll());
    
    
    
            /**
             * 3、阻塞 put()/take()
             */
    //        blockingQueue.put("a");
    //        blockingQueue.put("b");
    //        blockingQueue.put("c");
    //        System.out.println("############");
    //        blockingQueue.put("d");
    //
    //        System.out.println(blockingQueue.take());
    //        System.out.println(blockingQueue.take());
    //        System.out.println(blockingQueue.take());
    //        System.out.println(blockingQueue.take());
    
    
            /**
             *4、超时
             */
    
            System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS));
        }
    }

    SynchronousQueue是一个不存储元素的阻塞队列,也即单个元素的队列

    public class SynchronousQueueDemo {
        public static void main(String[] args) {
            BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
    
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " put 1");
                    blockingQueue.put("1");
    
                    System.out.println(Thread.currentThread().getName() + " put 2");
                    blockingQueue.put("2");
    
                    System.out.println(Thread.currentThread().getName() + " put 3");
                    blockingQueue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "AAA").start();
    
    
            new Thread(() -> {
                try {
    
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
    
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
    
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "BBB").start();
        }
    }  

    PriorityBlockingQueue:支持优先级排序的无界阻塞队列

    DelayQueue:使用优先级队列实现的延迟无界阻塞队列

    LinkedTransferQueue:由链表构成的无界阻塞队列

    LinkedBlockingDeque:由链表构成的双向阻塞队列

    四、BlockQueue的核心方法

    add()/remove()/element():抛出异常

    offer()/pull():返回布尔类型/支持超时  

    put()/take():阻塞

    peek() 检查队列首元素

    五、使用场景

    1、生产者-消费者模式

        问题: 一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮

          1)传统版的生产者-消费者模式

    /**
     * Created by wujuhong on 2019/7/3.
     * 传统版的生产者消费者模式
     */
    public class ProductConsumer_TraditionDemo {
    
        public static void main(String[] args) {
            ShareData shareData = new ShareData();
            new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    try {
                        shareData.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "AA").start();
    
    
            new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    try {
                        shareData.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "BB").start();
        }
    }
    
    class ShareData {
        private int number = 0;
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        public void increment() throws InterruptedException {
            lock.lock();
            try {
                //1、判断
                while (number != 0) {
                    //等待,不能生产
                    condition.await();
                }
                //2、干活
                number++;
                System.out.println(Thread.currentThread().getName() + " " + number);
                //3、通知唤醒
                condition.signalAll();
    
            } finally {
                lock.unlock();
            }
        }
    
    
        public void decrement() throws InterruptedException {
            lock.lock();
            try {
                //1、判断
                while (number == 0) {
                    //等待,不能生产
                    condition.await();
                }
                //2、干活
                number--;
                System.out.println(Thread.currentThread().getName() + " " + number);
                //3、通知唤醒
                condition.signalAll();
    
            } finally {
                lock.unlock();
            }
        }
    }
    

      2)阻塞队列的生产者-消费者模式

    class MyResource {
        private volatile boolean FLAG = true; //默认生产,进行生产+消费
        private AtomicInteger atomicInteger = new AtomicInteger();
        BlockingQueue<String> blockingQueue = null;
    
        public MyResource(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
            System.out.println(blockingQueue.getClass().getName());
        }
    
    
        public void myProduct() throws InterruptedException {
            String data = "";
            boolean returnValue;
            while (FLAG) {
                data = atomicInteger.incrementAndGet() + "";
                returnValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (returnValue) {
                    System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "失败");
    
                }
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println(Thread.currentThread().getName() + " 生产动作结束,FLAG = false");
        }
    
    
        public void myConsume() throws InterruptedException {
            String result = "";
            while (FLAG) {
                result = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (null == result || result.equalsIgnoreCase("")) {
                    FLAG = false;
                    System.out.println(Thread.currentThread().getName() + " 超过2s钟没有取到蛋糕,消费队列退出");
                    return;
    
                }
                System.out.println(Thread.currentThread().getName() + " 消费队列" + result + "成功");
    
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println(Thread.currentThread().getName() + " 消费动作结束,FLAG = false");
        }
    
    
        public void stop() {
            this.FLAG = false;
        }
    }
    
    public class ProductConsumer_BlockQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 生产线程启动");
                try {
                    myResource.myProduct();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "product").start();
    
    
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 消费线程启动");
                try {
                    myResource.myConsume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "consume").start();
    
            TimeUnit.SECONDS.sleep(5);
    
            System.out.println("5秒钟时间到,main线程叫停,活动结束");
    
            myResource.stop();
        }
    }  

    2、线程池

    3、消息中间件

    阻塞队列有没有好的一面?

    不得不阻塞,你如何管理?

  • 相关阅读:
    566. Reshape the Matrix矩阵重排
    697. Degree of an Array 频率最高元素的最小覆盖子数组
    717. 1-bit and 2-bit Characters最后一位数是否为0
    189. Rotate Array 从右边开始翻转数组
    448. Find All Numbers Disappeared in an Array 寻找有界数组[1,n]中的缺失数
    268. Missing Number序列中遗失的数字
    C 练习实例20 – 小球自由下落
    menu (Elements) – HTML 中文开发手册
    HTML DOM Password form 属性
    fmal (Numerics) – C 中文开发手册
  • 原文地址:https://www.cnblogs.com/wjh123/p/11123579.html
Copyright © 2011-2022 走看看