zoukankan      html  css  js  c++  java
  • 关于生产者/消费者/订阅者模式的那些事

    生产者/消费者模式


    简单介绍

    用来干嘛的?

    生产者/消费者模式的产生主要目的就是为了解决非同步的生产与消费之间的问题。

    什么是非同步呢?
    比方我刚刚生产了某个产品,而此时你正在打游戏,没空来取,要打完游戏来取,这就导致了我生产产品和你取产品是两个非同步的动作,你不知道我什么时候生产完产品,而我也不知道你什么时候来取。

    而生产者/消费者模式就是解决这个非同步问题的,由于肯定不可能我生产完一个就给你打个电话叫你来取,然后等你取完我再生产下一个,这是多么低效的一种做法。

    所以这个模式运用而生,这个模式在生活中也有非常好的体现,如:快递员派信这个样例。我就是生产者,快递员就是消费者。而生产者与消费者之间是通过什么来解决这样的非同步的问题呢?就是一个存储中介,作为快递员派信这个样例中,信箱就是这个存储中介。每次我仅仅要把写完的信扔入信箱,而快递员隔三差五的就会来取一次信,这两个动作是全然异步的,我把信扔入信箱后就不须要管什么了,之后肯定有快递员来取。

    怎样设计

    设计图

    生产者/消费者模式能够解决非同步的生产与消费的问题。归功就是存储中介的作用,由于生产者仅仅要把生产完的物品放入存储中介中即可了,而不必关系消费者什么时候来取,当消费者须要时自然会来取,当存储中介满了的话。那么生产者将停止生产,由于再生产就没地放了,这时候就须要等待消费者消费了。而当存储中介没有时。这时候消费者来取那肯定取不到,所以也须要Wait,等待生产者生产后才干取到。

    所以这就有了以下这个设计图:
    这里写图片描写叙述

    定位

    从上图中能够知道,一个完整的生产者/消费者模式具有生产者、消费者、存储中介以及产品。

    这四个缺一不可,而关于它们的定位也至关重要
    1 . Product
    对于产品,什么样的对象才干成为产品呢。一是依据当时的业务逻辑来推断,比方运行完某些操作后的产生的Result。二是必须保持每一个产品之间的完整性和独立性。保证各个产品之间互不影响、互不关联。


    2 . Store
    对于存储中介。它肯定是一块具有额定大小的存储空间,而这个存储空间一般来说具有FIFO的数据结构,比方JDK内置了具有堵塞作用的有界队列:ArrayBlockingQueue、LinkedBlockingQueue。

    并且存储中介须要起到生产者与消费者解耦的作用,这样的优点是当后期生产者或者消费者的生产方式或处理方式变了。这样仅仅须要改变一方,而另外一方则不须要调整。并且它负责协调生产者与消费者之间的生产消费关系。
    3 . Producer
    对于生产者。它是具有配置Product各种属性的一个对象,能够设计成Factory、Builder、装饰者模式等等,一般来说生产者有单独的一个线程用来生产产品,当然假设量大的话能够用多个线程去生产。只是须要处理一下线程同步的问题(Semaphore|synchronized|ThreadLocal)
    4 . Consumer
    对于消费者,和Producer差点儿相同,主要就是用来处理Product的,一般也有单独的一个线程去处理Product。

    实例

    1 . 生产者
    AppleProducer

    public class AppleProducer {
        private Store<Apple> mStore;
        private static ExecutorService mWorkThread = Executors.newFixedThreadPool(5);
        private String name;
    
        public AppleProducer setName(String name) {
            this.name = name;
            return this;
        }
    
        public AppleProducer bindStore(Store<Apple> store) {
            this.mStore = store;
            return this;
        }
    
        public void production() {
            mWorkThread.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                    Apple apple = new Apple("第" + name + "个产品");
                    mStore.push(apple);
                }
            });
        }
    }

    2 . 消费者
    AppleConsumer

    public class AppleConsumer {
        private Store<Apple> mStore;
        private ExecutorService mWorkThread = Executors.newFixedThreadPool(1);
    
        public AppleConsumer bindStore(Store<Apple> store) {
            this.mStore = store;
            return this;
        }
    
        public void consume() {
            mWorkThread.submit(new Runnable() {
                @Override
                public void run() {
                    for (; ; ) {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            Thread.currentThread().interrupt();
                        }
                        Apple apple = mStore.take();
                        System.out.println("apple:" + apple.getName() + "消费了");
                    }
                }
            });
    
        }
    }

    3 . 存储中介
    Store

    public class Store<T> {
        private BlockingQueue<T> mQueue = new ArrayBlockingQueue<>(10, true);
    
        public void push(T t) {
            try {
                mQueue.put(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            System.out.println("product生产了...");
        }
    
        public T take() {
            T t = null;
            try {
                t = mQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            System.out.println("product取出了...");
            return t;
        }
    
        public void release() {
            if (mQueue.isEmpty())
                return;
            mQueue.clear();
        }
    }

    4 . 产品

    public class Apple {
        private String name;
    
        public Apple(String name) {
            this.name = name;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }

    5 . Test

    public class Test {
        public static void main(String[] args) {
            Store<Apple> store = new Store<>();
    
            new AppleConsumer()
                    .bindStore(store)
                    .consume();
    
            for (int i = 0; i < 30; i++) {
                new AppleProducer()
                        .bindStore(store)
                        .setName(i + "")
                        .production();
            }
        }
    }

    result:

    product生产了…
    product生产了…
    product生产了…
    product生产了…
    product生产了…
    product取出了…
    apple:第1个产品消费了
    product生产了…
    product生产了…
    product生产了…
    product生产了…
    product生产了…
    product生产了…
    product取出了…
    apple:第0个产品消费了
    product生产了…
    product取出了…
    product生产了…
    apple:第3个产品消费了
    product取出了…
    apple:第2个产品消费了
    product生产了…
    product取出了…
    apple:第4个产品消费了
    product生产了…
    product取出了…
    apple:第5个产品消费了
    product生产了…
    product取出了…
    apple:第6个产品消费了
    product生产了…


    对于生产者,为了避免生产线程数量过多採取了一个线程池控制生产线程的数量。而生产者每一件产品都是由单独的一个线程来生产。对于消费者,用一个线程去轮询取队列里的产品,有则取出,没有则堵塞等待,由于ArrayBlockingQueue本身是支持并发的,所以在多线程共同操作一个存储队列的情况下。并不会有并发的问题。
    所以生产者/消费者模式也支持多生产——多消费的模式:
    这里写图片描写叙述
    对于有众多种类不同的生产者,能够用一个工厂类来管理。

    数据源/订阅者模式

    关于这个模式,事实上是生产者/消费者模式的变体。这样的模式并不须要存储中介,而是通过一个DataSource空壳来包装数据,对于公布者提交了一个Task后。将马上返回一个DataSource,对于任务运行完后的结果,假设你想获取则必须通过datasource.subscribe(new XxxSubscriber(…))来订阅获取运行后的结果,而假设不通过数据源订阅的方式来获取而直接通过datasource.getData()获取则返回null,由于DataSource仅仅是一个获取数据的空壳。

    这样的模式在Fresco源代码中有非常好的体现。用了大量的这样的模式。
    这里写图片描写叙述

    栗子:

            DataSource<Apple> dataSource = ProducerFactory.newAppleProducer().submit(name);
            //Apple apple = dataSource.getData();// apple is null!
            dataSource.subscribe(new BaseDataSubscriber<Apple>() {
                @Override
                protected void onSuccess(DataSource<Apple> dataSource) {
                            Apple apple = dataSource.getData();
                            //...
                }
    
                @Override
                protected void onFailure(DataSource<Apple> dataSource,Throwable throwable) {
                            //...
                }
            });
  • 相关阅读:
    (转)C++ typename的起源与用法
    EOS智能合约深度解析
    cmake常用变量和命令解析
    (转)Linux下source命令详解
    eosio_install.sh执行过程
    java之collection总结
    Guava之RangeMap
    java file.listFiles()按文件名称、日期、大小排序
    Java下载文件的几种方式
    Java泛型Class<T>、T与Class<?>
  • 原文地址:https://www.cnblogs.com/yfceshi/p/7289948.html
Copyright © 2011-2022 走看看