zoukankan      html  css  js  c++  java
  • 生产者消费者实现

    1.Java 通过阻塞队列实现生产者消费者模式

    阻塞队列 Blocking Queue

    • 当队列空时,获取元素的线程会等待
    • 当队列满时,存储元素的线程会等待

    提供的方法:

    • 插入元素:

      • add(e):抛出异常
      • offer(e):返回特殊值
      • put(e):一直阻塞
      • offer(e,time,unit):超时退出
    • 移除元素:

      • remove():抛出异常
      • poll():返回特殊值
      • take():一直阻塞
      • poll(time,unit):超时退出

    JDK 7 提供了 7 个阻塞队列

    • ArrayBlockingQueue :一个由数组结构组成的 有界 阻塞队列。
      • 此队列按照先进先出(FIFO)的原则对元素进行排序。
      • 默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
        ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
    • LinkedBlockingQueue :一个由链表结构组成的 有界 阻塞队列。
      • 此队列按照先进先出(FIFO)的原则对元素进行排序。
    • PriorityBlockingQueue :一个支持优先级排序的 无界 阻塞队列。
      • 默认情况下元素采取自然顺序排列,也可以通过比较器 comparator 来指定元素的排序规则。元素按照升序排列。
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接
    关于 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式,可以参考 链接

    利用阻塞队列实现生产者消费者模式,代码如下:

    public class BlockingQueue_Test {
        private static final int MAX_CAPACITY = 10;
        private static ArrayBlockingQueue<Object> goods = new ArrayBlockingQueue<Object>(MAX_CAPACITY);
    
        public static void main(String[] args) {
            (new ProducerThread()).start();
    
            (new ConsumerThread()).start();
        }
    
        static class ProducerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 1000 毫秒生产一个商品
                    try {
                        Thread.sleep(1000);
    
                        goods.put(new Object());
                        System.out.println("Produce goods, total: " + goods.size());
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    
        static class ConsumerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 500 毫秒消费一个商品
                    try {
                        Thread.sleep(500);
    
                        goods.take();
                        System.out.println("Consume goods, total: " + goods.size());
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }
    

    阻塞队列的实现原理

    ArrayBlockingQueue 为例,实际上使用了 ReentrantLock 和 Condition。

    构造方法:

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    

    插入元素,如果队列已满,则阻塞 notFull.await();:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    

    移除元素,如果队列已空,则阻塞 notEmpty.await();:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    2.Java 通过wait和notify实现生产者消费者模式

    wait 和 notify 的功能

    • obj.wait():

      • 如果当前调用 obj.wait() 的线程并没有获得该对象 obj 上的锁,则抛出异常 IllegalMonitorStateException
      • 否则,当前线程进入 Waiting Pool 状态
      • 线程调用 obj.wait() 方法后,在如下条件下,会从 Waiting Pool 状态进入 Waiting for monitor entry 状态:
        • 该对象的 notify() 方法被调用
        • 该对象的 notifyAll() 方法被调用
        • obj.wait() 可以添加参数,例如 obj.wait(1000),表示 1000 毫秒后自动进入 Waiting for monitor entry 状态
        • 线程被中断
    • obj.notify():唤醒 一个 正在 Waiting Pool 中等待该对象的线程进入 Waiting for monitor entry 状态

      • 具体是唤醒哪一个?与优先级无关,由 JVM 决定
    • obj.notifyAll():唤醒 所有 正在 Waiting Pool 中等待该对象的线程进入 Waiting for monitor entry 状态

    线程进入 Waiting for monitor entry 状态后,一旦该对象被解锁,这些线程就会去竞争。

    wait 和 notify 的使用

    • wait, notify, notifyAll 方法需要放在 synchronized 代码块中,因为必须先获得对象的锁!
    • 由于线程可能在非正常情况下被意外唤醒,一般需要把 wait 方法放在一个循环中,例如:
    synchronized(obj) {
        while(some condition) {
          try {
            obj.wait();
          } catch(...) {...}
        }
    }
    

    通过wait和notify实现生产者消费者模式

    代码如下:

    public class ProducerCunsumer_Test {
        private static final int MAX_CAPACITY = 10;
        private static List<Object> goods = new ArrayList<Object>();
    
        public static void main(String[] args) {
            (new ProducerThread()).start();
    
            (new ConsumerThread()).start();
        }
    
        static class ProducerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 1000 毫秒生产一个商品
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
    
                    synchronized (goods) {
                        // 当前商品满了,生产者等待
                        if (goods.size() == MAX_CAPACITY) {
                            try {
                                System.out.println("Goods full, waiting...");
                                goods.wait();
                            } catch (Exception e) {
                            }
                        }
    
                        goods.add(new Object());
                        System.out.println("Produce goods, total: " + goods.size());
    
                        // goods.notify() 也可以
                        goods.notifyAll();
                    }
                }
            }
        }
    
        static class ConsumerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 500 毫秒消费一个商品
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
    
                    synchronized (goods) {
                        // 当前商品空了,消费者等待
                        if (goods.size() == 0) {
                            try {
                                System.out.println("No goods, waiting...");
                                goods.wait();
                            } catch (Exception e) {
                            }
                        }
    
                        goods.remove(0);
                        System.out.println("Consume goods, total: " + goods.size());
    
                        // goods.notify() 也可以
                        goods.notifyAll();
                    }
                }
            }
        }
    }
    

    在上面的代码中,消费商品的速度(500毫秒)快于生产商品的速度(1000毫秒),依次输出如下所示:
    可以看出,商品队列经常处于空的状态。

    No goods, waiting...
    Produce goods, total: 1
    Consume goods, total: 0
    No goods, waiting...
    Produce goods, total: 1
    Consume goods, total: 0
    No goods, waiting...
    Produce goods, total: 1
    Consume goods, total: 0

    如果修改,使得消费商品的速度(500毫秒)慢于生产商品的速度(100毫秒),依次输出如下所示:
    可以看出,商品队列经常处于满的状态。

    Produce goods, total: 1
    Produce goods, total: 2
    Produce goods, total: 3
    Produce goods, total: 4
    Produce goods, total: 5
    Consume goods, total: 4
    Produce goods, total: 5
    Produce goods, total: 6
    Produce goods, total: 7
    Produce goods, total: 8
    Consume goods, total: 7
    Produce goods, total: 8
    Produce goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...
    Consume goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...
    Consume goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...
    Consume goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...

    3.Java 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式

    竞争条件

    多个线程共享对某些变量的访问,其最后结果取决于哪个线程偶然在竞争中获胜。

    • condition.await():类似于 obj.wait()
    • condition.signal():类似于 obj.notify()
    • condition.signalAll():类似于 obj.notifyAll()

    竞争条件 Condition 对比 obj.wait() obj.notify() 的优势

    可以建立不同的多个 Condition,针对不同的竞争条件,例如:

    Condition isFullCondition = lock.newCondition();
    Condition isEmptyCondition = lock.newCondition();
    

    关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接

    利用 Lock 和 竞争条件 Condition 也可以实现生产者消费者模式,代码如下:

    public class Condition_Test {
        private static final int MAX_CAPACITY = 10;
        private static List<Object> goods = new ArrayList<Object>();
    
        private static final Lock lock = new ReentrantLock();
        private static final Condition isFullCondition = lock.newCondition();
        private static final Condition isEmptyCondition = lock.newCondition();
    
    
        public static void main(String[] args) {
            (new ProducerThread()).start();
    
            (new ConsumerThread()).start();
        }
    
        static class ProducerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 1000 毫秒生产一个商品
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
    
                    // 获得锁,相当于 synchronized
                    lock.lock();
    
                    // 当前商品满了,生产者等待
                    if (goods.size() == MAX_CAPACITY) {
                        try {
                            System.out.println("Goods full, waiting...");
                            isEmptyCondition.await();
                        } catch (Exception e) {
                        }
                    }
    
                    goods.add(new Object());
                    System.out.println("Produce goods, total: " + goods.size());
    
                    // isFullCondition.signal() 也可以
                    isFullCondition.signalAll();
    
                    // 记住要释放锁
                    lock.unlock();
                }
            }
        }
    
        static class ConsumerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 500 毫秒消费一个商品
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
    
                    // 获得锁,相当于 synchronized
                    lock.lock();
    
                    // 当前商品空了,消费者等待
                    if (goods.size() == 0) {
                        try {
                            System.out.println("No goods, waiting...");
                            isFullCondition.await();
                        } catch (Exception e) {
                        }
                    }
    
                    goods.remove(0);
                    System.out.println("Consume goods, total: " + goods.size());
    
                    // isEmptyCondition.signal() 也可以
                    isEmptyCondition.signalAll();
    
                    // 记住要释放锁
                    lock.unlock();
                }
            }
        }
    }
    

    Condition 的实现原理

    Condition 的内部实现是使用节点链来实现的,每个条件实例对应一个节点链,我们有 isFullConditionisEmptyCondition 两个条件实例,所以会有两个等待节点链。当对应条件被 signal 的时候,就会把等待节点转移到同步队列中,继续竞争锁。

  • 相关阅读:
    Checking Types Against the Real World in TypeScript
    nexus pip proxy config
    go.rice 强大灵活的golang 静态资源嵌入包
    几个golang 静态资源嵌入包
    rpm 子包创建学习
    Rpm Creating Subpackages
    ava 类似jest snapshot 功能试用
    ava js 测试框架基本试用
    The Architectural Principles Behind Vrbo’s GraphQL Implementation
    graphql-compose graphql schema 生成工具集
  • 原文地址:https://www.cnblogs.com/cxhfuujust/p/10880558.html
Copyright © 2011-2022 走看看