zoukankan      html  css  js  c++  java
  • 【1】生产者-消费者模型的三种实现方式

    (手写生产者消费者模型,写BlockingQueue较简便 )

    1、背景                                                                    

    生产者生产数据到缓冲区中,消费者从缓冲区中取数据。

    如果缓冲区已经满了,则生产者线程阻塞;

    如果缓冲区为空,那么消费者线程阻塞。

    2、方式一:synchronized、wait和notify

    定义Resouce资源类,类中定义资源池大小。资源类的add()和remove()方法是synchronized 的。生产者/消费者线程持有一个资源类Resouce的成员变量,Main方法中通过构造函数将Resouce类传入,线程run方法中操作Resouce类的add,remove方法

    package producerConsumer;
    //wait 和 notify
    public class ProducerConsumerWithWaitNofity {
        public static void main(String[] args) {
            Resource resource = new Resource();
            //生产者线程
            ProducerThread p1 = new ProducerThread(resource);
            ProducerThread p2 = new ProducerThread(resource);
            ProducerThread p3 = new ProducerThread(resource);
            //消费者线程
            ConsumerThread c1 = new ConsumerThread(resource);
            //ConsumerThread c2 = new ConsumerThread(resource);
            //ConsumerThread c3 = new ConsumerThread(resource);
        
            p1.start();
            p2.start();
            p3.start();
            c1.start();
            //c2.start();
            //c3.start();
        }
        
        
        
    }
    /**
     * 公共资源类
     * @author 
     *
     */
    class Resource{//重要
        //当前资源数量
        private int num = 0;
        //资源池中允许存放的资源数目
        private int size = 10;
    
        /**
         * 从资源池中取走资源
         */
        public synchronized void remove(){
            if(num > 0){
                num--;
                System.out.println("消费者" + Thread.currentThread().getName() +
                        "消耗一件资源," + "当前线程池有" + num + "个");
                notifyAll();//通知生产者生产资源
            }else{
                try {
                    //如果没有资源,则消费者进入等待状态
                    wait();
                    System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        /**
         * 向资源池中添加资源
         */
        public synchronized void add(){
            if(num < size){
                num++;
                System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" 
                + num + "个");
                //通知等待的消费者
                notifyAll();
            }else{
                //如果当前资源池中有10件资源
                try{
                    wait();//生产者进入等待状态,并释放锁
                    System.out.println(Thread.currentThread().getName()+"线程进入等待");
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 消费者线程
     */
    class ConsumerThread extends Thread{
        private Resource resource;
        public ConsumerThread(Resource resource){
            this.resource = resource;
        }
        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }
    /**
     * 生产者线程
     */
    class ProducerThread extends Thread{
        private Resource resource;
        public ProducerThread(Resource resource){
            this.resource = resource;
        }
        @Override
        public void run() {
            //不断地生产资源
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.add();
            }
        }
        
    }
    View Code

    3、方式二:lock和condition的await、signalAll     

    package producerConsumer;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * 使用Lock 和 Condition解决生产者消费者问题
     * @author tangzhijing
     *
     */
    public class LockCondition {
            public static void main(String[] args) {
                Lock lock = new ReentrantLock();
                Condition producerCondition = lock.newCondition();
                Condition consumerCondition = lock.newCondition();
                Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);
                
                //生产者线程
                ProducerThread2 producer1 = new ProducerThread2(resource);
                
                //消费者线程
                ConsumerThread2 consumer1 = new ConsumerThread2(resource);
                ConsumerThread2 consumer2 = new ConsumerThread2(resource);
                ConsumerThread2 consumer3 = new ConsumerThread2(resource);
                
                producer1.start();
                consumer1.start();
                consumer2.start();
                consumer3.start();
            }
    }
    /**
     * 消费者线程
     */
    class ConsumerThread2 extends Thread{
        private Resource2 resource;
        public ConsumerThread2(Resource2 resource){
            this.resource = resource;
            //setName("消费者");
        }
        public void run(){
            while(true){
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }
    /**
     * 生产者线程
     * @author tangzhijing
     *
     */
    class ProducerThread2 extends Thread{
        private Resource2 resource;
        public ProducerThread2(Resource2 resource){
            this.resource = resource;
            setName("生产者");
        }
        public void run(){
            while(true){
                    try {
                        Thread.sleep((long) (1000 * Math.random()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    resource.add();
            }
        }
    }
    /**
     * 公共资源类
     * @author tangzhijing
     *
     */
    class Resource2{
        private int num = 0;//当前资源数量
        private int size = 10;//资源池中允许存放的资源数目
        private Lock lock;
        private Condition producerCondition;
        private Condition consumerCondition;
        public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
            this.lock = lock;
            this.producerCondition = producerCondition;
            this.consumerCondition = consumerCondition;
     
        }
        /**
         * 向资源池中添加资源
         */
        public void add(){
            lock.lock();
            try{
                if(num < size){
                    num++;
                    System.out.println(Thread.currentThread().getName() + 
                            "生产一件资源,当前资源池有" + num + "个");
                    //唤醒等待的消费者
                    consumerCondition.signalAll();
                }else{
                    //让生产者线程等待
                    try {
                        producerCondition.await();
                        System.out.println(Thread.currentThread().getName() + "线程进入等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }finally{
                lock.unlock();
            }
        }
        /**
         * 从资源池中取走资源
         */
        public void remove(){
            lock.lock();
            try{
                if(num > 0){
                    num--;
                    System.out.println("消费者" + Thread.currentThread().getName() 
                            + "消耗一件资源," + "当前资源池有" + num + "个");
                    producerCondition.signalAll();//唤醒等待的生产者
                }else{
                    try {
                        consumerCondition.await();
                        System.out.println(Thread.currentThread().getName() + "线程进入等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }//让消费者等待
                }
            }finally{
                lock.unlock();
            }
        }
        
    }
    View Code

    4、方式三:BlockingQueue       

    定义Resouce资源类,资源类持有一个BlockingQueue。生产者/消费者线程持有一个资源类Resouce的成员变量,Main方法中通过构造函数将Resouce类传入,线程run方法中操作Resouce类的add,remove方法,add,remove调用Queue的put()take()

    package producerConsumer;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    //使用阻塞队列BlockingQueue解决生产者消费者
    public class BlockingQueueConsumerProducer {
        public static void main(String[] args) {
            Resource3 resource = new Resource3();
            //生产者线程
            ProducerThread3 p = new ProducerThread3(resource);
            //多个消费者
            ConsumerThread3 c1 = new ConsumerThread3(resource);
            ConsumerThread3 c2 = new ConsumerThread3(resource);
            ConsumerThread3 c3 = new ConsumerThread3(resource);
     
            p.start();
            c1.start();
            c2.start();
            c3.start();
        }
    }
    /**
     * 消费者线程
     * @author tangzhijing
     *
     */
    class ConsumerThread3 extends Thread {
        private Resource3 resource3;
     
        public ConsumerThread3(Resource3 resource) {
            this.resource3 = resource;
            //setName("消费者");
        }
     
        public void run() {
            while (true) {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource3.remove();
            }
        }
    }
    /**
     * 生产者线程
     * @author tangzhijing
     *
     */
    class ProducerThread3 extends Thread{
        private Resource3 resource3;
        public ProducerThread3(Resource3 resource) {
            this.resource3 = resource;
            //setName("生产者");
        }
     
        public void run() {
            while (true) {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource3.add();
            }
        }
    }
    class Resource3{
        private BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(10);
        /**
         * 向资源池中添加资源
         */
        public void add(){
            try {
                resourceQueue.put(1); //1当做生产和消费的Integer资源
                System.out.println("生产者" + Thread.currentThread().getName()
                        + "生产一件资源," + "当前资源池有" + resourceQueue.size() + 
                        "个资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /**
         * 向资源池中移除资源
         */
        public void remove(){
            try {
                resourceQueue.take();
                System.out.println("消费者" + Thread.currentThread().getName() + 
                        "消耗一件资源," + "当前资源池有" + resourceQueue.size() 
                        + "个资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

     为什么用put和take:

    为什么用put和take:https://blog.csdn.net/qiuchaoxi/article/details/80359462

  • 相关阅读:
    防删没什么意思啊,直接写废你~
    绝大多数情况下,没有解决不了的问题,只有因为平时缺少练习而惧怕问题的复杂度,畏惧的心理让我们选择避让,采取并不那么好的方案去解决问题
    Java 模拟面试题
    Crossthread operation not valid: Control 'progressBar1' accessed from a thread other than the thread it was created on
    一步步从数据库备份恢复SharePoint Portal Server 2003
    【转】理解 JavaScript 闭包
    Just For Fun
    The database schema is too old to perform this operation in this SharePoint cluster. Please upgrade the database and...
    Hello World!
    使用filter筛选刚体碰撞
  • 原文地址:https://www.cnblogs.com/twoheads/p/10137263.html
Copyright © 2011-2022 走看看