zoukankan      html  css  js  c++  java
  • LinkedBlockingQueue、ArrayBlockingQueue、DelayQueue、TransferQueue、SynchronousQueue

    1.LinkedBlockingQueue

    /**
     * 使用阻塞同步队列 LinkedBlockingQueue 完成生产者消费者模式
     * 使用场景较多。
     */
    public class T05_LinkedBlockingQueue {
        public static void main(String[] args) {
    
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    
            // 启动生产者线程生产
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    try {
                //若果容器满了,它会在这里等待,容器有空余了在插入数据 queue.put(
    "aaa" + j); // put 方法,给容器添加元素,如果容器已经满了,则会阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "p").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 启用消费者线程消费 for (int i = 0; i < 5; i++) { new Thread(() -> { while (true) { try { System.out.println(Thread.currentThread().getName() + ":" + queue.take()); // 从队列中拿数据,如果空了,则会阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }

    2.ArrayBlockingQueue

    /**
     * 使用阻塞有界同步队列 ArrayBlockingQueue 完成生产者消费者模式
     */
    public class T06_ArrayBlockingQueue {
        public static void main(String[] args) throws InterruptedException {
    
            BlockingQueue queue = new ArrayBlockingQueue<>(10);
            for (int i = 0; i < 10; i++) {
                queue.put("a" + i);
            }
          //put的话在容器满了的情况下进行等待
    //queue.put("a11"); // 会阻塞 //queue.add("a11"); // 会抛出异常 //System.out.println(queue.offer("a11")); // 会返回false System.out.println(queue.offer("a11", 1, TimeUnit.SECONDS)); // 会等待1s,返回false, 如果1s内有空闲,则添加成功后返回true } }

    3.DelayQueue可以当做定时任务

    /**
     * DelayQueue,
     * 出队有个时间限制, 每个元素有一个等待时间, 可以按照等待时间排序元素
     * DelayQueue元素必须为 Delayed类型的,即必须设置元素的等待时间
     * 
     * 用途,定时执行任务
     */
    public class T07_DelayQueue {
    
        public static void main(String[] args) throws InterruptedException {
            long timestamp = System.currentTimeMillis();
            MyTask myTask1 = new MyTask(timestamp + 1000); // 1s后执行
            MyTask myTask2 = new MyTask(timestamp + 2000);
            MyTask myTask3 = new MyTask(timestamp + 1500);
            MyTask myTask4 = new MyTask(timestamp + 2500);
            MyTask myTask5 = new MyTask(timestamp + 500);
    
            DelayQueue<MyTask> tasks = new DelayQueue<>();
            tasks.put(myTask1);
            tasks.put(myTask2);
            tasks.put(myTask3);
            tasks.put(myTask4);
            tasks.put(myTask5);
    
            System.out.println(tasks);  // 确实按照我们拍的顺序执行的
    
            for (int i = 0; i < tasks.size(); i++) {
                System.out.println(tasks.take());
            }
        }
        static class MyTask implements Delayed {
            private long runningTime;
    
            public MyTask(long runTime) {
                this.runningTime = runTime;
            }
            
            // 这是每个元素的等待时间, 越是后加入的元素,时间等待的越长
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
    
            // 这是排序规律, 执行等待时间最短的排在上面
            @Override
            public int compareTo(Delayed o) {
                return (int) (o.getDelay(TimeUnit.MILLISECONDS) - this.getDelay(TimeUnit.MILLISECONDS));
            }      
            @Override
            public String toString() {
                return runningTime + "";
            }
        }
    }

    4.TransferQueue

    /**
     * TransferQueue,
     * 拥有transfer方法,传输,当transfer一个元素时,如果有take方法阻塞等待获取元素,则不向队列中保存,直接给等待的take方法
     * 如果没有消费者线程,transfer线程将会阻塞
     * 
     * 情景:如果将元素放入队列,再拿给消费者线程,太慢了,如果需要的效率更高,可以使用TransferQueue来解决更高的并发
     * 
     */
    public class T08_TransferQueue {
    
        public static void main(String[] args) {
            
            TransferQueue mq = new LinkedTransferQueue();
            
            // 先让消费者线程等待
            new Thread(() -> {
                try {
                    System.out.println(mq.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            // 再让生产者线程生产
            try {
           //一般应用于消息的实时处理,若是消费者都在忙着,可以先放到队列里面,但消费者必须必生产者先启动 mq.transfer(
    "aaa"); // put add 都不会阻塞,会添加到容器中,只有transfer才有此种功能(等待消费者直接获取),所以transfer是有容量的 } catch (InterruptedException e) { e.printStackTrace(); } //若是先起生产者再起消费者,这里就不会再执行了,他找不到消费者了 /*new Thread(() -> { try { System.out.println(mq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ } }

     5.SynchronousQueue

    这个是容量为0的队列,生产者直接把数据交给消费者

    /**
     * SynchronousQueue,
     * 一种特殊的TransferQueue,容量为0
     * 
     * TransferQueue是有容量的,可以通过add/put等方法向队列中加入元素
     * 但是SynchronousQueue是灭有
     */
    public class T09_SynchronousQueue {
    
        public static void main(String[] args) throws InterruptedException {
            
            BlockingQueue queue = new SynchronousQueue();
            
            new Thread(() -> {
                try {
                    System.out.println(queue.take()); // 取不到就阻塞
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            //queue.add("aaa"); // IllegalStateException: Queue full  抛出异常,因为没有容量
            queue.put("aaa");  // 会阻塞等待消费者线程获取,内部是transfer
            
            System.out.println(queue.size()); // 长度为0 
        }
    
    }

    总结

    Map/Set:
        无并发:
            HashMap
            TreeMap
            LinkedHashMap
        低并发:
            HashTable
            Collections.synchronizedMap()
        高并发:
            ConcurrentHashMap - 并发高
            ConcurrentSkipListMap - 并发高 且 需要排序
    
    队列:
        无并发:
            ArrayList
            LinkedList
        低并发:
            Vector
            Collections.synchronizedList()
        写少读多:
            CopyOnWriteList
        高并发
            Queue:
                ConcurrentLinkedQueue 非阻塞同步队列
                BlockQueue
                    LinkedBlockingQueue
                    ArrayBlockingQueue 
                    TransferQueue
                    SynchronousQueue
                DelayQueue

     

  • 相关阅读:
    python的reduce()函数
    python的map()函数
    【RxJS 01】函数式编程
    【vs_dev】01 first one
    【Angular01】Angular First One ----附 ip 地址查询
    目录
    ECMA Script 6 something
    【git】打tag
    【work 0107】dione 搭建
    【nextjs】React SSR
  • 原文地址:https://www.cnblogs.com/gxlaqj/p/11699740.html
Copyright © 2011-2022 走看看