zoukankan      html  css  js  c++  java
  • Java并发容器之阻塞队列BlockingQueue

        BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。 

        BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
    这里写图片描述     

    四组不同的行为方式解释:

      • 抛异常:如果试图的操作无法立即执行,抛一个异常。
      • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
      • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
      • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false。

        阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制。封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。

        阻塞队列的主要实现类有以下几种:

        1:ArrayBlockingQueue:ArrayBlockingQueue是一个有界的阻塞队列,其内部实现是一个数组。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(因为它是基于数组实现的,一旦初始化,大小就无法修改,不会自动扩容)。内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

        2:DelayQueue:DelayQueue 对元素进行持有,直到一个特定的延迟到期。DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take  被调用的时候被释放掉。元素必须实现 java.util.concurrent.Delayed 接口。

        3:LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链表存储元素。可以选择一个长度上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

        4:PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable接口。队列中元素的排序取决于你自己的 Comparable 实现。注意:如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的

        5:SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳一个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

         6:LinkedBlockingDeque :链阻塞双端队列.它是对BlockingDeque双端队列接口的实现,线程在双端队列的两端都可以插入和提取元素在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据

        BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景,最常见的就是用来实现生产者消费者模式:因为使用者不需显式指定什么时候阻塞进行生产或者消费,队列本身就提供了满或空时的阻塞功能。

    public class BlockingQueueTest {
        //生产者线程
        public static class Producer implements Runnable{
            private final BlockingQueue<Integer> blockingQueue;
            private volatile boolean flag;
            private Random random;
    
            public Producer(BlockingQueue<Integer> blockingQueue) {
                this.blockingQueue = blockingQueue;
                flag=false;
                random=new Random();
    
            }
            public void run() {
                while(!flag){
                    int info=random.nextInt(100);
                    try {
                        blockingQueue.put(info);//生产对象并存入阻塞队列中
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }               
                }
            }
            public void shutDown(){
                flag=true;
            }
        }
        //消费者线程
        public static class Consumer implements Runnable{
            private final BlockingQueue<Integer> blockingQueue;
            private volatile boolean flag;
            public Consumer(BlockingQueue<Integer> blockingQueue) {
                this.blockingQueue = blockingQueue;
            }
            public void run() {
                while(!flag){
                    int info;
                    try {
                        info = blockingQueue.take();//从阻塞队列中消费产品
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }               
                }
            }
            public void shutDown(){
                flag=true;
            }
        }
        public static void main(String[] args){
            BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
            Producer producer=new Producer(blockingQueue);
            Consumer consumer=new Consumer(blockingQueue);
            //创建5个生产者,5个消费者,并启动生产和消费
            for(int i=0;i<10;i++){
                if(i<5){
                    new Thread(producer,"producer"+i).start();
                }else{
                    new Thread(consumer,"consumer"+(i-5)).start();
                }
            }
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.shutDown();//手动停止生产
            consumer.shutDown();//手动停止消费
    
        }
    }
  • 相关阅读:
    springboot + self4j 学习笔记
    git 创建本地分支,并且推送到远程分支
    windows 下生成 ssh key
    Topshelf
    ADSL拨号连接
    EF中使用Contains方法
    elasticsearch中的概念简述
    CriticalFinalizerObject的作用
    sqlserver中的统计语法
    Windbg简单介绍
  • 原文地址:https://www.cnblogs.com/ygj0930/p/6543890.html
Copyright © 2011-2022 走看看