zoukankan      html  css  js  c++  java
  • 生产者和消费者模型

    生产者和消费者模型

    1. 什么是生产者和消费者模型

    生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。

    再具体一点:

    1. 生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
    2. 如果缓冲区已经满了,则生产者线程阻塞。
    3. 如果缓冲区为空,那么消费者线程阻塞。

    2. 如何实现

    实现生产者消费者模型有两种方式:

    1. 采用 wait—notify 方式实现生产者消费者模型(注意这里需要加同步锁 synchronized)
    2. 采用 阻塞队列 方式实现生产者消费者模式

    3. wait-notify 方式

    实现过程并不复杂,直接上代码:

    这里设置了生产者生产速度大于消费者消费速度(通过 sleep() 方法实现)。

    缓冲区 BufferArea.java

    public class BufferArea {
    
        // 当前资源数量的计数值
        private int currNum = 0;
    
        // 资源池中允许存放的资源数目
        private int maxSize = 10;
    
        /**
         * 从资源池中取走资源
         */
        public synchronized void get() {
            if (currNum > 0) {
                currNum--;
                System.out.println("Cosumer_" + Thread.currentThread().getName() + "消耗一件资源," + "当前缓冲区有" + currNum + "个");
                // 通知生产者生产资源
                notifyAll();
            } else {
                try {
                    // 如果没有资源,则 Cosumer_ 进入等待状态
                    System.out.println("Cosumer_" + Thread.currentThread().getName() + ": 当前缓冲区资源不足,进入等待状态");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 向缓冲区中添加资源
         */
        public synchronized void put() {
            // 若当前缓冲区内的资源计数小于最大 size 数,才加
            if (currNum < maxSize) {
                currNum++;
                System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + currNum + "个");
    
                // 通知等待的消费者
                notifyAll();
            } else {
                // 若当前缓冲区的资源计数大于最大 size 数,则等待
                try {
                    System.out.println(Thread.currentThread().getName() + "线程进入等待 << 当前缓冲区的资源计数大于最大 size 数");
                    // 生产者进入等待状态,并释放锁
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    生产者 Producer.java

    public class Producer extends Thread {
    
        private BlockQueueBufferArea mBufferArea;
    
        public Producer(BlockQueueBufferArea bufferArea) {
            this.mBufferArea = bufferArea;
            setName("Producer_" + getName());
        }
    
        @Override
        public void run() {
            // 不断的生产资源
            while (true) {
                sleepSomeTime();
                mBufferArea.put();
            }
        }
    
        private void sleepSomeTime() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    消费者 Consumer

    public class Consumer extends Thread {
    
        private BlockQueueBufferArea mBufferArea;
    
        public Consumer(BlockQueueBufferArea bufferArea) {
            this.mBufferArea = bufferArea;
            setName("Consumer_" + getName());
        }
    
        @Override
        public void run() {
            // 不断的取出资源
            while (true) {
                sleepSomeTime();
                mBufferArea.get();
            }
        }
    
        private void sleepSomeTime() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    测试 Test.java

    public class Test {
    
        public static void main(String[] args) {
            BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();
    
            Consumer consumer1 = new Consumer(bufferArea);
            Consumer consumer2 = new Consumer(bufferArea);
            Consumer consumer3 = new Consumer(bufferArea);
    
            Producer producer1 = new Producer(bufferArea);
            Producer producer2 = new Producer(bufferArea);
            Producer producer3 = new Producer(bufferArea);
    
            consumer1.start();
            consumer2.start();
            consumer3.start();
    
            producer1.start();
            producer2.start();
            producer3.start();
    
        }
    
    }
    

    打印结果如下:

    ProducerThread-5生产一件资源,当前资源池有1个
    ProducerThread-4生产一件资源,当前资源池有2个
    ProducerThread-3生产一件资源,当前资源池有3个
    ProducerThread-5生产一件资源,当前资源池有4个
    ProducerThread-4生产一件资源,当前资源池有5个
    ProducerThread-3生产一件资源,当前资源池有6个
    ProducerThread-5生产一件资源,当前资源池有7个
    ProducerThread-4生产一件资源,当前资源池有8个
    ProducerThread-3生产一件资源,当前资源池有9个
    ProducerThread-3生产一件资源,当前资源池有10个
    ProducerThread-4线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
    ProducerThread-5线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
    ProducerThread-3线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
    
    >> 注释:3个生产者线程生产满了10个(maxSize)产品,然后就都进入了等待
    
    Cosumer_Consumer_Thread-0消耗一件资源,当前缓冲区有9个
    Cosumer_Consumer_Thread-1消耗一件资源,当前缓冲区有8个
    Cosumer_Consumer_Thread-2消耗一件资源,当前缓冲区有7个
    
    >> 注释:3个消费者消费了3个产品
    
    ProducerThread-3生产一件资源,当前资源池有8个
    ProducerThread-5生产一件资源,当前资源池有9个
    ProducerThread-4生产一件资源,当前资源池有10个
    
    >> 注释:生产者立马又生产3个
    
    ...
    
    >> 然后一直循环往复这个过程
    

    4. 阻塞队列方式

    阻塞队列的特点:
    • 当队列元素已满的时候,阻塞插入操作
    • 当队列元素为空的时候,阻塞获取操作
    不同的阻塞队列:

    ArrayBlockingQueue 与 LinkedBlockingQueue 都是支持 FIFO (先进先出),但是 LinkedBlockingQueue 是无界的,而ArrayBlockingQueue 是有界的。

    这里我们采用无界阻塞队列来演示生产者消费者模式。

    演示

    还是设置生产者生产速度大于消费者消费速度(通过 sleep() 方法实现)

    缓冲区 BlockQueueBufferArea.java

    public class BlockQueueBufferArea {
    
        BlockingQueue<Integer> mProductPoll = new LinkedBlockingQueue(10);
    
        public void  put() {
            try {
                System.out.println(Thread.currentThread().getName() + "产品池被放入了一个资源");
                mProductPoll.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void get() {
            try {
                System.out.println(Thread.currentThread().getName() + "产品池被取走了一个资源");
                mProductPoll.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    生产者 Producer.java

    public class Producer extends Thread {
    
        private BlockQueueBufferArea mBufferArea;
    
        public Producer(BlockQueueBufferArea bufferArea) {
            this.mBufferArea = bufferArea;
            setName("Producer_" + getName());
        }
    
        @Override
        public void run() {
            // 不断的生产资源
            while (true) {
                sleepSomeTime();
                mBufferArea.put();
            }
        }
    
        private void sleepSomeTime() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    消费者 Consumer.java

    public class Consumer extends Thread {
    
        private BlockQueueBufferArea mBufferArea;
    
        public Consumer(BlockQueueBufferArea bufferArea) {
            this.mBufferArea = bufferArea;
            setName("Consumer_" + getName());
        }
    
        @Override
        public void run() {
            // 不断的取出资源
            while (true) {
                sleepSomeTime();
                mBufferArea.get();
            }
        }
    
        private void sleepSomeTime() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    测试 Test.java

    public class Test {
    
        public static void main(String[] args) {
            BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();
    
            Consumer consumer1 = new Consumer(bufferArea);
            Consumer consumer2 = new Consumer(bufferArea);
            Consumer consumer3 = new Consumer(bufferArea);
    
            Producer producer1 = new Producer(bufferArea);
            Producer producer2 = new Producer(bufferArea);
            Producer producer3 = new Producer(bufferArea);
    
            consumer1.start();
            consumer2.start();
            consumer3.start();
    
            producer1.start();
            producer2.start();
            producer3.start();
    
        }
    
    }
    

    打印结果如下:

    Producer_Thread-5产品池被放入了一个资源
    Producer_Thread-4产品池被放入了一个资源
    Producer_Thread-3产品池被放入了一个资源
    Producer_Thread-3产品池被放入了一个资源
    Producer_Thread-4产品池被放入了一个资源
    Producer_Thread-5产品池被放入了一个资源
    Producer_Thread-3产品池被放入了一个资源
    Producer_Thread-4产品池被放入了一个资源
    Producer_Thread-5产品池被放入了一个资源
    Producer_Thread-3产品池被放入了一个资源
    Producer_Thread-4产品池被放入了一个资源
    Producer_Thread-5产品池被放入了一个资源
    Producer_Thread-3产品池被放入了一个资源
    Consumer_Thread-0产品池被取走了一个资源
    Consumer_Thread-1产品池被取走了一个资源
    Consumer_Thread-2产品池被取走了一个资源
    Producer_Thread-4产品池被放入了一个资源
    Producer_Thread-5产品池被放入了一个资源
    Producer_Thread-3产品池被放入了一个资源
    

    5. 参考

  • 相关阅读:
    反射实现Model修改前后的内容对比
    [C#] 将NLog输出到RichTextBox,并在运行时动态修改日志级别过滤
    C#远程调用技术WebService葵花宝典
    C# winform实现右下角弹出窗口结果的方法
    C# / VB.NET合并PDF指定页
    C# Word转PDF/HTML/XML/XPS/SVG/EMF/EPUB/TIFF
    C# 将PDF转为SVG的3种情况
    C# 如何将PDF转为多种图像文件格式(Png/Bmp/Emf/Tiff)
    C# 按指定范围拆分Excel工作表
    Powershell如何在Start-Job的Scriptblock里传参?
  • 原文地址:https://www.cnblogs.com/weixuqin/p/11430981.html
Copyright © 2011-2022 走看看