zoukankan      html  css  js  c++  java
  • 生产者消费者模式

    定义

             生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    实现

    1.利用wait和notifyAll实现

    package productorConsumer;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Producer/consumer demo built on intrinsic locks: {@code synchronized},
     * {@link Object#wait()} and {@link Object#notifyAll()}.
     *
     * <p>All workers share one list, which doubles as the monitor they
     * synchronize on. Producers wait while the list is at capacity, consumers
     * wait while it is empty, and each side wakes every waiter after it changes
     * the list.
     *
     * @author littlecar
     * @date 2019/7/26 10:24
     */
    public class ProductorConsumer {
        /** Number of producer tasks submitted by {@link #main(String[])}. */
        private static final int PRODUCER_COUNT = 5;
        /** Number of consumer tasks submitted by {@link #main(String[])}. */
        private static final int CONSUMER_COUNT = 10;
        /** Maximum number of elements producers may keep in the shared list. */
        private static final int CAPACITY = 5;

        /**
         * Starts the demo: one pool thread per task, so every producer and
         * consumer can run (and block) concurrently.
         *
         * <p>The tasks loop until their thread is interrupted and the pool is
         * never shut down here, so the program runs until the JVM is stopped.
         *
         * @param arg command-line arguments (unused)
         */
        public static void main(String[] arg) {
            List<Integer> list = new LinkedList<>();
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                    new ScheduledThreadPoolExecutor(PRODUCER_COUNT + CONSUMER_COUNT);
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                scheduledThreadPoolExecutor.submit(new Productor(list, CAPACITY));
            }
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                scheduledThreadPoolExecutor.submit(new Consumer(list));
            }
        }

        /**
         * Adds random integers to the shared list, waiting on the list's
         * monitor whenever the list already holds {@code maxLength} elements.
         */
        static class Productor implements Runnable {
            private final List<Integer> list;
            private final int maxLength;
            /** Created once per producer instead of once per produced value. */
            private final Random random = new Random();

            /**
             * @param list      shared buffer; also used as the monitor
             * @param maxLength number of elements at which the producer waits
             */
            public Productor(List<Integer> list, int maxLength) {
                this.list = list;
                this.maxLength = maxLength;
            }

            /**
             * Produces until the running thread is interrupted. On interrupt
             * the interrupt status is restored and the method returns.
             */
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (list) {
                        try {
                            // Re-check in a loop: notifyAll wakes producers and
                            // consumers alike, and wait() may wake spuriously.
                            // ">=" (not "==") keeps producers blocked even if the
                            // list was handed over already above capacity.
                            while (list.size() >= maxLength) {
                                System.out.println("生产者" + Thread.currentThread().getName() + "list已满,进行wait");
                                list.wait();
                                System.out.println("生产者" + Thread.currentThread().getName() + "退出wait");
                            }
                        } catch (InterruptedException e) {
                            // Interruption is the stop signal: keep the flag set
                            // for the caller and leave instead of waiting again.
                            Thread.currentThread().interrupt();
                            return;
                        }
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                        list.add(i);
                        list.notifyAll();
                    }
                }
            }
        }

        /**
         * Removes integers from the head of the shared list, waiting on the
         * list's monitor whenever the list is empty.
         */
        static class Consumer implements Runnable {
            private final List<Integer> list;

            /**
             * @param list shared buffer; also used as the monitor
             */
            public Consumer(List<Integer> list) {
                this.list = list;
            }

            /**
             * Consumes until the running thread is interrupted. On interrupt
             * the interrupt status is restored and the method returns.
             */
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (list) {
                        try {
                            // Loop guards against spurious wake-ups and against
                            // another consumer emptying the list first.
                            while (list.isEmpty()) {
                                System.out.println("消费者" + Thread.currentThread().getName() + "list为空,进行wait");
                                list.wait();
                                System.out.println("消费者" + Thread.currentThread().getName() + "退出wait");
                            }
                        } catch (InterruptedException e) {
                            // Interruption is the stop signal: keep the flag set
                            // for the caller and leave instead of waiting again.
                            Thread.currentThread().interrupt();
                            return;
                        }
                        Integer i = list.remove(0);
                        System.out.println("消费者" + Thread.currentThread().getName() + "消费数据" + i);
                        list.notifyAll();
                    }
                }
            }
        }
    }

    2.利用Lock中Condition的await/signalAll实现生产者消费者

    package productorConsumer;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Producer/consumer demo built on an explicit {@link ReentrantLock} with
     * two {@link Condition}s.
     *
     * <p>Producers await {@code full} while the list is at capacity and signal
     * {@code empty} after adding; consumers await {@code empty} while the list
     * is empty and signal {@code full} after removing.
     *
     * @author littlecar
     * @date 2019/7/26 14:00
     */
    public class ProductorConsumer1 {
        /** Number of producer tasks submitted by {@link #main(String[])}. */
        private static final int PRODUCER_COUNT = 5;
        /** Number of consumer tasks submitted by {@link #main(String[])}. */
        private static final int CONSUMER_COUNT = 10;
        /** Maximum number of elements producers may keep in the shared list. */
        private static final int CAPACITY = 5;

        private static final ReentrantLock lock = new ReentrantLock();
        /** Producers wait here while the list is full; signalled by consumers. */
        private static final Condition full = lock.newCondition();
        /** Consumers wait here while the list is empty; signalled by producers. */
        private static final Condition empty = lock.newCondition();

        /**
         * Starts the demo: one pool thread per task, so every producer and
         * consumer can run (and block) concurrently.
         *
         * <p>The tasks loop until their thread is interrupted and the pool is
         * never shut down here, so the program runs until the JVM is stopped.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            List<Integer> list = new LinkedList<>();
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                    new ScheduledThreadPoolExecutor(PRODUCER_COUNT + CONSUMER_COUNT);
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                scheduledThreadPoolExecutor.submit(new Productor(list, lock, CAPACITY));
            }
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                scheduledThreadPoolExecutor.submit(new Consumer(list, lock));
            }
        }

        /**
         * Adds random integers to the shared list while holding the lock,
         * awaiting {@code full} whenever the list already holds
         * {@code maxLength} elements.
         */
        static class Productor implements Runnable {
            private final List<Integer> list;
            private final Lock lock;
            private final int maxLength;
            /** Created once per producer instead of once per produced value. */
            private final Random random = new Random();

            /**
             * @param list      shared buffer guarded by {@code lock}
             * @param lock      lock guarding the list; the conditions used in
             *                  {@link #run()} were created from the enclosing
             *                  class's static lock, so this is expected to be
             *                  that same lock
             * @param maxLength number of elements at which the producer waits
             */
            public Productor(List<Integer> list, Lock lock, int maxLength) {
                this.list = list;
                this.lock = lock;
                this.maxLength = maxLength;
            }

            /**
             * Produces until the running thread is interrupted. On interrupt
             * the lock is released, the interrupt status is restored and the
             * method returns.
             */
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    lock.lock();
                    try {
                        // Loop guards against spurious wake-ups; ">=" keeps
                        // producers blocked even if the list is over capacity.
                        while (list.size() >= maxLength) {
                            System.out.println("生产者" + Thread.currentThread().getName() + "list已满,进行wait");
                            full.await();
                            // Prefix added so this line identifies the producer
                            // like every other message in the demo.
                            System.out.println("生产者" + Thread.currentThread().getName() + "退出wait");
                        }
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                        list.add(i);
                        empty.signalAll();
                    } catch (InterruptedException e) {
                        // Interruption is the stop signal: keep the flag set for
                        // the caller and leave (finally still unlocks).
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        /**
         * Removes integers from the head of the shared list while holding the
         * lock, awaiting {@code empty} whenever the list is empty.
         */
        static class Consumer implements Runnable {
            private final List<Integer> list;
            private final Lock lock;

            /**
             * @param list shared buffer guarded by {@code lock}
             * @param lock lock guarding the list; expected to be the enclosing
             *             class's static lock, from which the conditions used
             *             in {@link #run()} were created
             */
            public Consumer(List<Integer> list, Lock lock) {
                this.list = list;
                this.lock = lock;
            }

            /**
             * Consumes until the running thread is interrupted. On interrupt
             * the lock is released, the interrupt status is restored and the
             * method returns.
             */
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    lock.lock();
                    try {
                        // Loop guards against spurious wake-ups and against
                        // another consumer emptying the list first.
                        while (list.isEmpty()) {
                            System.out.println("消费者" + Thread.currentThread().getName() + "list为空,进行wait");
                            empty.await();
                            System.out.println("消费者" + Thread.currentThread().getName() + "退出wait");
                        }
                        Integer i = list.remove(0);
                        System.out.println("消费者" + Thread.currentThread().getName() + "消费数据" + i);
                        full.signalAll();
                    } catch (InterruptedException e) {
                        // Interruption is the stop signal: keep the flag set for
                        // the caller and leave (finally still unlocks).
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    }

    3. 使用BlockingQueue实现生产者-消费者

    package productorConsumer;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    
    /**
     * Producer/consumer demo built on a {@link BlockingQueue}; the queue's
     * {@code put} and {@code take} calls do all of the blocking and hand-off.
     *
     * @author littlecar
     * @date 2019/7/26 14:15
     */
    public class ProductorConsumer2 {
        /** Number of producer tasks submitted by {@link #main(String[])}. */
        private static final int PRODUCER_COUNT = 5;
        /** Number of consumer tasks submitted by {@link #main(String[])}. */
        private static final int CONSUMER_COUNT = 10;
        /** Queue capacity; once reached, {@code put} blocks the producers. */
        private static final int CAPACITY = 5;

        // Bounded on purpose: with the default (Integer.MAX_VALUE) capacity
        // producers would never block and the queue would not act as a buffer.
        private static final BlockingQueue<Integer> linkedBlockingQueue =
                new LinkedBlockingQueue<>(CAPACITY);

        /**
         * Starts the demo: one pool thread per task, so every producer and
         * consumer can run (and block) concurrently.
         *
         * <p>The tasks loop until their thread is interrupted and the pool is
         * never shut down here, so the program runs until the JVM is stopped.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                    new ScheduledThreadPoolExecutor(PRODUCER_COUNT + CONSUMER_COUNT);
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                scheduledThreadPoolExecutor.submit(new Productor(linkedBlockingQueue));
            }
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                scheduledThreadPoolExecutor.submit(new Consumer(linkedBlockingQueue));
            }
        }

        /** Puts random integers on the queue, blocking while it is full. */
        static class Productor implements Runnable {
            private final BlockingQueue<Integer> queue;
            /** Created once per producer instead of once per produced value. */
            private final Random random = new Random();

            /**
             * @param queue queue to publish the produced values to
             */
            public Productor(BlockingQueue<Integer> queue) {
                this.queue = queue;
            }

            /**
             * Produces until the running thread is interrupted. On interrupt
             * the interrupt status is restored and the method returns.
             */
            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    // put() clears the flag when it throws; set it again so the
                    // owner of this thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Takes integers from the queue, blocking while it is empty. */
        static class Consumer implements Runnable {
            private final BlockingQueue<Integer> queue;

            /**
             * @param queue queue to take values from
             */
            public Consumer(BlockingQueue<Integer> queue) {
                this.queue = queue;
            }

            /**
             * Consumes until the running thread is interrupted. On interrupt
             * the interrupt status is restored and the method returns.
             */
            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Integer i = queue.take();
                        System.out.println("消费者" + Thread.currentThread().getName() + "消费数据" + i);
                    }
                } catch (InterruptedException e) {
                    // take() clears the flag when it throws; set it again so the
                    // owner of this thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
  • 相关阅读:
    poj 2763 Housewife Wind
    hdu 3966 Aragorn's Story
    poj 1655 Balancing Act 求树的重心
    有上下界的网络流问题
    URAL 1277 Cops and Thieves 最小割 无向图点带权点连通度
    ZOJ 2532 Internship 网络流求关键边
    ZOJ 2760 How Many Shortest Path 最大流+floyd求最短路
    SGU 438 The Glorious Karlutka River =) 拆点+动态流+最大流
    怎么样仿写已知网址的网页?
    5-10 公路村村通 (30分)
  • 原文地址:https://www.cnblogs.com/xc-chejj/p/11250127.html
Copyright © 2011-2022 走看看