响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。
keywords:
异步非阻塞
数据流
流水线
算子 (计算逻辑)
声明式 :声明式比较适合基于流的处理方式
服务端技术栈
客户端技术
Spring WebFlux也提供了一个响应式的Http客户端API WebClient。它可以用函数式的方式异步非阻塞地发起Http请求并处理响应。其底层也是由Netty提供的异步支持。
我们可以把WebClient看做是响应式的RestTemplate,与后者相比,前者:
是非阻塞的,可以基于少量的线程处理更高的并发;
可以使用Java 8 lambda表达式;
支持异步的同时也可以支持同步的使用方式;
可以通过数据流的方式与服务端进行双向通信。
整个技术栈从命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】变成了响应式的、异步非阻塞的【spring-webflux + Reactor + Netty】
WebFlux的函数式开发模式
HandlerFunction和RouterFunction
HandlerFunction相当于Controller中的具体处理方法,输入为请求,输出为装在Mono中的响应:
Mono<T extends ServerResponse> handle(ServerRequest request);
RouterFunction,顾名思义,路由,相当于@RequestMapping,用来判断什么样的url映射到那个具体的HandlerFunction,输入为请求,输出为装在Mono里边的Handlerfunction:
Mono<HandlerFunction<T>> route(ServerRequest request);
服务器推送
SSE:服务端推送(Server Send Event),在客户端发起一次请求后会保持该连接,服务器端基于该连接持续向客户端发送数据,从HTML5开始加入。
public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) {
return ok().contentType(MediaType.TEXT_EVENT_STREAM).body( // 1
Flux.interval(Duration.ofSeconds(1)). // 2
map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())),
String.class);
}
响应式Spring Data
一切都是异步非阻塞的
概念
响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
使用 iterator 是一种“命令式”(imperative)编程范式,因为什么时候获取下一个元素取决于开发者。在响应式流中,相对应的角色是“发布者 - 订阅者”(Publisher-Subscriber),当有新的值到来的时候,反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“处理流程”来定义对数据流的处理逻辑。
和lamada
- 函数响应式编程的重点在于“函数式”的语言特性,这个概念在二十年前就盖棺定论了。
- 响应式编程的重点在于“基于事件流”的异步编程范式,由不断产生的数据/时间来推动逻辑的执行。
(type1 arg1, type2 arg2...) -> { body }
函数式”编程范式的核心特点之一:函数是”一等公民”。
所谓”一等公民”(first class),指的是函数与其他数据类型一样,处于平等地位,可以赋值给其他变量,也可以作为参数,传入另一个函数,或者作为别的函数的返回值。
@FunctionalInterface
函数式接口
Function,接受一个输入参数,返回一个结果。参数与返回值的类型可以不同,我们之前的map方法内的lambda就是表示这个函数式接口的;
Consumer,接受一个输入参数并且无返回的操作。比如我们针对数据流的每一个元素进行打印,就可以用基于Consumer的lambda;
Supplier,无需输入参数,只返回结果。看接口名就知道是发挥了对象工厂的作用;
Predicate,接受一个输入参数,返回一个布尔值结果。比如我们在对数据流中的元素进行筛选的时候,就可以用基于Predicate的lambda;
…
Flux与Mono
Publisher
Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号
- 首先,错误信号和完成信号都是终止信号,二者不可能同时共存;
- 如果没有发出任何一个元素值,而是直接发出完成/错误信号,表示这是一个空数据流;
- 如果没有错误信号和完成信号,那么就是一个无限数据流。
Subscriber
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
Flux.just(1, 2, 3, 4, 5, 6)
仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()
方法调用的时候才会触发数据流。所以,订阅前什么都不会发生。
操作符 Operator
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
调度器与线程模型
Scheduler
public void testSyncToAsync() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.fromCallable(() -> getStringSync()) // 1
.subscribeOn(Schedulers.elastic()) // 2
.subscribe(System.out::println, null, countDownLatch::countDown);
countDownLatch.await(10, TimeUnit.SECONDS);
}
回压
subscribe(Subscriber subscriber)
BaseSubscriber
ref