zoukankan      html  css  js  c++  java
  • 【1】生产者-消费者模型的三种实现方式

    (手写生产者消费者模型,写BlockingQueue较简便 )

    1、背景                                                                    

    生产者生产数据到缓冲区中,消费者从缓冲区中取数据。

    如果缓冲区已经满了,则生产者线程阻塞;

    如果缓冲区为空,那么消费者线程阻塞。

    2、方式一:synchronized、wait和notify

    定义Resouce资源类,类中定义资源池大小。资源类的add()和remove()方法是synchronized 的。生产者/消费者线程持有一个资源类Resouce的成员变量,Main方法中通过构造函数将Resouce类传入,线程run方法中操作Resouce类的add,remove方法

    package producerConsumer;
    //wait 和 notify
    public class ProducerConsumerWithWaitNofity {
        public static void main(String[] args) {
            Resource resource = new Resource();
            //生产者线程
            ProducerThread p1 = new ProducerThread(resource);
            ProducerThread p2 = new ProducerThread(resource);
            ProducerThread p3 = new ProducerThread(resource);
            //消费者线程
            ConsumerThread c1 = new ConsumerThread(resource);
            //ConsumerThread c2 = new ConsumerThread(resource);
            //ConsumerThread c3 = new ConsumerThread(resource);
        
            p1.start();
            p2.start();
            p3.start();
            c1.start();
            //c2.start();
            //c3.start();
        }
        
        
        
    }
    /**
     * 公共资源类
     * @author 
     *
     */
    class Resource{//重要
        //当前资源数量
        private int num = 0;
        //资源池中允许存放的资源数目
        private int size = 10;
    
        /**
         * 从资源池中取走资源
         */
        public synchronized void remove(){
            if(num > 0){
                num--;
                System.out.println("消费者" + Thread.currentThread().getName() +
                        "消耗一件资源," + "当前线程池有" + num + "个");
                notifyAll();//通知生产者生产资源
            }else{
                try {
                    //如果没有资源,则消费者进入等待状态
                    wait();
                    System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        /**
         * 向资源池中添加资源
         */
        public synchronized void add(){
            if(num < size){
                num++;
                System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" 
                + num + "个");
                //通知等待的消费者
                notifyAll();
            }else{
                //如果当前资源池中有10件资源
                try{
                    wait();//生产者进入等待状态,并释放锁
                    System.out.println(Thread.currentThread().getName()+"线程进入等待");
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 消费者线程
     */
    class ConsumerThread extends Thread{
        private Resource resource;
        public ConsumerThread(Resource resource){
            this.resource = resource;
        }
        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }
    /**
     * 生产者线程
     */
    class ProducerThread extends Thread{
        private Resource resource;
        public ProducerThread(Resource resource){
            this.resource = resource;
        }
        @Override
        public void run() {
            //不断地生产资源
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.add();
            }
        }
        
    }
    View Code

    3、方式二:lock和condition的await、signalAll     

    package producerConsumer;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * 使用Lock 和 Condition解决生产者消费者问题
     * @author tangzhijing
     *
     */
    public class LockCondition {
            public static void main(String[] args) {
                Lock lock = new ReentrantLock();
                Condition producerCondition = lock.newCondition();
                Condition consumerCondition = lock.newCondition();
                Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);
                
                //生产者线程
                ProducerThread2 producer1 = new ProducerThread2(resource);
                
                //消费者线程
                ConsumerThread2 consumer1 = new ConsumerThread2(resource);
                ConsumerThread2 consumer2 = new ConsumerThread2(resource);
                ConsumerThread2 consumer3 = new ConsumerThread2(resource);
                
                producer1.start();
                consumer1.start();
                consumer2.start();
                consumer3.start();
            }
    }
    /**
     * 消费者线程
     */
    class ConsumerThread2 extends Thread{
        private Resource2 resource;
        public ConsumerThread2(Resource2 resource){
            this.resource = resource;
            //setName("消费者");
        }
        public void run(){
            while(true){
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }
    /**
     * 生产者线程
     * @author tangzhijing
     *
     */
    class ProducerThread2 extends Thread{
        private Resource2 resource;
        public ProducerThread2(Resource2 resource){
            this.resource = resource;
            setName("生产者");
        }
        public void run(){
            while(true){
                    try {
                        Thread.sleep((long) (1000 * Math.random()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    resource.add();
            }
        }
    }
    /**
     * 公共资源类
     * @author tangzhijing
     *
     */
    class Resource2{
        private int num = 0;//当前资源数量
        private int size = 10;//资源池中允许存放的资源数目
        private Lock lock;
        private Condition producerCondition;
        private Condition consumerCondition;
        public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
            this.lock = lock;
            this.producerCondition = producerCondition;
            this.consumerCondition = consumerCondition;
     
        }
        /**
         * 向资源池中添加资源
         */
        public void add(){
            lock.lock();
            try{
                if(num < size){
                    num++;
                    System.out.println(Thread.currentThread().getName() + 
                            "生产一件资源,当前资源池有" + num + "个");
                    //唤醒等待的消费者
                    consumerCondition.signalAll();
                }else{
                    //让生产者线程等待
                    try {
                        producerCondition.await();
                        System.out.println(Thread.currentThread().getName() + "线程进入等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }finally{
                lock.unlock();
            }
        }
        /**
         * 从资源池中取走资源
         */
        public void remove(){
            lock.lock();
            try{
                if(num > 0){
                    num--;
                    System.out.println("消费者" + Thread.currentThread().getName() 
                            + "消耗一件资源," + "当前资源池有" + num + "个");
                    producerCondition.signalAll();//唤醒等待的生产者
                }else{
                    try {
                        consumerCondition.await();
                        System.out.println(Thread.currentThread().getName() + "线程进入等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }//让消费者等待
                }
            }finally{
                lock.unlock();
            }
        }
        
    }
    View Code

    4、方式三:BlockingQueue       

    定义Resouce资源类,资源类持有一个BlockingQueue。生产者/消费者线程持有一个资源类Resouce的成员变量,Main方法中通过构造函数将Resouce类传入,线程run方法中操作Resouce类的add,remove方法,add,remove调用Queue的put()take()

    package producerConsumer;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    //使用阻塞队列BlockingQueue解决生产者消费者
    public class BlockingQueueConsumerProducer {
        public static void main(String[] args) {
            Resource3 resource = new Resource3();
            //生产者线程
            ProducerThread3 p = new ProducerThread3(resource);
            //多个消费者
            ConsumerThread3 c1 = new ConsumerThread3(resource);
            ConsumerThread3 c2 = new ConsumerThread3(resource);
            ConsumerThread3 c3 = new ConsumerThread3(resource);
     
            p.start();
            c1.start();
            c2.start();
            c3.start();
        }
    }
    /**
     * 消费者线程
     * @author tangzhijing
     *
     */
    class ConsumerThread3 extends Thread {
        private Resource3 resource3;
     
        public ConsumerThread3(Resource3 resource) {
            this.resource3 = resource;
            //setName("消费者");
        }
     
        public void run() {
            while (true) {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource3.remove();
            }
        }
    }
    /**
     * 生产者线程
     * @author tangzhijing
     *
     */
    class ProducerThread3 extends Thread{
        private Resource3 resource3;
        public ProducerThread3(Resource3 resource) {
            this.resource3 = resource;
            //setName("生产者");
        }
     
        public void run() {
            while (true) {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource3.add();
            }
        }
    }
    class Resource3{
        private BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(10);
        /**
         * 向资源池中添加资源
         */
        public void add(){
            try {
                resourceQueue.put(1); //1当做生产和消费的Integer资源
                System.out.println("生产者" + Thread.currentThread().getName()
                        + "生产一件资源," + "当前资源池有" + resourceQueue.size() + 
                        "个资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /**
         * 向资源池中移除资源
         */
        public void remove(){
            try {
                resourceQueue.take();
                System.out.println("消费者" + Thread.currentThread().getName() + 
                        "消耗一件资源," + "当前资源池有" + resourceQueue.size() 
                        + "个资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

     为什么用put和take:

    为什么用put和take:https://blog.csdn.net/qiuchaoxi/article/details/80359462

  • 相关阅读:
    JSP简单访问数据库
    解析数据存储MySQL
    学习SSH框架
    JavaWEB中读取配置信息
    Eclipse中将Java项目转换成Web项目的方法
    JavaWEB入门
    万能数据库连接类-Oracle、DB2 、Access 、Sql Server
    小米3 打开开发者选项
    coolpad 5879logcat不能输入日志解决办法
    实用开发之-oracle表回滚到一个指定时间的操作语句
  • 原文地址:https://www.cnblogs.com/twoheads/p/10137263.html
Copyright © 2011-2022 走看看