zoukankan      html  css  js  c++  java
  • 生产者消费者模型Java实现

    生产者消费者问题是研究多线程程序时绕不开的经典问题之一。

    问题描述如下。使用一个商品的缓存池用来存放商品。当池子满时,生产者不能往池子里加入商品;当池子空时,消费者不能从池子中取得商品。


    使用Object的方法 wait() notify()/notifyAll()实现

    获取锁和释放锁都是针对Object而言的,而和线程无关。试想如果和线程相关,那么一个线程就无法使用多个锁。

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class Pool {
    
        private int MAX;
        private int cnt = 0;
    
        public Pool(int MAX) {
            this.MAX = MAX;
        }
    
        public synchronized void produce() throws InterruptedException {
            while (cnt == MAX) {
                wait();
            }
            System.out.println("Produce one.. Now:" + ++cnt);
            notify();
        }
    
        public synchronized void consume() throws InterruptedException {
            while (cnt == 0) {
                wait();
            }
            System.out.println("Consume one.. Now:" + --cnt);
            notifyAll();
        }
    
        public static void main(String[] args) {
            Pool pool = new Pool(6);
            Executor executor = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        pool.produce();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        pool.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }

    使用 wait() notify()/notifyAll()的缺点在于在生产者唤醒消费者或者消费者唤醒生产者时,由于生产者和消费者使用同一个锁,所以生产者也会将生产者唤醒,消费者也会将消费者唤醒。(这一点也被字节跳动的面试官问到啦TAT)

    举例:假设现在池子满了,然后有3个生产者被阻塞,现在一个消费者拿走一个item,调用notify,此时一个被阻塞的生产者被唤醒了。这个生产者向池子里放入一个产品,并执行notify意图唤醒被阻塞的消费者,此时这个notify有可能唤醒另外2个被阻塞的生产者中的一个。

    Condition可以指定多个等待的条件,因此使用Condition可以解决这一问题。


    使用ReentrantLock和Condition的await() signal()/signalAll()实现

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Pool {
        private int capacity;
        private Object[] queue;
        private int count;
        private Lock lock = new ReentrantLock();
        private Condition prod = lock.newCondition();
        private Condition cons = lock.newCondition();
    
        public Pool(int capacity) {
            this.capacity = capacity;
            queue = new Object[capacity];
            count = 0;
        }
    
        public void produce(Object o) throws InterruptedException {
            lock.lock();
            try {
                while (count == capacity) {
                    prod.await();
                }
                queue[count++] = o;
                System.out.println(Thread.currentThread().getName()
                        + " produce " + o.toString() + ". current count: " + count);
                cons.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public Object consume() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    cons.await();
                }
                Object res = queue[--count];
                queue[count] = null;
                System.out.println(Thread.currentThread().getName()
                        + " consume " + res.toString() + ". current count: " + count);
                prod.signal();
                return res;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            Pool pool = new Pool(10);
            ExecutorService executorService = Executors.newCachedThreadPool();
            int cnt = 20;
            while (cnt-- > 0) {
                executorService.execute(() -> {
                    try {
                        pool.produce(new Object());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                executorService.execute(() -> {
                    try {
                        pool.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
  • 相关阅读:
    button 垂直分布
    GitHub上值得关注的iOS开源项目
    电脑连接网络(网络正常),但不能上网,登录网页提示dns_probe_finished_no_internet
    android 模拟应用因内存不足被后台杀死命令
    android 屏幕划分
    android 没有root的手机导出数据库
    移动硬盘不能识别,设备管理器中显示黄色感叹号
    低功耗蓝牙开发(BLE)
    音视频学习笔记
    Java中为什么要使用线程池?如何使用?
  • 原文地址:https://www.cnblogs.com/yfzhou/p/10996801.html
Copyright © 2011-2022 走看看