zoukankan      html  css  js  c++  java
  • 并发编程-concurrent指南-阻塞队列-同步队列SynchronousQueue

    SynchronousQueue:同步Queue,属于线程安全的BlockingQueue的一种,此队列设计的理念类似于"单工模式",对于每个put/offer操作,必须等待一个take/poll操作,类似于我们的现实生活中的"火把传递":一个火把传递地他人,需要2个人"触手可及"才行. 因为这种策略,最终导致队列中并没有一个真正的元素;这是一种pipleline思路的基于queue的"操作传递".

    如果有生产者,没有消费者,生产者的数据是put不了SynchronousQueue中的。

    •  void put(E o):向队列提交一个元素,阻塞直到其他线程take或者poll此元素.
    • boolean offer(E o):向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者"碰巧"有poll操作,那么将返回true,否则返回false.
    •   E take():获取并删除一个元素,阻塞直到有其他线程offer/put.
    •  boolean poll():获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者"碰巧"有offer操作,那么将返回true,否则返回false.
    •  E peek():总会返回null,硬编码.

    这个队列中,对我们有意义的操作时put/take,以及put/offer + take或者put/take + poll,对于无法进入队列的元素,需要有额外的"拒绝"策略支持.

    SynchronousQueue经常用来,一端或者双端严格遵守"单工"(单工作者)模式的场景,队列的两个操作端分别是productor和consumer.常用于一个productor多个consumer的场景。

    在ThreadPoolExecutor中,通过Executors创建的cachedThreadPool就是使用此类型队列.已确保,如果现有线程无法接收任务(offer失败),将会创建新的线程来执行.

    因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

    SynchronousQueue是这样 一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。

    不能在同步队列上进行 peek,因为仅在试图要取得元素时,该元素才存在;

     也不能迭代队列,因为其中没有元素可用于迭代。队列的头是尝试添加到队列中的首个已排队线程元素; 如果没有已排队线程,则不添加元素并且头为 null。

     注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。
           同步队列没有任何内部容量,甚至连一个队列的容量都没有。
     注意2:它是线程安全的,是阻塞的。
     注意3:不允许使用 null 元素。
     注意4:公平排序策略是指调用put的线程之间,或take的线程之间。
     公平排序策略可以查考ArrayBlockingQueue中的公平策略。
     注意5:SynchronousQueue的以下方法很有趣:
        * iterator() 永远返回空,因为里面没东西。
        * peek() 永远返回null。
        * put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
        * offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
        * offer(2000, TimeUnit.SECONDS) 往queue里放一个element但是等待指定的时间后才返回,返回的逻辑和offer()方法一样。
        * take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
        * poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
        * poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且remove掉queue里的element,其实就是再等其他的thread来往里塞。
        * isEmpty()永远是true。
        * remainingCapacity() 永远是0。
        * remove()和removeAll() 永远是false。

    具体代码:

    /**
     * SynchronoutQueue 必须有生产者消费者同时,不然会一直阻塞。
     * 没有生产者和消费者,队列中不会出现数据。
     */
    public class Main {
        public static void main(String[] args) {
            SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
    
            //生产者,不管几个生产者,只会有一个产品进入队列
            Producer producer = new Producer(synchronousQueue);
            producer.start();
            Producer producer1 = new Producer(synchronousQueue);
            producer1.start();
            Producer producer2 = new Producer(synchronousQueue);
            producer2.start();
    
            //消费者
            Consumer consumer = new Consumer(synchronousQueue);
            consumer.start();
        }
    }
    /**
     * 生产者
     */
    public class Producer extends Thread{
        private SynchronousQueue<Integer> sychronousQueue;
        public Producer(SynchronousQueue<Integer> sychronousQueue){
            this.sychronousQueue = sychronousQueue;
        }
    
        @Override
        public void run() {
            while(true){
                int random = new Random().nextInt(1000);
                System.out.println(Thread.currentThread().getName()+",生产一个产品:"+random);
                System.out.println(Thread.currentThread().getName()+",等待三秒后运输出去");
                try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName()+",产品加入队列");
                    sychronousQueue.put(random);
                    System.out.println(Thread.currentThread().getName()+",加入成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 消费者
     */
    public class Consumer extends Thread{
        private SynchronousQueue<Integer> synchronousQueue;
        public Consumer(SynchronousQueue<Integer> synchronousQueue){
            this.synchronousQueue = synchronousQueue;
        }
    
        @Override
        public void run() {
            while (true){
                try {
                    System.out.println(Thread.currentThread().getName()+",消费者等待");
                    int random = synchronousQueue.take();
                    System.out.println(Thread.currentThread().getName()+",消费者消费了产品:"+random);
                    System.out.println(Thread.currentThread().getName()+",-----------------------------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    结果:

    Thread-2,生产一个产品:329
    Thread-2,等待三秒后运输出去
    Thread-1,生产一个产品:204
    Thread-1,等待三秒后运输出去
    Thread-0,生产一个产品:167
    Thread-0,等待三秒后运输出去
    Thread-3,消费者等待
    Thread-1,产品加入队列
    Thread-0,产品加入队列
    Thread-2,产品加入队列
    Thread-3,消费者消费了产品:204
    Thread-3,-----------------------------------------------
    Thread-3,消费者等待
    Thread-1,加入成功
    Thread-3,消费者消费了产品:329
    Thread-1,生产一个产品:496
    Thread-1,等待三秒后运输出去
    Thread-2,加入成功
    Thread-3,-----------------------------------------------
    Thread-3,消费者等待
    Thread-2,生产一个产品:272
    Thread-2,等待三秒后运输出去
    Thread-0,加入成功
    Thread-0,生产一个产品:7
    Thread-0,等待三秒后运输出去
    Thread-3,消费者消费了产品:167
    Thread-3,-----------------------------------------------
    Thread-3,消费者等待
    Thread-1,产品加入队列
    Thread-1,加入成功
    Thread-1,生产一个产品:804
    Thread-3,消费者消费了产品:496
    Thread-3,-----------------------------------------------
    Thread-3,消费者等待
    Thread-1,等待三秒后运输出去
    Thread-2,产品加入队列
    Thread-2,加入成功
    Thread-3,消费者消费了产品:272
    Thread-3,-----------------------------------------------
    Thread-3,消费者等待
    Thread-0,产品加入队列
    Thread-2,生产一个产品:648
    Thread-2,等待三秒后运输出去
    Thread-0,加入成功
    Thread-0,生产一个产品:596
    Thread-0,等待三秒后运输出去
    Thread-3,消费者消费了产品:7
    。。。。。。。

    从结果中可以看出如果已经生产但是还未消费的,那么会阻塞在生产一直等到消费才能生成下一个。

    源码地址:https://github.com/qjm201000/concurrent_synchronousQueue.git

  • 相关阅读:
    4、pytest -- fixtures:明确的、模块化的和可扩展的
    CentOS -- 新建用户并使能密钥登录
    3、pytest -- 编写断言
    2、pytest -- 使用和调用
    1、pytest -- 安装和入门
    《Fluent Python》 -- 一个关于memoryview例子的理解过程
    SecureCRT 连接 Centos7.0 (NAT模式),且能连接公网。
    SecureCRT 连接 Centos7.0 (桥接模式),且能连接公网。
    Centos7.0 三种网络适配器
    Centos 7.0 界面
  • 原文地址:https://www.cnblogs.com/qjm201000/p/10146359.html
Copyright © 2011-2022 走看看