zoukankan      html  css  js  c++  java
  • Java深入学习(2):并发队列

    并发队列:

    在并发队列中,JDK有两套实现:

    ConcurrentLinkedQueue:非阻塞式队列

    BlockingQueue:阻塞式队列

    阻塞式队列非阻塞式队列的区别:

    阻塞式队列入列操作的时候,如果超出队列总数,这个时候会进行等待;在出列的时候,如果队列为空,也会等待

    非阻塞无论如何都不等待

    非阻塞效率更高,但是阻塞使用更广泛

    阻塞队列的优点是能够防止队列容器溢出,防止丢失

    非阻塞队列:

    public class QueueDemo {
        public static void main(String[] args) {
            ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
            concurrentLinkedQueue.offer("张三");
            concurrentLinkedQueue.offer("李四");
            concurrentLinkedQueue.offer("王五");
            for (int i = 0; i < 4; i++) {
                System.out.println(concurrentLinkedQueue.poll());
            }
        }
    }

    打印如下:

    张三
    李四
    王五
    null

    阻塞队列(重要):需要初始化队列总数

    public class QueueDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
            //添加非阻塞式队列
            arrayBlockingQueue.offer("张三");
            arrayBlockingQueue.offer("李四");
            arrayBlockingQueue.offer("王五");
            //添加阻塞式队列,等待时间为3s
            arrayBlockingQueue.offer("赵六",3, TimeUnit.SECONDS);
            System.out.println(arrayBlockingQueue.poll());
            System.out.println(arrayBlockingQueue.poll());
            System.out.println(arrayBlockingQueue.poll());
            System.out.println(arrayBlockingQueue.poll(3,TimeUnit.SECONDS));
        }
    }

    这种情况,等待3秒后打印:张三,李四,王五,再等待3秒后打印:null

    换一下代码:

    public class QueueDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
            //添加非阻塞式队列
            arrayBlockingQueue.offer("张三");
            arrayBlockingQueue.offer("李四");
            System.out.println(arrayBlockingQueue.poll());
            arrayBlockingQueue.offer("王五");
            //添加阻塞式队列,等待时间为3s
            arrayBlockingQueue.offer("赵六",3, TimeUnit.SECONDS);
            System.out.println(arrayBlockingQueue.poll());
            System.out.println(arrayBlockingQueue.poll());
            System.out.println(arrayBlockingQueue.poll());
            System.out.println(arrayBlockingQueue.poll(3,TimeUnit.SECONDS));
        }
    }

    这种情况,立即打印张三,李四,王五,赵六,等待3秒后打印null

    示例:

    public class QueueDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
            //添加非阻塞式队列
            boolean success1 = arrayBlockingQueue.offer("张三");
            boolean success2 = arrayBlockingQueue.offer("李四");
            boolean success3 = arrayBlockingQueue.offer("王五");
            //添加阻塞式队列,等待时间为3s
            boolean success4 = arrayBlockingQueue.offer("赵六",3, TimeUnit.SECONDS);
            System.out.println(success1);
            System.out.println(success2);
            System.out.println(success3);
            System.out.println(success4);
        }
    }

    等待3秒后打印:true,true,true,false;说明赵六没有入列成功

    生产者消费者示例:

    下面模拟一个生产者消费者的例子,以便于更好地理解:

    生产者线程存一个队列,消费者线程取一个队列,多线程中可以采用等待唤醒机制,在这里采用并发队列实现

    package org.dreamtech;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 生产者线程,负责添加队列
     */
    class ProducerThread implements Runnable {
    
        private BlockingQueue<String> blockingQueue;
    
        private volatile boolean FLAG = true;
    
        private AtomicInteger atomicInteger = new AtomicInteger();
    
        ProducerThread(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("---生产者线程启动成功---");
                while (FLAG) {
                    String data = atomicInteger.incrementAndGet() + "";
                    boolean success = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                    if (success) {
                        System.out.println("---生产者存入队列成功->data:" + data + "---");
                    } else {
                        System.out.println("---生产者存入队列失败->data:" + data + "---");
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("---生产者线程已经结束---");
            }
        }
    
        public void stop() {
            this.FLAG = false;
        }
    
    }
    
    /**
     * 消费者线程,负责获取队列
     */
    class ConsumerThread implements Runnable {
    
        private BlockingQueue<String> blockingQueue;
    
        private boolean FLAG = true;
    
        ConsumerThread(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("---消费者线程启动成功---");
                while (FLAG) {
                    String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                    if (data == null) {
                        System.out.println("---消费者没有获取到队列信息---");
                        FLAG = false;
                        return;
                    }
                    System.out.println("---消费者获得队列信息->data:" + data + "---");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("---消费者线程已经结束---");
            }
        }
    
    }
    
    public class Test {
        public static void main(String[] args) {
            try {
                BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);
                ProducerThread producerThread = new ProducerThread(blockingQueue);
                ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
                Thread producer = new Thread(producerThread);
                Thread consumer = new Thread(consumerThread);
                producer.start();
                consumer.start();
                Thread.sleep(10000);
                producerThread.stop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    打印如下:

    ---消费者线程启动成功---
    ---生产者线程启动成功---
    ---生产者存入队列成功->data:1---
    ---消费者获得队列信息->data:1---
    ---生产者存入队列成功->data:2---
    ---消费者获得队列信息->data:2---
    .............................................
    ---生产者存入队列成功->data:9---
    ---消费者获得队列信息->data:9---
    ---生产者存入队列成功->data:10---
    ---消费者获得队列信息->data:10---
    ---生产者线程已经结束---
    ---消费者没有获取到队列信息---
    ---消费者线程已经结束---
  • 相关阅读:
    把文本数据转化为json
    componentsSeparatedByString 的注意事项
    内存管理
    审核问题2.3.1
    H5缩放效果的问题和缓存问题
    iOS库
    DDOS 攻击防范
    连接数过多的问题
    nginx 长连接keeplive
    javascript 判断身份证的正确性
  • 原文地址:https://www.cnblogs.com/xuyiqing/p/11622499.html
Copyright © 2011-2022 走看看