zoukankan      html  css  js  c++  java
  • 生产者和消费者并发模式的几种简单写法

    1、使用synchronized

    package cn.luxh.app.test;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    /**
     *    synchronized实现的生产者和消费者
     */
    public class ProducerCustomerWithSynchronized {
        
        Executor pool = Executors.newFixedThreadPool(10);
        
        //仓库
        private List<String> storageList = new LinkedList<String>();
        
        //仓库容量
        private int MAX_SIZE = 3;
        
        //仓库为空
        private int ZERO = 0;
        
        //生产者线程
        private class Producer implements Runnable{
            
            //生产方法,需同步
            private void produce(){
                synchronized (storageList) {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                    try {
                        if(storageList.size()==MAX_SIZE) {
                            System.out.println("仓库已满!等待消费者消费");
                            Thread.sleep(1000);
                            storageList.wait();//当前线程放弃锁,处于等待状态,让其他线程执行
                        }
                        if(storageList.size()<MAX_SIZE) {
                            String name = "产品"+new Random().nextInt();
                            storageList.add(name);
                            System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                        }
                        Thread.sleep(1000);
                        storageList.notifyAll();//当前线程放弃锁,唤醒其他线程
                            
                    }catch(InterruptedException ie) {
                        System.out.println("中断异常");
                        ie.printStackTrace();
                    }
                    
                }
            }
    
            @Override
            public void run() {
                while(true) {
                    produce();
                }
            }
        }
        
        //消费者线程
        private class Customer implements Runnable{
            
            //消费方法,需同步
            private void consume() {
                synchronized (storageList) {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                    try {
                        if(storageList.size()==ZERO) {
                            System.out.println("仓库已空!等待生产者生产");
                            Thread.sleep(1000);
                            storageList.wait();//当前线程放弃锁,处于等待状态,让其他线程执行
                        }
                        if(storageList.size()!=ZERO) {
                            System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageList.remove(0));
                        }
                        Thread.sleep(1000);
                        storageList.notifyAll();//当前线程放弃锁,唤醒其他线程
                    }catch(InterruptedException ie) {
                        System.out.println("中断异常");
                        ie.printStackTrace();
                    }
                    
                }
            }
    
            @Override
            public void run() {
                while(true) {
                    consume();
                }
            }
            
        }
        
        //启动生产者和消费者线程
        public void start() {
            for(int i=1;i<5;i++) {
                //new Thread(new Producer()).start();
                //new Thread(new Customer()).start();
                pool.execute(new Producer());
                pool.execute(new Customer());
            }
            
        }
        
        public static void main(String[] args) {
            ProducerCustomerWithSynchronized pc = new ProducerCustomerWithSynchronized();
            pc.start();
        }
    }
    View Code

    2、使用Lock

    package cn.luxh.app.test;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    
    /**
     * Lock实现的生产者和消费者
     *
     */
    public class ProducerCustomerWithLock {
        
        Executor pool = Executors.newFixedThreadPool(10);
        
        //仓库
        private List<String> storageList = new LinkedList<String>();
        
        //仓库容量
        private int MAX_SIZE = 3;
        
        //仓库为空
        private int ZERO = 0;
        
        //获取锁对象
        private Lock lock = new ReentrantLock();
        
        //仓库满了,绑定生产者线程
        private Condition full = lock.newCondition();
        
        //仓库为空,绑定消费者线程
        private Condition empty = lock.newCondition();
        
        //生产者线程
        private class Producer implements Runnable{
            
            //生产方法,需同步
            private void produce(){
                if(lock.tryLock()) {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                    try {
                        if(storageList.size()==MAX_SIZE) {
                            System.out.println("仓库已满!等待消费者消费");
                            Thread.sleep(1000);
                            full.await();//生产者线程加入线程等待池
                        }
                        if(storageList.size()<MAX_SIZE){
                            String name = "产品"+new Random().nextInt();
                            storageList.add(name);
                            System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                        }
                        Thread.sleep(1000);
                        empty.signalAll();//唤醒消费者线程
                            
                    }catch(InterruptedException ie) {
                        System.out.println("中断异常");
                        ie.printStackTrace();
                    }finally{
                        lock.unlock();
                    }
                }
            }
    
            @Override
            public void run() {
                while(true) {
                    produce();
                }
            }
        }
        
        //消费者线程
        private class Customer implements Runnable{
            
            //消费方法,需同步
            private void consume() {
                if(lock.tryLock()) {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                    try {
                        if(storageList.size()==ZERO) {
                            System.out.println("仓库已空!等待生产者生产");
                            Thread.sleep(1000);
                            empty.await();//消费者线程加入线程等待池
                        }
                        if(storageList.size()!=ZERO) {
                            System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageList.remove(0));
                        }
                        
                        Thread.sleep(1000);
                        full.signalAll();//唤醒生产者线程
                    }catch(InterruptedException ie) {
                        System.out.println("中断异常");
                        ie.printStackTrace();
                    }finally{
                        lock.unlock();
                    }
                }
            }
    
            @Override
            public void run() {
                while(true) {
                    consume();
                }
            }
            
        }
        
        //启动生产者和消费者线程
        public void start() {
            for(int i=1;i<5;i++) {
                //new Thread(new Producer()).start();
                //new Thread(new Customer()).start();
                pool.execute(new Producer());
                pool.execute(new Customer());
            }
            
        }
        
        public static void main(String[] args) {
            ProducerCustomerWithLock pc = new ProducerCustomerWithLock();
            pc.start();
        }
    }
    View Code

    3、使用BlockingQueue

    package cn.luxh.app.test;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    
    /**
     * BlockingQueue实现的生产者和消费者
     *
     */
    public class ProducerCustomerWithBlockingQueue {
        
        Executor pool = Executors.newFixedThreadPool(10);
        
        //仓库
        private BlockingQueue<String> storageQueue = new LinkedBlockingQueue<String>(5);
        
        //仓库容量
        private int MAX_SIZE = 3;
        
        //仓库为空
        private int ZERO = 0;
        
        
        
        //生产者线程
        private class Producer implements Runnable{
            
            //生产方法,需同步
            private void produce(){
                try {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                    if(storageQueue.size()==MAX_SIZE) {
                        System.out.println("仓库已满!等待消费者消费");
                        Thread.sleep(1000);
                    }
                    if(storageQueue.size()<=MAX_SIZE) {
                        String product = "产品"+new Random().nextInt();
                        storageQueue.put(product);
                        System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                    }
                    Thread.sleep(1000);
                }catch(InterruptedException ie) {
                    System.out.println("中断异常");
                    ie.printStackTrace();
                }
            }
    
            @Override
            public void run() {
                while(true) {
                    produce();
                }
            }
        }
        
        //消费者线程
        private class Customer implements Runnable{
            
            //消费方法,需同步
            private void consume() {
                try {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                    if(storageQueue.size()==ZERO) {
                        System.out.println("仓库已空!等待生产者生产");
                        Thread.sleep(1000);
                    }
                    if(storageQueue.size()!=ZERO) {
                        System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageQueue.take());
                    }
                    Thread.sleep(1000);
                }catch(InterruptedException ie) {
                    System.out.println("中断异常");
                    ie.printStackTrace();
                }
            }
    
            @Override
            public void run() {
                while(true) {
                    consume();
                }
            }
            
        }
        
        //启动生产者和消费者线程
        public void start() {
            for(int i=1;i<5;i++) {
                //new Thread(new Producer()).start();
                ///new Thread(new Customer()).start();
                pool.execute(new Producer());
                pool.execute(new Customer());
            }
            
        }
        
        public static void main(String[] args) {
            ProducerCustomerWithBlockingQueue pc = new ProducerCustomerWithBlockingQueue();
            pc.start();
        }
    }
    View Code
  • 相关阅读:
    spring-cloud 注册中心配置
    SpringMVC执行流程和原理
    ssm 框架整合 配置
    Spring整合ActiveMQ 之 ActiveMQ配置
    redis配置代码
    world
    web服务器和web应用服务器的区别?
    经典面试题:Mybatis原理
    springmvc原理|执行过程|解决了什么问题?
    Hadoop 学习目录(搁置)
  • 原文地址:https://www.cnblogs.com/luxh/p/3300956.html
Copyright © 2011-2022 走看看