Reactive是函数式编程(Functional),管道流(pipeline,stream),异步非阻塞的,事件驱动的.
org.reactivestreams包主要有4个接口
发布者 Publisher
public interface Publisher<T>{
public void subscribe(Subscriber<? super T> s);
}
订阅者 Subscriber
当接收到Publisher的数据时,会调用响应的回调方法.注册完成时,首先会调用
onSubscribe方法,参数Subscription s包含了注册信息.
为什么注册?注册这个订阅者到发布的信息上
public interface Subscriber<T>{
//注册完成后,首先被调用
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
订阅Subscription
这个不是er而是tion,很明显一个是器具一个是motive
1.通过订阅,订阅者Subscriber可以请求数据request,或者取消订阅cancel
2.在请求数据时,参数long n表示希望接收的数据量,防止发布者Publisher发送过多的数据.
3.一旦开始请求,数据就会在流stream中传输.每接收一个,就会调用onNext(T t);发生错误
时,onError(Throwable t)被调用;传输完成后,onComplete()被调用
public interface Subscription{
//请求数据,参数n为请求的数据量,不是超时时间or other anythings
public void request(long n);
//取消订阅
public void cancel();
}
- Processor
可以看出,Processor接口继承了Subscriber和Publisher,是流的中间环节.in the middle
public interface Processor<T,R> extends Subscriber<T>,Publisher<R>{
}
Reactive Stream中数据从Publisher开始,经过若干个Processor,最终到达Subscriber,即完整的Pipeline.
Project Reactor
依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
Mono和Flux
- 抽象类Mono和Flux实现了Publisher接口,他们是发布者.
- Mono表示少于等于1个数据(即0个,或1个数据)或错误;Flux表示一连串多个数据.
操作
- 创建Flux或Mono,调用subscribe()后,数据开始流动
主要方法有: just,fromArray,fromStream,fromIterable,range
/**
* 创建Flux或Mono,调用subscribe()后,数据开始流动
* 主要方法有: just,fromArray,fromStream,fromIterable,range
*/
@Test
public void create(){
//just方法
String[] arr = new String[]{"hello","world"};
Flux<String> flux1 = Flux.just(arr);
flux1.subscribe(System.out::println);
//Mono支持单个或0个
Mono<String> mono = Mono.just("hi world");
mono.subscribe(System.out::println);
//fromArray
List<String> list = Arrays.asList("welcome", "single one");
Flux<String> flux2 = Flux.fromIterable(list);
flux2.subscribe(System.out::println);
//fromIterable方法
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
Flux<String> flux3 = Flux.fromIterable(fruitList);
flux3.subscribe(System.out::println);
//fromStream方法
Stream<String> stream = Stream.of("hi","hello");
Flux<String> flux4 = Flux.fromStream(stream);
//range方法
Flux<Integer> range = Flux.range(0,5);
//interval方法,take方法限制个数为5个
Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
longFlux.subscribe(System.out::println);
}
输出:
hello
world
hi world
welcome
single one
Apple
Orange
2.合并mergeWith
@Test
public void mergeFlux(){
Flux<String> source1 = Flux.just("hello","world");
Flux<String> source2 = Flux.just("hi","G");
Flux<String> merge = source1.mergeWith(source2);
merge.subscribe(System.out::println);
}
原文 还有一些其它方法没有加入博客代码中
https://www.jianshu.com/p/9d3a2a28976a