zoukankan      html  css  js  c++  java
  • 阻塞队列(BlockingQueue)

    阻塞队列是 java.util.concurrent 包提供的一个类,该类提供了多线程中通过队列实现安全高效的数据处理的功能。

    所谓阻塞队列,是在普通队列基础上实现了阻塞线程的功能:

    •   队列为空时,获取元素的线程阻塞,直到队列变为非空。
    •   当队列满时,存储元素的线程阻塞,直到队列可用(非满)。

    以下是阻塞队列实现阻塞线程的两种常用场景:

     

    阻塞队列提供的方法:

      插入方法:

        1. boolean add(E e):队列没有满,则插入数据并返回true;队列满时,抛出异常 java.lang.IllegalStateException: Queue full。

        2. boolean offer(E e):队列没有满,则插入数据并返回true;队列满时,返回false。

        3. void put(E e):队列没有满,则插入数据;队列满时,阻塞调用此方法线程,直到队列有空闲空间时此线程进入就绪状态。

        4. boolean offer(E e, long timeout, TimeUnit unit):队列没有满,插入数据并返回true;队列满时,阻塞调用此方法线程,若指定等待的时间内还不能往队列中插入数据,返回false。

      移除方法:

        1. E remove():队列非空,则以FIFO原则移除数据,并返回该数据的值;队列为空,抛出异常 java.util.NoSuchElementException。

        2. E poll():队列非空,移除数据,并返回该数据的值;队列为空,返回null。

        3. E take():队列非空,移除数据,并返回该数据的值;队列为空,阻塞调用此方法线程,直到队列为非空时此线程进入就绪状态。

        4. E poll(long timeout, TimeUnit unit):队列非空,移除数据,并返回该数据的值;队列为空,阻塞调用此方法线程,若指定等待的时间内队列都没有数据可取,返回null。

      检查方法:

        1. E element():队列非空,则返回队首元素;队列为空,抛出异常 java.util.NoSuchElementException。

        2. E peek():队列非空,则返回队首元素;队列为空,返回null。

      获取所有成员的方法:

        1. int drainTo(Collection<? super E> c):一次性从BlockingQueue获取所有可用的数据对象存入集合中。

        2. int drainTo(Collection<? super E> c, int maxElements):从BlockingQueue获取指定数据的个数的对象存入集合中。

    JDK提供的阻塞队列:

      1. ArrayBlockingQueue :一个由数组结构实现的有界阻塞队列。

        ArrayBlockingQueue内部,维护了一个定长数组。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下对象内部采用非公平锁,所谓公平锁是指阻塞的所有生产者线程(或消费者线程),当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素(先阻塞的消费者线程,可以先从队列里获取元素)。通常情况下为了保证公平性会降低吞吐量。

      2. LinkedBlockingQueue :一个由链表结构实现的有界阻塞队列。

        LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。按照先进先出的原则对元素进行排序。

      3. DelayQueue:一个使用优先级队列实现的无界阻塞队列。

        DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。使用场景:常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

      4. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

        PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。

      5. SynchronousQueue:一个不存储元素的阻塞队列。

        SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。

      6. LinkedTransferQueue:一个由链表结构实现的无界阻塞队列。

        LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。

      7. LinkedBlockingDeque:一个由链表结构实现的双向阻塞队列。

        LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

    阻塞队列常用场景是 “生产者—消费者” 模式,以下是一个生产者不断生产随机数据存入队列,消费者不断获取的实例:

    import java.util.Random;
    import java.util.concurrent.*;
    
    public class BlockingQueueTest {
    
        /**
         * 生产者
         */
        public static class Producer implements Runnable {
    
            /**
             * 阻塞队列
             */
            private BlockingQueue<Integer> blockingQueue;
    
            /**
             * 判断是否循环
             */
            private boolean isRunning = true;
    
            /**
             * 随机数据范围
             */
            private static final int RANGE_FOR_DATA = 1000;
    
            private Random random = new Random();
    
            public Producer(BlockingQueue<Integer> blockingQueue) {
                this.blockingQueue = blockingQueue;
            }
    
            @Override
            public void run() {
                while (isRunning) {
                    try {
                        /** 生产出随机数 */
                        int data = random.nextInt(RANGE_FOR_DATA);
                        System.out.println(Thread.currentThread().getName() + " 生产数据:" + data);
                        /** 将随机数放入阻塞队列 */
                        blockingQueue.put(data);
                        System.out.println(Thread.currentThread().getName() + " 插入队列:" + data);
                        /** 进行随机时间休眠 */
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        System.out.println("程序结束啦,我不用再等待阻塞队列有空余位置了!");
                    }
                }
            }
    
            /**
             * 终止生产线程
             */
            public void shutDown() {
                isRunning = false;
            }
        }
    
        /**
         * 消费者
         */
        public static class Consumer implements Runnable {
    
            /**
             * 阻塞队列
             */
            private BlockingQueue<Integer> blockingQueue;
    
            /**
             * 判断是否循环
             */
            private boolean isRunning = true;
    
            /**
             * 随机数据范围
             */
            private Random random = new Random();
    
            public Consumer(BlockingQueue<Integer> blockingQueue) {
                this.blockingQueue = blockingQueue;
            }
    
            @Override
            public void run() {
                while (isRunning) {
                    try {
                        /** 从阻塞队列中获取随机数 */
                        int data = (int) blockingQueue.take();
                        System.out.println(Thread.currentThread().getName() + " 消费数据:" + data);
                        /** 进行随机时间休眠 */
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        System.out.println("程序结束啦,我不用再等待阻塞队列非空了!");
                    }
                }
            }
    
            /**
             * 终止消费线程
             */
            public void shutDown() {
                isRunning = false;
            }
        }
    
        public static void main(String[] args) {
            /** 创建容量大小为5的阻塞队列 */
            BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);
            /** 创建连接池 */
            ExecutorService pool = Executors.newCachedThreadPool();
            /** 创建生产线程,消费线程各5个 */
            Producer[] producers = new Producer[5];
            Consumer[] consumers = new Consumer[5];
            /** 实例化生产线程与消费线程并且执行线程 */
            for (int i = 0; i < producers.length; i++) {
                producers[i] = new Producer(blockingQueue);
                consumers[i] = new Consumer(blockingQueue);
                pool.execute(producers[i]);
                pool.execute(consumers[i]);
            }
            try {
                /** 等待5秒后进行手动中断 */
                Thread.sleep(5 * 1000);
                for (int i = 0; i < producers.length; i++) {
                    producers[i].shutDown();
                    consumers[i].shutDown();
                }
                /** 其实提不提醒线程关闭都一个样了,阻塞的线程,不会因为手动中断而中断的 */
                pool.shutdown();
                /** 等待2秒,若还有线程没有关闭则强行中断所有等待线程 */
                if (!pool.awaitTermination(2 * 1000, TimeUnit.MILLISECONDS)) {
                    /** 超时的时候向线程池中所有的线程发出中断 */
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    以下是对火车售票系统的模拟,在火车票发售的短短几分钟会有众多的人进行下单,这就是一个处理并发的问题。我们可以将用户请求存入阻塞队列,然后再对这些请求一一处理。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.*;
    
    public class TrainTicketingSystem {
        /**
         * 用户线程数量,同时进行抢票
         */
        private static final int USER_THREAD_COUNT = 300;
    
        /**
         * 火车票数量
         */
        private static final int TICKET_COUNT = 500;
    
        /**
         * 用户抢票线程,将用户名放入阻塞队列等待系统处理。
         */
        public static class User implements Runnable {
            /**
             * 用户名
             */
            private String username;
    
            /**
             * 火车票
             */
            private Ticket ticket;
    
            private BlockingQueue<String> blockingQueue;
    
            private Random random = new Random();
    
            public User(String username, BlockingQueue<String> blockingQueue) {
                this.username = username;
                this.blockingQueue = blockingQueue;
            }
    
            public void setUsername(String username) {
                this.username = username;
            }
    
            public String getUsername() {
                return username;
            }
    
            public void setTicket(Ticket ticket) {
                this.ticket = ticket;
            }
    
            public Ticket getTicket() {
                return ticket;
            }
    
            @Override
            public void run() {
                try {
                    /** 在休眠1s以内后开始抢票 */
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(username + "开始抢票");
                    /** 将用户名放入阻塞队列 */
                    blockingQueue.put(username);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 火车票,只是一个POJO
         */
        public static class Ticket {
            /**
             * 火车票编号
             */
            private String tid;
    
            /**
             * 用户名
             */
            private String username;
    
            public Ticket(String tid) {
                this.tid = tid;
            }
    
            public String getTid() {
                return tid;
            }
    
            public void setTid(String tid) {
                this.tid = tid;
            }
    
            public String getUsername() {
                return username;
            }
    
            public void setUsername(String username) {
                this.username = username;
            }
        }
    
        /**
         * 售票线程,取出阻塞队列中的用户名,将火车票卖给用户名对应的用户
         */
        public static class SellTicket implements Runnable {
            /**
             * 火车票组
             */
            private Ticket[] tickets;
            /**
             * 阻塞队列
             */
            private BlockingQueue<String> blockingQueue;
    
            public SellTicket(Ticket[] tickets, BlockingQueue<String> blockingQueue) {
                this.tickets = tickets;
                this.blockingQueue = blockingQueue;
            }
    
            @Override
            public void run() {
                for (int i = 0; i < tickets.length; i++) {
                    try {
                        String username = blockingQueue.take();
                        tickets[i].setUsername(username);
                        System.out.println(username + " 抢票成功,火车票编号:" + tickets[i].getTid());
                    } catch (InterruptedException e) {
                        System.out.println("售票时间截止!");
                        break;
                    }
                }
            }
        }
    
        public static void main(String[] args) {
            /** 存放用户线程的数组 */
            User[] users = new User[USER_THREAD_COUNT];
            /** 存放火车票的数组 */
            Ticket[] tickets = new Ticket[TICKET_COUNT];
            /** 链表存储结构的无界阻塞队列 */
            BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
            /** 连接池 */
            ExecutorService pool = Executors.newCachedThreadPool();
            /** 创建火车票实例 */
            for (int i = 0; i < tickets.length; i++) {
                tickets[i] = new Ticket("#00000" + String.valueOf(i));
            }
            /** 创建并启动用户线程组中的线程 */
            for (int i = 0; i < users.length; i++) {
                users[i] = new User("user" + String.valueOf(i), blockingQueue);
                pool.execute(users[i]);
            }
            /** 创建并启动售卖火车票线程 */
            pool.execute(new SellTicket(tickets, blockingQueue));
            try {
                /** 使用集合将阻塞队列中的所有元素存入,统计抢票失败的人数 */
                Thread.sleep(4 * 1000);
                List<String> list = new ArrayList<String>();
                int count = blockingQueue.drainTo(list);
                System.out.println("************ 一共有:" + count + "个人没有抢到票 ************");
                list.forEach((s) -> System.out.println("*** 用户" + s + "抢票失败 ***"));
    
                /** 中断所有运行的线程,由于是无界阻塞队列,就是中断售票线程,意思是停止售票 */
                pool.shutdown();
                if (!pool.awaitTermination(3 * 1000, TimeUnit.MILLISECONDS)) {
                    /** 超时的时候向线程池中所有的线程发出中断 */
                    pool.shutdownNow();
                }
                try {
                    Thread.sleep(4 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /** 将火车票传给对应的用户 */
                for (int i = 0; i < tickets.length; i++) {
                    /** 若是淡季,火车票不会被卖完,火车票对应的用户名就为空 */
                    if (tickets[i].getUsername() != null) {
                        int id = Integer.parseInt(tickets[i].getUsername().substring(4));
                        users[id].setTicket(tickets[i]);
                    }
                }
                /** 将用户信息依次输出 */
                System.out.println("************ 以下是所有用户信息 ************");
                for (int i = 0; i < users.length; i++) {
                    /** 若是旺季,很多用户没有抢到票ticket就为空 */
                    if (users[i].getTicket() != null) {
                        System.out.println("*** 火车票编号:" + users[i].getTicket().getTid() + " ***");
                        System.out.println("*** 用户名:    " + users[i].getTicket().getUsername() + " ***");
                    } else {
                        System.out.println("*** 火车票编号:" + "#####" + " ***");
                        System.out.println("*** 用户名:    " + users[i].getUsername() + " ***");
                    }
                }
            }
        }
    }
  • 相关阅读:
    超赞!不容错过的5款实用网页开发和设计工具
    如何从平面设计转行到UI设计?
    线段树
    RMQ
    Splay
    Treap
    *模板--矩阵
    最小生成树
    hash
    ac自动机
  • 原文地址:https://www.cnblogs.com/JimKing/p/9067821.html
Copyright © 2011-2022 走看看