zoukankan      html  css  js  c++  java
  • 生产者消费者实现

    1.Java 通过阻塞队列实现生产者消费者模式

    阻塞队列 Blocking Queue

    • 当队列空时,获取元素的线程会等待
    • 当队列满时,存储元素的线程会等待

    提供的方法:

    • 插入元素:

      • add(e):抛出异常
      • offer(e):返回特殊值
      • put(e):一直阻塞
      • offer(e,time,unit):超时退出
    • 移除元素:

      • remove():抛出异常
      • poll():返回特殊值
      • take():一直阻塞
      • poll(time,unit):超时退出

    JDK 7 提供了 7 个阻塞队列

    • ArrayBlockingQueue :一个由数组结构组成的 有界 阻塞队列。
      • 此队列按照先进先出(FIFO)的原则对元素进行排序。
      • 默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
        ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
    • LinkedBlockingQueue :一个由链表结构组成的 有界 阻塞队列。
      • 此队列按照先进先出(FIFO)的原则对元素进行排序。
    • PriorityBlockingQueue :一个支持优先级排序的 无界 阻塞队列。
      • 默认情况下元素采取自然顺序排列,也可以通过比较器 comparator 来指定元素的排序规则。元素按照升序排列。
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接
    关于 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式,可以参考 链接

    利用阻塞队列实现生产者消费者模式,代码如下:

    public class BlockingQueue_Test {
        private static final int MAX_CAPACITY = 10;
        private static ArrayBlockingQueue<Object> goods = new ArrayBlockingQueue<Object>(MAX_CAPACITY);
    
        public static void main(String[] args) {
            (new ProducerThread()).start();
    
            (new ConsumerThread()).start();
        }
    
        static class ProducerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 1000 毫秒生产一个商品
                    try {
                        Thread.sleep(1000);
    
                        goods.put(new Object());
                        System.out.println("Produce goods, total: " + goods.size());
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    
        static class ConsumerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 500 毫秒消费一个商品
                    try {
                        Thread.sleep(500);
    
                        goods.take();
                        System.out.println("Consume goods, total: " + goods.size());
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }
    

    阻塞队列的实现原理

    ArrayBlockingQueue 为例,实际上使用了 ReentrantLock 和 Condition。

    构造方法:

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    

    插入元素,如果队列已满,则阻塞 notFull.await();:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    

    移除元素,如果队列已空,则阻塞 notEmpty.await();:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    2.Java 通过wait和notify实现生产者消费者模式

    wait 和 notify 的功能

    • obj.wait():

      • 如果当前调用 obj.wait() 的线程并没有获得该对象 obj 上的锁,则抛出异常 IllegalMonitorStateException
      • 否则,当前线程进入 Waiting Pool 状态
      • 线程调用 obj.wait() 方法后,在如下条件下,会从 Waiting Pool 状态进入 Waiting for monitor entry 状态:
        • 该对象的 notify() 方法被调用
        • 该对象的 notifyAll() 方法被调用
        • obj.wait() 可以添加参数,例如 obj.wait(1000),表示 1000 毫秒后自动进入 Waiting for monitor entry 状态
        • 线程被中断
    • obj.notify():唤醒 一个 正在 Waiting Pool 中等待该对象的线程进入 Waiting for monitor entry 状态

      • 具体是唤醒哪一个?与优先级无关,由 JVM 决定
    • obj.notifyAll():唤醒 所有 正在 Waiting Pool 中等待该对象的线程进入 Waiting for monitor entry 状态

    线程进入 Waiting for monitor entry 状态后,一旦该对象被解锁,这些线程就会去竞争。

    wait 和 notify 的使用

    • wait, notify, notifyAll 方法需要放在 synchronized 代码块中,因为必须先获得对象的锁!
    • 由于线程可能在非正常情况下被意外唤醒,一般需要把 wait 方法放在一个循环中,例如:
    synchronized(obj) {
        while(some condition) {
          try {
            obj.wait();
          } catch(...) {...}
        }
    }
    

    通过wait和notify实现生产者消费者模式

    代码如下:

    public class ProducerCunsumer_Test {
        private static final int MAX_CAPACITY = 10;
        private static List<Object> goods = new ArrayList<Object>();
    
        public static void main(String[] args) {
            (new ProducerThread()).start();
    
            (new ConsumerThread()).start();
        }
    
        static class ProducerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 1000 毫秒生产一个商品
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
    
                    synchronized (goods) {
                        // 当前商品满了,生产者等待
                        if (goods.size() == MAX_CAPACITY) {
                            try {
                                System.out.println("Goods full, waiting...");
                                goods.wait();
                            } catch (Exception e) {
                            }
                        }
    
                        goods.add(new Object());
                        System.out.println("Produce goods, total: " + goods.size());
    
                        // goods.notify() 也可以
                        goods.notifyAll();
                    }
                }
            }
        }
    
        static class ConsumerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 500 毫秒消费一个商品
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
    
                    synchronized (goods) {
                        // 当前商品空了,消费者等待
                        if (goods.size() == 0) {
                            try {
                                System.out.println("No goods, waiting...");
                                goods.wait();
                            } catch (Exception e) {
                            }
                        }
    
                        goods.remove(0);
                        System.out.println("Consume goods, total: " + goods.size());
    
                        // goods.notify() 也可以
                        goods.notifyAll();
                    }
                }
            }
        }
    }
    

    在上面的代码中,消费商品的速度(500毫秒)快于生产商品的速度(1000毫秒),依次输出如下所示:
    可以看出,商品队列经常处于空的状态。

    No goods, waiting...
    Produce goods, total: 1
    Consume goods, total: 0
    No goods, waiting...
    Produce goods, total: 1
    Consume goods, total: 0
    No goods, waiting...
    Produce goods, total: 1
    Consume goods, total: 0

    如果修改,使得消费商品的速度(500毫秒)慢于生产商品的速度(100毫秒),依次输出如下所示:
    可以看出,商品队列经常处于满的状态。

    Produce goods, total: 1
    Produce goods, total: 2
    Produce goods, total: 3
    Produce goods, total: 4
    Produce goods, total: 5
    Consume goods, total: 4
    Produce goods, total: 5
    Produce goods, total: 6
    Produce goods, total: 7
    Produce goods, total: 8
    Consume goods, total: 7
    Produce goods, total: 8
    Produce goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...
    Consume goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...
    Consume goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...
    Consume goods, total: 9
    Produce goods, total: 10
    Goods full, waiting...

    3.Java 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式

    竞争条件

    多个线程共享对某些变量的访问,其最后结果取决于哪个线程偶然在竞争中获胜。

    • condition.await():类似于 obj.wait()
    • condition.signal():类似于 obj.notify()
    • condition.signalAll():类似于 obj.notifyAll()

    竞争条件 Condition 对比 obj.wait() obj.notify() 的优势

    可以建立不同的多个 Condition,针对不同的竞争条件,例如:

    Condition isFullCondition = lock.newCondition();
    Condition isEmptyCondition = lock.newCondition();
    

    关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接

    利用 Lock 和 竞争条件 Condition 也可以实现生产者消费者模式,代码如下:

    public class Condition_Test {
        private static final int MAX_CAPACITY = 10;
        private static List<Object> goods = new ArrayList<Object>();
    
        private static final Lock lock = new ReentrantLock();
        private static final Condition isFullCondition = lock.newCondition();
        private static final Condition isEmptyCondition = lock.newCondition();
    
    
        public static void main(String[] args) {
            (new ProducerThread()).start();
    
            (new ConsumerThread()).start();
        }
    
        static class ProducerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 1000 毫秒生产一个商品
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
    
                    // 获得锁,相当于 synchronized
                    lock.lock();
    
                    // 当前商品满了,生产者等待
                    if (goods.size() == MAX_CAPACITY) {
                        try {
                            System.out.println("Goods full, waiting...");
                            isEmptyCondition.await();
                        } catch (Exception e) {
                        }
                    }
    
                    goods.add(new Object());
                    System.out.println("Produce goods, total: " + goods.size());
    
                    // isFullCondition.signal() 也可以
                    isFullCondition.signalAll();
    
                    // 记住要释放锁
                    lock.unlock();
                }
            }
        }
    
        static class ConsumerThread extends Thread {
            public void run() {
                while (true) {
                    // 每隔 500 毫秒消费一个商品
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
    
                    // 获得锁,相当于 synchronized
                    lock.lock();
    
                    // 当前商品空了,消费者等待
                    if (goods.size() == 0) {
                        try {
                            System.out.println("No goods, waiting...");
                            isFullCondition.await();
                        } catch (Exception e) {
                        }
                    }
    
                    goods.remove(0);
                    System.out.println("Consume goods, total: " + goods.size());
    
                    // isEmptyCondition.signal() 也可以
                    isEmptyCondition.signalAll();
    
                    // 记住要释放锁
                    lock.unlock();
                }
            }
        }
    }
    

    Condition 的实现原理

    Condition 的内部实现是使用节点链来实现的,每个条件实例对应一个节点链,我们有 isFullConditionisEmptyCondition 两个条件实例,所以会有两个等待节点链。当对应条件被 signal 的时候,就会把等待节点转移到同步队列中,继续竞争锁。

  • 相关阅读:
    在其他博客里看到的比较好的map用法,进行储存啦啦~ x
    codevs 2597 团伙x
    codevs 1009 产生数x
    格子游戏x(并查集)
    codevs 5929 亲戚x
    [HDOJ2389]Rain on your Parade(二分图最大匹配,HK算法)
    [HDOJ2819]Swap(二分图最大匹配, 匈牙利算法)
    [HDOJ1281]棋盘游戏(二分图最大匹配,匈牙利算法)
    [HDOJ1083]Courses(二分图最大匹配,匈牙利算法)
    [HDOJ2444]The Accomodation of Students(二分图染色判定,最大匹配,匈牙利算法)
  • 原文地址:https://www.cnblogs.com/cxhfuujust/p/10880558.html
Copyright © 2011-2022 走看看