zoukankan      html  css  js  c++  java
  • 生产者消费者模式

    定义

             生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    实现

    1.利用wait和notify实现

    package productorConsumer;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author littlecar
     * @date 2019/7/26 10:24
     */
    public class ProductorConsumer {
        public static void main(String[] arg) {
            LinkedList<Integer> list = new LinkedList<>();
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(15);
            for (int i = 0; i < 5; i++) {
                scheduledThreadPoolExecutor.submit(new Productor(list, 5));
            }
            for (int i = 0; i < 10; i++) {
                scheduledThreadPoolExecutor.submit(new Consumer(list));
            }
        }
        static class Productor implements Runnable{
            private List<Integer> list;
            private int maxLength;
            public Productor(List<Integer> list,int maxLength) {
                this.list=list;
                this.maxLength=maxLength;
            }
            @Override
            public void run() {
                while (true) {
                    synchronized (list) {
                        try {
                            while (list.size() == maxLength) {
                                System.out.println("生产者"+Thread.currentThread().getName()+"list已满,进行wait");
                                list.wait();
                                System.out.println("生产者"+Thread.currentThread().getName()+"退出wait");
                            }
                            Random r = new Random();
                            int i = r.nextInt();
                            System.out.println("生产者"+Thread.currentThread().getName()+"生产数据"+i);
                            list.add(i);
                            list.notifyAll();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
        static class Consumer implements Runnable{
            private List<Integer> list;
            public Consumer(List<Integer> list) {
                this.list=list;
            }
            @Override
            public void run() {
                while (true) {
                    synchronized (list) {
                        try {
                            while (list.isEmpty()) {
                                System.out.println("消费者"+Thread.currentThread().getName()+"list为空,进行wait");
                                list.wait();
                                System.out.println("消费者"+Thread.currentThread().getName()+"退出wait");
                            }
                            Integer i = list.remove(0);
                            System.out.println("消费者"+Thread.currentThread().getName()+"消费数据"+i);
                            list.notifyAll();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    2.利用Lock中Condition的await/signalAll实现生产者消费者

    package productorConsumer;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author littlecar
     * @date 2019/7/26 14:00
     */
    public class ProductorConsumer1 {
        private static ReentrantLock lock = new ReentrantLock();
        private static Condition full = lock.newCondition();
        private static Condition empty = lock.newCondition();
        public static void main(String[] args) {
            LinkedList<Integer> list = new LinkedList<>();
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(15);
            for (int i = 0; i < 5; i++) {
                scheduledThreadPoolExecutor.submit(new Productor(list, lock,5));
            }
            for (int i = 0; i < 10; i++) {
                scheduledThreadPoolExecutor.submit(new Consumer(list,lock));
            }
        }
        static class Productor implements Runnable{
            private List<Integer> list;
            private Lock lock;
            private int maxLength;
            public Productor(List<Integer> list,Lock lock,int maxLength) {
                this.list=list;
                this.lock=lock;
                this.maxLength=maxLength;
            }
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("生产者"+Thread.currentThread().getName()+"list已满,进行wait");
                            full.await();
                            System.out.println(""+Thread.currentThread().getName()+"退出wait");
                        }
                        Random r = new Random();
                        int i = r.nextInt();
                        System.out.println("生产者"+Thread.currentThread().getName()+"生产数据"+i);
                        list.add(i);
                        empty.signalAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally{
                        lock.unlock();
                    }
                }
            }
        }
        static class Consumer implements Runnable{
            private List<Integer> list;
            private Lock lock;
            public Consumer(List<Integer> list,Lock lock) {
                this.list=list;
                this.lock=lock;
            }
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消费者"+Thread.currentThread().getName()+"list为空,进行wait");
                            empty.await();
                            System.out.println("消费者"+Thread.currentThread().getName()+"退出wait");
                        }
                        Integer i = list.remove(0);
                        System.out.println("消费者"+Thread.currentThread().getName()+"消费数据"+i);
                        full.signalAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally{
                        lock.unlock();
                    }
                }
            }
        }
    }

    3. 使用BlockingQueue实现生产者-消费者

    package productorConsumer;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    
    /**
     * @author littlecar
     * @date 2019/7/26 14:15
     */
    public class ProductorConsumer2 {
        private static LinkedBlockingQueue<Integer> linkedBlockingQueue=new LinkedBlockingQueue<>();
        public static void main(String[] args) {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(15);
            for (int i = 0; i < 5; i++) {
                scheduledThreadPoolExecutor.submit(new Productor(linkedBlockingQueue));
            }
            for (int i = 0; i < 10; i++) {
                scheduledThreadPoolExecutor.submit(new Consumer(linkedBlockingQueue));
            }
        }
    
        static class Productor implements Runnable {
            private BlockingQueue queue;
            public Productor(BlockingQueue queue){
                this.queue=queue;
            }
            @Override
            public void run() {
                try {
                    while (true) {
                        Random r = new Random();
                        int i = r.nextInt();
                        System.out.println("生产者"+Thread.currentThread().getName()+"生产数据"+i);
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        static class Consumer implements Runnable {
            private BlockingQueue queue;
            public Consumer(BlockingQueue queue){
                this.queue=queue;
            }
            @Override
            public void run() {
                try {
                    while (true) {
                        Integer i = (Integer) queue.take();
                        System.out.println("消费者"+Thread.currentThread().getName()+"消费数据"+i);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 相关阅读:
    HDOJ1213 并查集
    poj 3070 Fibonacci
    csu 1102 Palindrome
    C#格式化数值结果表
    正则表达式基础知识
    C#验证Email是否真正存在
    【翻译】Scott Mitchell的ASP.NET2.0数据教程中文版索引
    分块下载,测试文件 3.8GB
    asp.net的TextBox回车触发事件
    Cookie加密
  • 原文地址:https://www.cnblogs.com/xc-chejj/p/11250127.html
Copyright © 2011-2022 走看看