RxJava2 Single, Maybe 和 Completable
原文: https://www.jianshu.com/p/66a55abbadef
参考而做的二次实现. 并重排版一下他的总结, 在此做个记录好回顾. 本文没有原文那么详尽, 建议阅读原文.
前述
java-
1.8
maven-
3
rxjava-
2.2.3
io.reactivex.Single
: a flow of exactly 1 item or an error,io.reactivex.Completable
: a flow without items but only a completion or error signal,io.reactivex.Maybe
: a flow with no items, exactly one item or an error.
像
Observale
和Flowable
都是用来发射数据流的(0..N), 然而出现了三个只有1个数据的基类: (https://www.jianshu.com/p/66a55abbadef)
Single
- 只发射一条单一的数据,或者一条异常通知, 不能发射完成通知,其中数据与通知只能发射一个。Compoletable
- 只发射一条完成通知,或者一条异常通知,不能发射数据,其中完成通知与异常通知只能发射一个Maybe
- 可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。
示例(Single
简单使用)
Single
操作实现类 - HelloSingle.java
package yag;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloSingle {
public void helloSingle(){
Single
.create((SingleOnSubscribe<Integer>) singleEmitter -> {
// 发射
singleEmitter.onSuccess(1);
singleEmitter.onSuccess(2);
})
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
执行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloSingle helloSingle = new HelloSingle();
helloSingle.helloSingle();
}
}
执行结果
1
Process finished with exit code 0
只接收第一条信息.
小结
发送异常信息 - 使用onError()
方法.
发射器接口SingleEmitter
:
1、方法void onSuccess(T t)
用来发射一条单一的数据,且一次订阅只能调用一次,不同于Observale的发射器ObservableEmitter中的void onNext(@NonNull T value)方法,在一次订阅中,可以多次调用多次发射。
2、方法void onError(Throwable t)
等同于ObservableEmitter
中的void onError(@NonNull Throwable error)
用来发射一条错误通知
3、SingleEmitter
中没有用来发射完成通知的void onComplete()
方法。
方法onSuccess
与onError
只可调用一个,若先调用onError
则会导致onSuccess
无效,若先调用onSuccess
,则会抛出io.reactivex.exceptions.UndeliverableException
异常。
观察者SingleObserver
:
方法void onSubscribe(Disposable d)
等同于Observer中的void onSubscribe(Disposable d)
。
方法void onSuccess(T t)
类似于Observer
中的onNext(T t)
用来接收Single发的数据。
方法void onError(Throwable e)
等同于Observer中的void onError(Throwable e)
用来处理异常通知。
没有用来处理完成通知的方法void onComplete()
示例(Completable
简单使用)
Completable
操作实现类 - HelloCompletable.java
package yag;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloCompletable {
public void helloCompletable(){
Completable
.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter completableEmitter) throws Exception {
completableEmitter.onComplete();
}
})
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onComplete() {
System.out.println("执行完成");
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
执行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloCompletable helloCompletable = new HelloCompletable();
helloCompletable.helloCompletable();
}
}
执行结果
执行完成
Process finished with exit code 0
补充
onError()
(发射异常信息), 由CompletableObserver
的onError()
.
示例( Maybe
简单使用)
Maybe
操作实现类 - HelloMaybe.java
package yag;
import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloMaybe {
public void helloMaybe(){
Maybe
.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> maybeEmitter) {
maybeEmitter.onSuccess(1);
maybeEmitter.onComplete();
}
})
.subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("执行完成");
}
});
}
}
执行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloMaybe helloMaybe = new HelloMaybe();
helloMaybe.helloMaybe();
}
}
执行结果
1
Process finished with exit code 0