  • Three Java implementations of the producer-consumer model: synchronized, wait/notifyAll, and BlockingQueue

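    My technical blog is often scraped and reposted by rogue sites. Please read the original at http://www.cnblogs.com/hamhog/p/3555111.html for clean layout, working links, correct code indentation, and a better reading experience.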

    【Implementation 1: synchronized】

    Includes a main method for testing.

    public class ProductManagerUsingSync {
        
        static final int MAX_AMOUNT = 1000;
        int currentAmount;
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            ProductManagerUsingSync manager = new ProductManagerUsingSync(); 
            
            for (int i = 0; i < 5; i++){
                int consume = (int) Math.round(Math.random()*50) + 10;
                Thread consumerThread = new Thread(new ConsumerWithSync(consume, manager));
                consumerThread.start();
            }
            
            for (int i = 0; i < 10; i++){
                int produce = (int) Math.round(Math.random()*50) + 10;
                Thread producerThread = new Thread(new ProducerWithSync(produce, manager));
                producerThread.start();
            }
        }
        
        public ProductManagerUsingSync() {
            currentAmount = 0;
        }
        
        /**
         * Add product. If can't, return.
         * @param addAmount
         * @return if succeeded.
         */
        public boolean addProduct(int addAmount){
            if (currentAmount + addAmount > MAX_AMOUNT)
                return false;
            
            currentAmount += addAmount;
            System.out.println("produced: " + addAmount + " current: " + currentAmount); 
            return true;
        }
    
        /**
         * Take product. If can't, return.
         * @param takeAmount The amount of product to take.
         * @return if succeeded.
         */
        public boolean takeProduct(int takeAmount){
            if (takeAmount > currentAmount)
                return false;

            currentAmount -= takeAmount;
            System.out.println("consumed: " + takeAmount + " current: " + currentAmount);
            return true;
        }
    }

    class ProducerWithSync implements Runnable {
        private int amount;
        private ProductManagerUsingSync manager;

        ProducerWithSync(int amount, ProductManagerUsingSync manager) {
            this.amount = amount;
            this.manager = manager;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (manager) {
                    if (manager.addProduct(amount))
                        return;
                }
            }
        }
    }

    class ConsumerWithSync implements Runnable {
        private int amount;
        private ProductManagerUsingSync manager;

        ConsumerWithSync(int amount, ProductManagerUsingSync manager) {
            this.amount = amount;
            this.manager = manager;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (manager) {
                    if (manager.takeProduct(amount))
                        return;
                }
            }
        }
    }

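    Explanation: The producer and consumer classes (ProducerWithSync and ConsumerWithSync) produce and consume products in their run methods. The key points are: 1. Before attempting to produce or consume, each thread acquires the lock on manager. Because every producer and consumer holds the same manager instance, production and consumption are thread-safe (they execute serially, one thread at a time). 2. When production or consumption fails, the thread stays in an infinite loop and retries until it succeeds.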

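    With this implementation, a thread that cannot produce or consume at the moment keeps spinning in a busy loop, which wastes CPU. If the thread instead sleeps between iterations, it may not produce or consume promptly once that becomes possible.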
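
    To make the sleep trade-off concrete, here is a minimal sketch of the polling variant. The names PollingStore, tryAdd, tryTake, takeWithPolling, and pollMillis are hypothetical and do not appear in the original code. The lock is held only during each attempt, not while sleeping, so the consumer wastes less CPU but can notice new stock up to pollMillis late.

```java
class PollingStore {
    static final int MAX_AMOUNT = 1000;
    private int currentAmount = 0;

    /** Adds stock if it fits; never blocks. */
    synchronized boolean tryAdd(int amount) {
        if (currentAmount + amount > MAX_AMOUNT) return false;
        currentAmount += amount;
        return true;
    }

    /** Takes stock if enough is available; never blocks. */
    synchronized boolean tryTake(int amount) {
        if (amount > currentAmount) return false;
        currentAmount -= amount;
        return true;
    }

    synchronized int current() {
        return currentAmount;
    }

    /**
     * Retries tryTake, sleeping between attempts (hypothetical helper).
     * Returns the number of failed attempts before the take succeeded.
     */
    int takeWithPolling(int amount, long pollMillis) throws InterruptedException {
        int failures = 0;
        while (!tryTake(amount)) {
            failures++;
            Thread.sleep(pollMillis); // the monitor is NOT held while sleeping
        }
        return failures;
    }
}

class PollingDemo {
    public static void main(String[] args) throws InterruptedException {
        PollingStore store = new PollingStore();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50); // simulate stock arriving a little later
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            store.tryAdd(30);
        });
        producer.start();
        int failures = store.takeWithPolling(30, 20);
        producer.join();
        System.out.println("failed attempts: " + failures + " current: " + store.current());
    }
}
```

    The number of failed attempts depends on thread timing, so it varies from run to run; a longer pollMillis lowers CPU use and raises the worst-case delay.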

    【Implementation 2: wait/notifyAll】

    Includes a main method for testing.

    public class ProductManagerUsingSignal {
    
        static final int MAX_AMOUNT = 1000;
        int currentAmount;
    
        /**
         * @param args useless
         */
        public static void main(String[] args) {
            ProductManagerUsingSignal manager = new ProductManagerUsingSignal(); 
            
            for (int i = 0; i < 5; i++){
                int consume = (int) Math.round(Math.random()*50);
                Thread consumerThread = new Thread(new Consumer(consume, manager));
                consumerThread.start();
            }
            
            for (int i = 0; i < 10; i++){
                int produce = (int) Math.round(Math.random()*50);
                Thread producerThread = new Thread(new Producer(produce, manager));
                producerThread.start();
            }
        }
        
        public ProductManagerUsingSignal(){
            currentAmount = 0;
        }
    
        /**
         * Add product. If can't, wait. NotifyAll when finished.
         * @param addAmount The amount of product to add.
         */
        public synchronized void addProduct(int addAmount){
            while (currentAmount + addAmount > MAX_AMOUNT) { 
                try { 
                    wait(); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            currentAmount += addAmount; 
            System.out.println("produced: " + addAmount + " current: " + currentAmount); 
            notifyAll(); 
        }
    
        /**
         * Take product. If can't, wait. NotifyAll when finished.
         * @param takeAmount The amount of product to take.
         */
        public synchronized void takeProduct(int takeAmount){
            while (takeAmount > currentAmount) { 
                try { 
                    wait(); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            currentAmount -= takeAmount; 
            System.out.println("consumed: " + takeAmount + " current: " + currentAmount); 
            notifyAll();
        }
    
    }
    
    class Producer implements Runnable { 
        private int amount;
        private ProductManagerUsingSignal manager; 
    
        Producer(int amount, ProductManagerUsingSignal manager) { 
            this.amount = amount; 
            this.manager = manager; 
        } 
    
        @Override
        public void run() {
            manager.addProduct(amount); 
        } 
    } 
    
    class Consumer implements Runnable { 
        private int amount;
        private ProductManagerUsingSignal manager;
    
        Consumer(int amount, ProductManagerUsingSignal manager) { 
            this.amount = amount; 
            this.manager = manager; 
        } 
    
        @Override
        public void run() { 
            manager.takeProduct(amount); 
        } 
    }

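    Explanation: This implementation also uses synchronized to ensure thread safety. Its key point is that when production or consumption cannot proceed, the thread enters the wait state, which releases the lock on manager and yields to other threads. After a successful production or consumption, the thread calls notifyAll to wake the threads that were waiting, and each woken thread rechecks its condition in the while loop. This implementation is more efficient than the first because waiting threads do not consume CPU.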
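
    The guarded-wait idiom described above can be reduced to a small sketch. GuardedStore and its methods are hypothetical names, not part of the original code. Unlike the listing above, this variant lets InterruptedException propagate to the caller instead of printing the stack trace and waiting again, which is one possible design choice rather than the only correct one.

```java
class GuardedStore {
    private final int maxAmount;
    private int currentAmount = 0;

    GuardedStore(int maxAmount) {
        this.maxAmount = maxAmount;
    }

    /** Blocks until the amount fits, then adds it and wakes all waiters. */
    synchronized void add(int amount) throws InterruptedException {
        while (currentAmount + amount > maxAmount) {
            wait(); // releases the monitor while waiting
        }
        currentAmount += amount;
        notifyAll();
    }

    /** Blocks until enough stock exists, then takes it and wakes all waiters. */
    synchronized void take(int amount) throws InterruptedException {
        // 'while', not 'if': a woken thread must recheck the condition,
        // because another thread may have changed the stock first.
        while (amount > currentAmount) {
            wait();
        }
        currentAmount -= amount;
        notifyAll();
    }

    synchronized int current() {
        return currentAmount;
    }
}

class GuardedStoreDemo {
    public static void main(String[] args) throws InterruptedException {
        GuardedStore store = new GuardedStore(1000);
        Thread consumer = new Thread(() -> {
            try {
                store.take(30);
                System.out.println("consumed 30");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        consumer.start();
        store.add(10); // not enough yet: the consumer keeps waiting
        store.add(20); // 30 available: the consumer can proceed
        consumer.join();
        System.out.println("current: " + store.current());
    }
}
```

    The consumer can only proceed after both add calls, so the demo prints "consumed 30" followed by "current: 0".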

    【Implementation 3: BlockingQueue】

    Includes a main method for testing.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class ProductManagerUsingBlockingQueue {

        // Shared by all producers and consumers; static so that main can access it.
        static BlockingQueue<Integer> sharedQueue;

        /**
         * @param args unused
         */
        public static void main(String[] args) {
            sharedQueue = new LinkedBlockingQueue<Integer>();
    
            for (int i = 0; i < 10; i++){
                Thread consumerThread = new Thread(new ConsumerWithBlockingQueue(sharedQueue));
                consumerThread.start();
            }
    
            for (int i = 0; i < 10; i++){
                Thread producerThread = new Thread(new ProducerWithBlockingQueue(i, sharedQueue));
                producerThread.start();
            }
        }
    
    }
    
    class ProducerWithBlockingQueue implements Runnable {
    
        private int amount;
        private final BlockingQueue<Integer> sharedQueue;
    
        public ProducerWithBlockingQueue (int amount, BlockingQueue<Integer> sharedQueue) {
            this.amount = amount;
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
    
            try {
                sharedQueue.put(amount);
                System.out.println("produced: " + amount);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    class ConsumerWithBlockingQueue implements Runnable{
    
        private final BlockingQueue<Integer> sharedQueue;
    
        public ConsumerWithBlockingQueue (BlockingQueue<Integer> sharedQueue) {
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("consumed: " + sharedQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }

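    Explanation: This approach relies on the BlockingQueue data structure (the queue's initialization probably belongs in a constructor; I have not had time to change it). The underlying principle is similar to wait/notifyAll, but the code is much more concise.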
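
    To show why the principle resembles wait/notifyAll, here is a simplified bounded queue built on a lock and two condition variables. TinyBlockingQueue is a hypothetical teaching sketch, not the JDK source; the real java.util.concurrent implementations are considerably more elaborate. Condition.await and Condition.signal play the roles of wait and notify, with separate conditions for "not full" and "not empty".

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyBlockingQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TinyBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Blocks while the queue is full, then appends the item. */
    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // like wait(): releases the lock while blocked
            }
            items.add(item);
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    /** Blocks while the queue is empty, then removes the oldest item. */
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal(); // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}

class TinyBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        TinyBlockingQueue<Integer> queue = new TinyBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i); // blocks whenever two items are already queued
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("consumed: " + queue.take());
        }
        producer.join();
    }
}
```

    With a single producer and FIFO ordering, the demo consumes 0 through 4 in order. Using two conditions lets a producer wake only consumers and vice versa, whereas notifyAll on a single monitor wakes every waiting thread.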

    【Summary】

    When a scenario calls for the producer-consumer pattern, BlockingQueue should be our first choice.
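
    One point worth adding when choosing a BlockingQueue: a LinkedBlockingQueue created without a capacity is effectively unbounded, so producers never block. The sketch below assumes a bounded ArrayBlockingQueue plus the timed offer and poll methods of the standard BlockingQueue interface; BoundedQueueDemo and offerAll are hypothetical names used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    /**
     * Tries to enqueue each value, waiting at most timeoutMillis per item.
     * Returns how many values the queue accepted.
     */
    static int offerAll(BlockingQueue<Integer> queue, int[] values, long timeoutMillis)
            throws InterruptedException {
        int accepted = 0;
        for (int value : values) {
            if (queue.offer(value, timeoutMillis, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 2: the third offer times out because nothing is consuming yet.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int accepted = offerAll(queue, new int[] {10, 20, 30}, 50);
        System.out.println("accepted: " + accepted);

        // poll with a timeout returns null instead of blocking forever.
        Integer first = queue.poll(50, TimeUnit.MILLISECONDS);
        System.out.println("consumed: " + first);
        System.out.println("remaining capacity: " + queue.remainingCapacity());
    }
}
```

    The demo prints "accepted: 2", "consumed: 10", and "remaining capacity: 1". The bounded capacity plays a role similar to MAX_AMOUNT in the first two implementations, and the timed methods avoid threads that wait forever when the other side never shows up.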

  • Original article: https://www.cnblogs.com/hamhog/p/3555111.html