zoukankan      html  css  js  c++  java
  • javaStream与响应式流

    将Java Stream用于响应式编程中,是有局限性的。比如如下两个需要面对的问题:

    1. Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API
    2. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然不需要向前端传递每一次更新,这时候就需要一种流量控制能力,就像我们家里的水龙头,可以控制开关流速,而Java Stream不具备完善的对数据流的流量控制的能力。

    具备“异步非阻塞”特性和“流量控制”能力的数据流,我们称之为响应式流(Reactive Stream)。

    目前有几个实现了响应式流规范的Java库,这里简单介绍两个:RxJavaReactor

    要介绍RxJava,就不得不提ReactiveX(Reactive Extensions,Rx),它最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,包括RxJS、RxJava等。

    后来,Java社区的一些大牛凑到一起制定了一个响应式流规范。RxJava团队随后对1版本进行了重构,形成了兼容该响应流规范的RxJava 2。

    Reactor是Pivotal旗下的项目,与大名鼎鼎的Spring是兄弟关系,因此是Spring近期推出的响应式模块WebFlux的“御用”响应式流。Reactor支持响应式流规范,与RxJava相比,它没有任何历史包袱,专注于Server端的响应式开发,而RxJava更多倾向于Android端的响应式开发。

    在Java 9版本中,响应式流的规范被纳入到了JDK中,相应的API接口是java.util.concurrent.Flow

    Spring WebFlux也是本系列文章后边的重点内容。由于WebFlux首选Reactor作为其响应式技术栈的一部分,我们下边也主要以Reactor为主,目前的版本是Reactor3。

    我们继续回到主线,讨论“异步非阻塞”和“流量控制”。注意,本节请不必关注Reactor的代码细节,仅体会使用响应式流的“感觉”就好。

    一、阻塞

    对于阻塞造成的性能损失,我们通常有两种思路来解决:

    1. 并行化:使用更多的线程和硬件资源;
    2. 异步化:基于现有的资源来提高执行效率。

    解决方案之一:多线程

    解决方案之二:非阻塞

    1)非阻塞的回调

    2)异步的CompletableFuture

    Flux<String> ids = ifhrIds(); // <1>
    
    Flux<String> combinations =
        ids.flatMap(id -> { // <2>
            Mono<String> nameTask = ifhrName(id); // <3>
            Mono<Integer> statTask = ifhrStat(id); // <4>
    
            return nameTask.zipWith(statTask, // <5>
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });
    
    Mono<List<String>> result = combinations.collectList(); // <6>
    
    List<String> results = result.block(); // <7>
    assertThat(results).containsExactly( // <8>
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
    );

    说明:

      1. 这一次,我们从一个异步方式提供的 ids 序列(Flux<String>)开始。
      2. 对于序列中的每一个元素,我们异步地处理它(flatMap 方法内)两次。
      3. 获取相应的 name。
      4. 获取相应的 statistic.
      5. 异步地组合两个值。
      6. 随着序列中的元素值“到位”,它们收集一个 List 中。
      7. 在生成流的环节,我们可以继续异步地操作 Flux 流,对其进行组合和订阅(subscribe)。
        最终我们很可能得到一个 Mono 。由于是测试,我们阻塞住(block()),等待流处理过程结束,
        然后直接返回集合。
      8. Assert 结果。

    这种非阻塞数据流的感觉,让我想起来了《让×××飞》里边最经典的一段:姜文饰演的张麻子朝新来县长那“马拉的火车啪啪啪连续打了N枪,旁边兄弟问“打中没有”,张麻子说“让×××飞一会儿~”,稍后就见拉火车的马缰绳全都被×××打断,马匹四散,非常6+1!如果张麻子每打一枪都看看前一枪有没有射中的话,还怎么装X呢?

    通过上边的例子可见,回调或 CompletableFuture在处理复杂逻辑时会遇到的相似的窘境,反观Reactor3提供的API,却可以显著减少代码量,提高代码可阅读性,尤其是还可以提供一些不错的功能。

    二、流量控制——回压

    在响应式流中,数据流的发出者叫做Publisher,监听者叫做Subscriber。我们后续就统一直译叫做“发布者”和“订阅者”吧。

     

    问题来了,假如发布者发出数据的速度和订阅者处理数据的速度不同的时候,怎么办呢?订阅者处理速度快的话,那还好说,但是如果处理速度跟不上数据发出的速度,就像这样:

    回压

    如果没有流量控制,那么订阅者会被发布者快速产生的数据流淹没。就像在一个流水线上,如果某个工位处理比较慢,而上游下料比较快的话,这个工位的工人师傅就吃不消了,这个时候他需要一种途径来告诉上游下料慢一些。

    同样的,订阅者也需要有一种能够向上游反馈流量需求的机制:

    title

    这种能够向上游反馈流量请求的机制就叫做回压(backpressure,也有翻译为“背压”的)。

    在具体的使用过程中,回压的处理会涉及不同的策略。举两个例子以便于理解:

    举例:缓存的策略

    缓存

    如图,订阅者处理完一个元素的时候通过request(1)跟发布者再请求一个元素。由于发布者的数据不能很快被订阅者处理掉,那么发布者会将未处理的数据元素缓存起来。

    这种处理方式与消息队列有些相似之处,发布者需要维护一个队列用来缓存还没有被处理的元素。通常用于对数据准确性要求比较高的场景,比如发布者这儿是突然到来的数据高峰,都是要保存到数据库的,作为订阅者的数据持久层没有那么快的处理速度,那么发布者就需要将数据暂时缓存起来。

    举例:丢弃的策略

    丢弃

    如图,发布者不需要缓存来不及处理的数据,而是直接丢弃,当订阅者请求数据的时候,会拿到发布者那里最近的一个数据元素。比如我们在做一个监控系统,后台的监控数据以每秒10个的速度产生,而前端界面只需要每秒钟更新一下监控数据即可,那作为发布者的后台就不用缓存数据了,因为这种时效性强的场景,用不到的数据直接丢掉即可。

    在后续的实战阶段,我们还会再深入了解回压的作用原理。

    1.2.3 总结

    以上就是响应式流的两个核心特点:异步非阻塞,以及基于“回压”机制的流量控制。

    这样我们有了基于响应式流的“升级版”的响应式编程:

    响应式编程

    Reactor3和RxJava2都是具有以上特点的响应式流的具体实现库。

    响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。

    使用 iterator 是一种“命令式”(imperative)编程范式,因为什么时候获取下一个元素取决于开发者。在响应式流中,相对应的角色是“发布者 - 订阅者”(Publisher-Subscriber),当有新的值到来的时候,反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“处理流程”来定义对数据流的处理逻辑。

     转自:https://blog.51cto.com/liukang/2090183

  • 相关阅读:
    CrackMe17
    逆向按钮事件定位
    CrackMe20
    CrackMe14
    CrackMe09
    CrackMe08
    分布式事务seata
    SpringBoot自动注入原理初解与实现
    InnoDB事务日志(redo log 和 undo log)详解
    高频面试题:Spring 如何解决循环依赖?
  • 原文地址:https://www.cnblogs.com/duanxz/p/14959710.html
Copyright © 2011-2022 走看看