zoukankan      html  css  js  c++  java
  • 响应式编程笔记三:一个简单的HTTP服务器

    # 响应式编程笔记三:一个简单的HTTP服务器

    本文我们将继续前面的学习,但将更多的注意力放在用例和编写实际能用的代码上面,而非基本的APIs学习。

    我们会看到Reactive是一个有用的抽象 - 对于并发编程来说 - 但它还有一些非常低级别的特性,应该引起我们的注意。

    如果我们开始使用这些功能,挖掘其全部潜能,那我们可以控制我们应用中的layers - 那些之前不可见的、被容器|平台|框架隐藏起来的layers!

    ## Bridging from Blocking to Reactive with Spring MVC | 在Spring MVC中,将阻塞式桥接到响应式

    响应式会强迫你用不同的眼光去看待世界。

    不再是请求和获取(也许没有获取到),所有的东西都作为一个`sequence(Publisher)`被投递过来,当然你必须先`subscribe`。

    不再是一直等待一个响应,你必须注册一个`callback`。

    当你习惯了的时候,就变得很轻松,但是,除非整个世界都变成了响应式的,不然你还是需要与旧式的阻塞式API打交道。

    假定我们有一个阻塞式方法,会返回一个 `HttpStatus`:

    private RestTemplate restTemplate = new RestTemplate();
    
    private HttpStatus block(int value) {
        return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
                .getStatusCode();
    }

    我们希望以不同的参数来调用它,并聚合所有的结果。这是一个典型的 **分散-聚集 (scatter-gather)** 用例,假定每次请求你都获得了一个分页结果,但最后需要得到所有结果的 top N。由于阻塞式操作与 scatter-gather模式无关,我们将其放到一个 `block()`方法中,并在稍后实现它。现在,先来看一个**坏例子**

    Flux.range(1, 10) //(1)
        .log()
        .map(this::block) //(2)
        .collect(Result::new, Result::add) //(3)
        .doOnSuccess(Result::stop) //(4)

    1. 发起请求的次数

    2. 阻塞式代码

    3. 收集结果,并聚合到一个独立的对象中

    4. 最后结束(结果是 `Mono<Result>`)

    不要学习这个坏例子,因为,虽然APIs用的没问题,但仍然会阻塞住线程!

    这个例子和直接循环发起请求没什么不同。

    更好的实现 则应该将对`block()`的调用放到一个后台线程中。例如:

    private Mono<HttpStatus> fetch(int value) {
        return Mono.fromCallable(()->block(value))  //(1)
            .subscribeOn(this.scheduler);           //(2)
    }

    1. 阻塞式代码现在位于Callable中,会延迟执行。

    2. 在后台线程订阅。

    `scheduler` 需要另外声明:

    Scheduler scheduler = Schedulers.parallel();

    然后,我们就可以使用`flatMap()`,而非`map()`:

    Flux.range(1, 10)
        .log()
        .flatMap(                           //(1)
            this::fetch, 4)                 //(2)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);

    1. 使用新的publisher来并行处理

    2. flatMap 的并发量

    ## Embedding in a Non-Reactive Server 内置一个非响应式服务器

    如果你想在一个非响应式服务器中运行上面的代码,可以使用Spring MVC:

    @RequestMapping("/parallel")
    public CompletableFuture<Result> parallel() {
        return Flux.range(1, 10)
            .log()
            .flatMap(this::fetch, 4)
            .doOnSuccess(Result::stop)
            .toFuture();
    }

    如果你读过`@RequestMapping`的Javadocs,那你会发现其方法可以返回一个`CompletableFuture`,这样,应用会使用这个返回来生成一个真正的返回值 - 在另一个线程中。本例中的这个"另一个线程"是由"scheduler"提供的, scheduler是一个线程池,所以真正的处理是多线程的,上面的代码,会4线程并发!

    ## No Such Thing as a Free Lunch | 没有免费的午餐

    虽然上面 用后台线程运行 scatter-gather 代码 是一个有用的模式,但它仍不够完美 - 虽然没有阻塞调用者,但阻塞了别的,就是说,它只是转移了问题。

    我们有一个HTTP服务器,可能会带有NIO handlers,将工作传回到一个线程池,每个线程处理一个HTTP request - 所有的这些都发生在Servlet容器内部。

    request是被异步处理的,因此Tomcat的worker线程没有被阻塞,scheduler中创建的线程池会用4线程来处理。

    我们在处理10个后端请求( 对 `block()`的调用),因此,使用scheduler会有一个最大的、理论的受益,就是降低4倍延迟。

    换句话说,如果在一个线程中处理需要 1000ms的话,那现在可能只需要 250ms了。

    注意,这里只是可能:只有在没有竞争的情况下才会那么快。

    提示:tomcat默认分配了100个线程来处理HTTP请求。如果所有的请求都经过我们的scheduler线程池的话,那完全超出了线程池的容量。这是一个完全错误的搭配:scheduler线程池可能是一个瓶颈!这意味着性能调优会非常艰难,你可能调整了所有配置,然后达到一个很脆弱的平衡 - 随时可能被破坏。

    Tomcat allocates 100 threads for processing HTTP requests by default. That is excessive if we know all the processing is going to be on our scheduler thread pool. There is an impedance mismatch: the scheduler thread pool can be a bottleneck because it has fewer threads than the upstream Tomcat thread pool. This highlights the fact that performance tuning can be very hard, and, while you might have control of all the configuration, it’s a delicate balance.

    我们可以使用弹性的线程池,而非固定的。 这对Reactor来说非常简单,只要使用 `Schedulers.elastic()`即可 - 可以多次调用,但只会有一个实例!

    ## Reactive all the Way Down

    从阻塞式到响应式的桥接是一个有用的模式,且很容易在Spring MVC中实现。

    下一步就是完全地干掉应用线程中的阻塞式,这得使用新的APIs和新的工具。

    极限就是完全响应式,从服务器到客户端。这是Spring Reactive的目标!

    Spring Reactive是一个新的框架,与Spring MVC是完全不同的方向,但会实现同样的需求,并使用相似的编程模型。

    >注意,Spring Reactive开始是一个单独的项目,但已经被打包进Spring Framework了,版本5 。

    还是拿前面的 scatter-gather例子来说,如果想全响应式,那第一步就是使用`spring-boot-starter-web-reactive`来代替`spring-boot-starter-web`。

    org.springframework.boot.experimentalspring-boot-starter-web-reactive ...org.springframework.boot.experimentalspring-boot-dependencies-web-reactive0.1.0.M1pomimport

    >注意,上面的版本可能已过时,请自行查找新版本。

    <dependencies>
      <dependency>
        <groupId>org.springframework.boot.experimental</groupId>
        <artifactId>spring-boot-starter-web-reactive</artifactId>
      </dependency>
      ...
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot.experimental</groupId>
                <artifactId>spring-boot-dependencies-web-reactive</artifactId>
                <version>0.1.0.M1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    然后,在controller中,只需要这样:

    @RequestMapping("/parallel")
    public Mono<Result> parallel(){
        return Flux.range(1, 10)
            .log()
            .flatMap(this::fetch, 4)
            .collect(Result::new, Result::add)
            .doOnSuccess(Result::stop);
    }

    在Spring Boot中运行,会自动在Tomcat、Jetty或Netty中运行。默认是Tomcat,如果有不同的选择,需要先exclude tomcat!

    我们仍然使用阻塞式后端调用 `block()`,因此,我们仍然需要`subscribe()`一个线程池来避免阻塞住调用者。

    我们可以使用非阻塞式客户端WebClient,而非RestTemplate!

    private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
    
    private Mono<HttpStatus> fetch(int value){
        return this.client.perform(HttpRequestBuilders.get("http://example.com"))
                   .extract(WebResponseExtractors.response(String.class)) 
                   .map(response -> response.getStatusCode());
    }

    注意 `WebClient.perform()` (或者更确切的说是 `WebResponseExtractor` ) 会有一个响应式返回类型,我们已经将其转换为 `Mono<HttpStatus>`,但我们没有`subscribe`它。我们希望框架负责所有的订阅,这样我们就是全响应式了。

    >警告:Spring Reactive中 那些返回`Publisher`的方法,都是非阻塞的!而普通方法返回`Publisher`(或`Flux`|`Mono`|`Observable`) 则可能是非阻塞的!如果你在编写这样的方法,最好认真分析、测试一下它们是否阻塞。

    >注意:上面我们使用了一个非阻塞的客户端来简化了HTTP栈,同样也可以在常规Spring MVC中工作。`fetch()`的结果可被转换成一个 `CompletableFuture`,从常规的`@RequestMapping`方法中传出。

    ## Inversion of Control | 控制反转

    现在,我们可以移除并发数量了:

    @RequestMapping("/netty")
    public Mono<Result> netty() {
        return Flux.range(1, 10)        //(1)
            .log()
            .flatMap(this::fetch)       //(2)
            .collect(Result::new, Result::add)
            .doOnSuccess(Result::stop);
    }

    1. 发起10个调用

    2. 使用新的publisher来并发处理

    现在我们不需要额外的 callable和 subscriber线程了,代码简洁了不要太多。

    响应式`WebClient` 返回了一个 `Mono`,这会驱动我们在变形链中直接选择`flatMap()`,然后就可以得到我们需要的code了!

    优雅,可读性更高,更易于维护!

    还有,因为没有了线程池和并发量,也就没有了魔数 4

    当然,还是有一个限制,但不会再影响我们在应用层(application tier)的选择,也不会局限于服务器容器了。

    这不是魔法,因为仍然是物理规律综合的结果,所以,后端调用仍然会执行 100ms,但竞争很少 - 我们可能看到10个请求完全同时执行。

    当服务器的负载增加了延迟时,吞吐率会自然地降级,而降级方式则由缓冲竞争和内核网络治理,而非应用线程管理。

    这是一种控制反转,由底层控制,而非应用代码控制。

    请记住,同样的应用代码运行在Tomcat、Jetty或Netty中。

    目前,Tomcat和Jetty的支持,是基于Servlet 3.1 异步处理之上的,所以受限于 一个请求一个线程。

    当同样的代码运行在Netty服务器平台上时,就没有这个限制了,服务器可以分派请求到web客户端。

    只要客户端没有阻塞,所有人都会高兴。

    netty服务器和客户端的性能指标是类似的,但Netty服务器不受限于 一个线程处理一个请求,所以,它不使用大的线程池,我们可能期望看到一些不同的资源利用。

    我们会在本系列的其他文章中讨论它。

    >提示:在[样例代码](https://github.com/dsyer/reactive-notes)中,reactive样例支持的maven profiles有:tomcat、tomcatNext(for Tomcat 8.5)、jetty、netty。

    >注意:很多应用中的阻塞式代码不是HTTP后端调用,而是数据库交互。目前很少有数据库支持非阻塞式客户端(MongoDB和Couchbase是礼物,但也不如HTTP客户端成熟)。线程池和 blocking-to-reactive pattern会存在很长时间,一直到所有数据库能够跟上。

    ## Still No Free Lunch | 还是没有免费的午餐

    虽然到目前为止我们做的看起来都很好,但很快就会有一些错误发生,例如 表现恶劣的网络连接、后端服务忍受严重的延迟。

    首先,最明显的就是我们写的代码都是声明式的,所以很难调试。当错误发生时,诊断可能很模糊。使用原生的、低级别的APIs,例如不带Spring的Reactor,或者没有Reactor的Netty级别,可能会让情况变得更糟,因为我们必须构建大量错误处理,每次与网络交互,都要重复一些呆板的代码。起码,混合使用Spring和Reactor,我们可以看到栈追踪记录、未捕获的异常。它们可能不是那么好理解,因为它们是发生在我们不能控制的线程中;有些时候,它们还给出一些非常低级的信息。

    另一个痛苦之源则是,如果我们犯了错误,并阻塞在我们的响应式callbacks里,我们会停住(hold up)该线程里的**所有requests**。

    在基于Servlet的容器中,每个request都被隔离到一个线程中,阻塞不会停住其他的requests,因为它们是在不同的线程上处理。

    阻塞所有requests是麻烦的来源,但它仅会在延迟增加(见下面原文) 时出现。在响应式世界里,阻塞一个request会导致所有requests加大延迟,而阻塞所有requests则会让服务器跪下来唱征服,因为没有额外的缓冲层和线程去处理。

    > Blocking all requests is still a recipe for trouble, but it only shows up as increased latency with roughly a constant factor per request.

    ## Conclusion | 结论

    能够控制异步处理中的每个移动部分,是一件很爽事:每一层都有一个线程池尺寸和一个队列。

    我们可以让某层使用弹性的线程池,根据它们的工作去调整。

    但同时,这也是一种负担,我们开始寻找更简单的或者更简洁的。

    大量分析的结论是,移除额外的线程,配合物理硬件的限制来使用,通常是一个更好的选择。

    This is an example of "mechanical sympathy", as is famously exploited by LMAX to great effect in the [Disruptor Pattern](https://lmax-exchange.github.io/disruptor/).

    我们已经开始看到响应式的强大,但是请记住,强大伴随着责任。

    它是激进的,它也是基础的。

    它是"放下一切,从头开始"的领域。

    你可能希望看到响应式不是所有问题的解决方案。事实上它的确不是,它只是特定一类问题的解决方案。

    你的收获可能远超学习、修改、维护的代价。

    ## 原文

    https://spring.io/blog/2016/07/20/notes-on-reactive-programming-part-iii-a-simple-http-server-application

  • 相关阅读:
    ege图形化编程配置过程及出现的问题解决方法
    两个头文件相互包含导致未定义类型
    20180318CSP比赛
    jdk7和jdk8都下载了 如何设置java版本为jdk7?
    CCF|CSP|模拟试题|游戏
    2018蓝桥杯|基础练习|十六进制转八进制
    2018蓝桥杯|历届试题|翻硬币
    2018蓝桥杯|历届试题|数字游戏
    2020年9月18日 可变字符序列:StringBuffer和StringBuilder(尽量掌握底层代码跟踪分析的能力)
    2020年9月17日 String 常用方法四、五、六、七、八、九
  • 原文地址:https://www.cnblogs.com/larryzeal/p/8878171.html
Copyright © 2011-2022 走看看