RxJava2 Observable
前述
java-
1.8
maven-
3
rxjava-
2.2.3
我也不知道称呼为基类好不好...
官方介绍.
io.reactivex.Observable
: 0..N flows, no backpressure.
0...N flows, 但不支持背压.
示例(Observable
的简单使用)
RxJava是基于观察者模式的. Observable
是一个被观察者(它的观察者是Observer
).
在
Observable
的操作实现类中会生成6份信号(由while
实现)并发射, 可它的观察者Observer
则只接收前4份信号, 并逐一打印(也就是处理).
Observable
的操作实现类 - HelloObservable.java
package yag;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
/**
* @author Senyag
*/
public class HelloObservable {
public void helloObservable(){
// 代码这样排个人觉得会直观一些.
// 初始化Observable
Observable
// create()操作符: 通过以编程方式调用observer方法从头开始创建一个Observable
.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
//observableEmitter: 发射器
Integer i = 0;
while ( i < 7){
i++;
observableEmitter.onNext(i);
}
})
// subscribe()操作符: 根据Observable的发射和通知进行操作
.subscribe(new Observer<Integer>() { // Observer 就是观察者
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
mDisposable = disposable;
}
@Override
public void onNext(Integer i) {
if (i == 5){
// mDisposable可以切断操作, 让Observer不再接收信息.
mDisposable.dispose();
}else {
System.out.println("现在接收到的信号是: 第" + i + "信号");
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
}
}
执行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloObservable helloObservable = new HelloObservable();
helloObservable.helloObservable();
}
}
执行结果
现在接收到的信号是: 第1信号
现在接收到的信号是: 第2信号
现在接收到的信号是: 第3信号
现在接收到的信号是: 第4信号
Process finished with exit code 0
小结
用到了两个操作符: create()
(创建发射器)和subscribe()
(处理所发射的请求). 官方中针对这些操作符给出了特定的一页来介绍它们: 传送门