zoukankan      html  css  js  c++  java
  • [译]响应式编程笔记二:写点代码

    # 响应式编程笔记二:写点代码

    ## 新建一个项目

    我们使用Reactor库莱演示。
    https://start.spring.io 新建一个空项目,然后添加Reactor Core依赖。

    ```xml
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    <version>3.0.0.RC2</version>
    </dependency>
    ```

    ## What Makes it Functional?

    Reactive的基本building block是`a sequence of events`,和两个主角:一个是events的publisher,一个是events的subscriber。
    也可以将`a sequence`叫做`stream`,因为就是。
    Reactor将publisher叫做Flux(实现了Reactive Streams的Publisher接口)。RxJava库非常类似,有很多平行的特性,所以我们会讲一下`Observable`
    另外,监狱Reactor 2.0 叫它Stream,会与Java 8中的Streams混淆,所以这里直接使用Reactor 3.0。

    ### Generators | 生成器

    Flux是什么?
    是一序列的特定POJO类型的事件的publisher,所以,是泛型的。
    Flux<T>,就是T类型事件的publisher。
    Flux有一些静态方法,可以方便地根据不同的sources创建其实例。
    ```java
    //如从一个数组创建:
    Flux<String> flux = Flux.just("red", "white", "blue");
    ```
    上面生成了一个Flux,现在可以开工了。
    实际上你只能做两件事:操作它(transform it, or combine it with other sequences)、订阅它(别忘了是一个publisher)。

    ### Single Valued Sequences | 单值序列

    你会经常遇到仅含有一个或零个元素的sequence,例如按照主键查找的repository方法。
    Reactor提供了Mono类型,代表了a single valued or empty Flux。
    Mono的API非常类似Flux,但更专注,因为不是所有操作符都适合单值序列。
    RxJava也有一个,叫Single,还有一个Completable用于empty序列。而Reactor中的empty序列是Mono<Void>。

    ### Operators | 操作符

    Flux有很多方法,几乎所有方法都是操作符。
    我们不会在这里过多的关注它们,因为你可以从Javadocs等地方获取想要的知识。
    我们仅需要知道操作符是什么,以及它能做什么。
    例如,如果想要让Flux中的事件被记录到标准输出,你可以调用`.log()`。或者,你可以将其transform - 使用`.map()`
    ```java
    Flux<String> flux = Flux.just("red", "white", "blue");
    Flux<String> upper = flux
    .log()
    .map(String::toUpperCase);
    ```
    到目前为止,唯一有趣的东西,就是,实际上还没有任何数据被处理!
    对一个Flux调用操作符,就像是在做一个后续执行的计划。
    完全是声明式的,这就是为什么人们叫它`函数式的`
    除非数据开始流动,否则操作符中的逻辑不会被执行 - 而除非有人订阅了(subscribe)这个Flux(Publisher),否则数据不会开始流动!

    Reactive库中到处都是同样的声明式、函数式的数据序列处理,Java 8的Streams也一样。因此,使用Stream的理念也是一样的:
    ```java
    Stream<String> stream = Streams.of("red", "white", "blue");
    Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
    });
    ```
    我们对Flux的观察,同样适用于这里:没有任何数据被处理,仅仅是一个执行的计划。
    然而,Flux和Stream之间有一些非常重要的区别,从而使得Stream不适合作为Reactive用例的API。
    - Flux拥有更多操作符,但其中多数仅仅是为了方便,真正的区别在于你想什么时候消费数据!
    >有一个非常有用的blog,是关于[Reactive Types](https://spring.io/blog/2016/04/19/understanding-reactive-types)的,
    在里面描述了不同流式和响应式API的区别 - 通过查看类型的定义,以及如何使用它们。Flux和Stream的区别更是高亮的,更详细。

    ### Subscribers | 订阅者

    想要让数据流动,你必须订阅Flux,使用某个`subscribe()`方法。只有这些方法才会让数据流动!
    它们会回溯你声明在序列上的操作符,并请求publisher开始创建数据。
    在上面的小例子中,意味着底层的字符串集合会开始迭代。而在更复杂的用例中,可能会触发 读取文件、从数据库中读取数据或调用一个HTTP服务。
    ```java
    Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
    .subscribe(); //注意,这里没有指定任何消费行为,所以,你懂的!
    ```
    输出是:
    ```text
    09:17:59.665 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
    09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
    09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onComplete()
    ```

    注意,这里的`subscribe()`没有任何参数,仅仅是请求publisher发送**所有数据** -- 所以仅有一个`request()`被记录,是`unbounded`
    >这里有个小知识点,request,相当于TCP的ACK,其参数值就是ACK的sliding window 大小!
    我们可以看到每个数据被发布时的回调方法`onNext()`,序列结束时的回调方法`onComplete()`,以及最开始的订阅时的回调方法`onSuscribe()`
    __如果你需要,你可以自己监听这些事件,使用Flux中的`doOn*()`方法,这些方法都是操作符,不是订阅者,所以不会导致数据流动__
    `subscribe()`方法有重载,其变体有不同的选项以控制要发生的事情。
    一个重要且方便的形式是带有回调方法的变体。
    第一个参数是一个Consumer,用于每个item的回调,你还可以选择添加一个Consumer - 用于error,以及一个Runnable - 当序列完成时执行。
    ```java
    Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
    .subscribe(System.out::println);
    ```
    其输出:
    ```text
    09:56:12.680 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    RED
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    WHITE
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    BLUE
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onComplete()
    ```
    我们可以控制数据流,让其`bounded` - 有几种方式可以达成。
    原生API是你从`Subscriber`得到的`Subscription`。相对于上面`subscribe()`的长格式是:
    ```java
    .subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
    s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
    System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }
    });
    ```
    如果想要控制同一时刻最多消费2个item,可以这样使用`Subscription`
    ```java
    .subscribe(new Subscriber<String>() {
    private long count = 0;
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(2);
    }
    @Override
    public void onNext(String t) {
    count++;
    if (count>=2) {
    count = 0;
    subscription.request(2);
    }
    }
    //...
    }
    ```
    `Subscriber` 会一次批发2个items! 这可是一个非常常见的用例,也许你会想着将其提取出来,这样可读性就更佳。
    不过,先来看看输出吧:
    ```text
    09:47:13.562 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
    09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - request(2)
    09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - request(2)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onComplete()
    ```
    事实上,批量订阅是非常常见的用力,所以`Flux`提供了几个便利的方法。上面的例子可以这样写:
    ```java
    Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
    .subscribe(null, 2); //FIXME 新版本没有这个方法了!
    ```
    输出是:
    ```text
    10:25:43.739 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
    10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - request(2)
    10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - request(2)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onComplete()
    ```
    务必注意,新版本已经没有这个方法了!!!!!
    >注意,诸如Spring Reactive Web这种会为你处理sequences的库,会handle the subscription。
    >将这些关注点推到栈底是件很好的事,因为会避免让你陷入零乱的非阻塞逻辑代码中,从而变得更有可读性,便于测试和维护。
    >因此,如果能够避免subscribe to a sequence,或者起码也可以将那些代码放入一个处理层,而非业务层,是非常好的。

    ### Threads, Schedulers and Background Processing | 线程、定时器、背压处理

    上面的所有log,有一个有意思的特点,就是它们都位于main thread,就是调用`subscribe()`的线程。
    这揭示了一个重要的特点:Reactor对于线程的使用是非常节俭的,因为这会给你最佳的性能体验!
    如果你一直在使用线程、线程池、异步执行 等方式来榨干服务的每一份性能,也许你会觉得奇怪。
    但这是真的:如果没有必要的线程切换,即便JVM高度优化了线程切换,其速度也不如单线程计算快!
    Reactor掌握了这些关键点,并用来控制异步处理,并假定你知道你在做什么!
    Flux,提供了少数配置方法,用于控制线程边界。例如,你可以配置使用一个背景线程:Flux.subscribeOn():
    ```java
    Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
    .subscribeOn(Schedulers.parallel())
    .subscribe(null, 2); //FIXME 旧的方法,不再支持!
    ```
    输出:
    ```text
    13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
    13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
    ```
    >注意,当你执行这段代码的时候,记得不要在处理结束之前退出JVM!
    注意,所有的处理都发生在后台的一个线程上。如果处理是CPU密集型,这样做也行(但无意义,因为不会更快)。
    你也可能想要能够执行IO密集型的处理,甚至可能是阻塞的。这种情况下,你希望尽快的完成,而不会阻塞调用者。那么线程池是你的选择,这也是你从`Schedulers.parallel()`得到的。
    如果想将每个独立item的处理切换到不同的线程中(上限是线程池的数量),那我们需要将它们打散到不同的publisher中,每个publisher都在一个背景线程中请求结果。
    - 一种方式是使用操作符`flatMap()`,会将所有的items映射进一个Publisher(一般是不同类型的),然后返回新类型的sequence:
    ```java
    Flux.just("red", "white", "blue")
    .log()
    .flatMap(value ->
    Mono.just(value.toUpperCase())
    .subscribeOn(Schedulers.parallel()),
    2)
    .subscribe(value -> {
    log.info("Consumed: " + value);
    })
    ```
    注意,这里使用`flatMap()`将items推到child publisher中,然后我们就可以控制每个item线程的订阅了。
    Reactor的默认行为,是尽可能地使用单线程,因此,如果我们需要使用背景线程处理,那就需要显式的写出来。
    实际上,这是一些共知的强制并行处理的技巧之一(更多详见[Reactive Gems issue](https://github.com/reactor/reactive-streams-commons/issues/21))。

    输出:
    ```text
    15:24:36.596 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
    15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - request(2)
    15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    15:24:36.613 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
    15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(1)
    15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
    15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
    15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
    15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
    ```
    注意,现在有多个线程在消费items,而`flatMap()`中的第二个参数`concurrency hint`,则确保任何时刻都只有2 items被处理(意思是总共处理2 items),当然前提是可用。
    我们看到很多`request(1)`,是因为系统试图保留2 items在管道线中,一般来说,此时没有完成处理。
    Reactor 力图聪明地处理,它会从上游Publisher中pre-fetch items,以消除等待时间。
    >3 items 显得太少了,最好多一些数据。
    Flux还有一个`publishOn()`方法,是用于监听器(如onNext()或者consumer callbacks),而非subscriber:
    ```java
    Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
    .subscribeOn(Schedulers.newParallel("sub"))
    .publishOn(Schedulers.newParallel("pub"), 2)
    .subscribe(value -> {
    log.info("Consumed: " + value);
    });
    ```
    输出:
    ```text
    15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
    15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
    15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
    15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
    15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
    15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
    15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
    15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
    ```
    注意,consumer callbacks(记录"Consumed: ..."),是在publisher thread上!
    > Notice that the consumer callbacks (logging "Consumed: …​") are on the publisher thread pub-1-1. If you take out the subscribeOn() call,
    you might see all of the 2nd chunk of data processed on the pub-1-1 thread as well. This, again, is Reactor being frugal with threads 
    — if there’s no explicit request to switch threads it stays on the same one for the next call, whatever that is.
    > We changed the code in this sample from subscribe(null, 2) to adding a prefetch=2 to the publishOn().
    In this case the fetch size hint in subscribe() would have been ignored.

    ### Extractors: The Subscribers from the Dark Side

    还有另一种方式订阅一个sequence,就是调用 `Mono.block()`或者 `Mono.toFuture()` 或者 `Mono.toStream()`(这些都是提取器方法,它们会跳出Reactive类型,而进入一个阻塞式抽象)。
    Flux也有一些转换器`collectList()``collectMap()`,会将Flux转成Mono!
    They don’t actually subscribe to the sequence, but they do throw away any control you might have had over the suscription at the level of the individual items.
    >警告,一个好的rule是,永远不要调用一个提取器!当然也有例外(否则方法就不会存在)。一个明显的例外就是在测试中,因为阻塞以收集结果很有用。
    这些方法,就是作为转换通道,架起Reactive到阻塞式的桥梁。
    当你调用`Mono.block()`时,你会享受不到任何Reactive Streams带来的好处。
    这一点,就是Reactive Streams 和 Java 8 Streams的关键区别:原生Java Streams只有"all or nothing" 订阅模式,等价于`Mono.block()`
    当然,`subscribe()`也可以阻塞调用线程,所以跟转换器方法一样微信,但你有更多控制 - 你可以使用`subscribeOn()`来阻止阻塞,还可以使用背压来临时决定是否继续。

    ## Conclusion | 总结

    在本文中,我们涉及了Reactive Streams和Reactor APIs的基本内容。
    如果你需要了解更多,有很多书籍,但都不能替代编码,所以,尽量使用[GitHub中的代码](https://github.com/dsyer/reactive-notes)吧。
    [本系列的下一篇文章](https://spring.io/blog/2016/07/20/notes-on-reactive-programming-part-iii-a-simple-http-server-application)会挖掘一些更深的知识:Reactive模型的阻塞式、分发、异步。
  • 相关阅读:
    动手动脑
    大道至简第七八章读后感
    super 的用法
    第六章
    课后作业
    大道至简第五章读后感
    课后作业
    大道至简第四章读后感
    大道至简——第六章
    Java数组课后作业
  • 原文地址:https://www.cnblogs.com/larryzeal/p/8552844.html
Copyright © 2011-2022 走看看