4. Utility operators
4.1 delay()
Method signature
public final Observable<T> delay(long delay, TimeUnit unit)
What does it do?
Delays the emission of events by the given amount of time.
How do you use it?
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "=======================onSubscribe");
}
});
Here the events are delayed by two seconds. The output is:
05-22 20:53:43.618 16880-16880/com.example.rxjavademo D/chan: =======================onSubscribe
05-22 20:53:45.620 16880-16906/com.example.rxjavademo D/chan: =======================onNext 1
05-22 20:53:45.621 16880-16906/com.example.rxjavademo D/chan: =======================onNext 2
=======================onNext 3
=======================onSubscribe
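The output shows that onNext is called two seconds after the onSubscribe callback. (The final onSubscribe line is actually logged from onComplete(), which reuses that label in the sample code.)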
4.2 doOnEach()
Method signature
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)
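What does it do?
This callback is invoked before the Observable delivers each event (onNext, onError, or onComplete) to the observer.
How do you use it?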
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(ObservableEmitter<Integer> e)throws Exception{
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
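.doOnEach(new Consumer<Notification<Integer>>(){
@Override
public void accept(Notification<Integer> integerNotification) throws Exception{
// getValue() returns null for the onComplete notification
Log.d(TAG, "==doOnEach " + integerNotification.getValue());
}
})
// The original snippet stops here; "observer" stands for a logging Observer<Integer> like the one in section 4.3.
.subscribe(observer);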
Output:
05-23 09:07:05.547 19867-19867/? D/chan: ==================onSubscribe
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete
The output shows that doOnEach() is called before each event is delivered, and that the value emitted by onNext() can be read from the notification.
4.3 doOnNext()
Method signature
public final Observable<T> doOnNext(Consumer<? super T> onNext)
What does it do?
This callback is invoked before the Observable delivers each onNext().
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnNext(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================doOnNext " + integer);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 09:09:36.769 20020-20020/com.example.rxjavademo D/chan: ==================onSubscribe
==================doOnNext 1
==================onNext 1
==================doOnNext 2
==================onNext 2
==================doOnNext 3
==================onNext 3
==================onComplete
4.4 doAfterNext()
Method signature
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
What does it do?
This callback is invoked after the Observable has delivered each onNext().
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doAfterNext(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================doAfterNext " + integer);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 09:15:49.215 20432-20432/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================doAfterNext 1
==================onNext 2
==================doAfterNext 2
==================onNext 3
==================doAfterNext 3
==================onComplete
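The ordering difference between doOnNext() and doAfterNext() can be modeled without RxJava. The sketch below is plain JDK code, not the library's implementation; the class name NextHookSketch and its emit() method are made up for illustration. It only shows that one hook runs before the observer's onNext() and the other runs after it returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK model of the callback ordering; this is not RxJava code.
final class NextHookSketch {
    // Delivers each item to the observer, running the enabled hooks around the delivery.
    static List<String> emit(int[] items, boolean withBefore, boolean withAfter) {
        List<String> log = new ArrayList<>();
        Consumer<Integer> before = v -> log.add("doOnNext " + v);
        Consumer<Integer> observer = v -> log.add("onNext " + v);
        Consumer<Integer> after = v -> log.add("doAfterNext " + v);
        for (int item : items) {
            if (withBefore) before.accept(item); // runs before the observer sees the item
            observer.accept(item);
            if (withAfter) after.accept(item);   // runs after the observer has returned
        }
        return log;
    }

    public static void main(String[] args) {
        emit(new int[] {1, 2}, true, true).forEach(System.out::println);
    }
}
```

Calling emit(items, true, false) reproduces the interleaving of section 4.3, and emit(items, false, true) the interleaving of section 4.4.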
4.5 doOnComplete()
Method signature
public final Observable<T> doOnComplete(Action onComplete)
What does it do?
This callback is invoked before the Observable delivers onComplete().
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnComplete ");
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 09:32:18.031 20751-20751/? D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnComplete
==================onComplete
4.6 doOnError()
Method signature
public final Observable<T> doOnError(Consumer<? super Throwable> onError)
What does it do?
This callback is invoked before the Observable delivers onError().
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.doOnError(new Consumer < Throwable > () {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "==================doOnError " + throwable);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 09:35:04.150 21051-21051/? D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnError java.lang.NullPointerException
==================onError
4.7 doOnSubscribe()
Method signature
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
What does it do?
This callback is invoked before the observer's onSubscribe() is called.
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnSubscribe(new Consumer < Disposable > () {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "==================doOnSubscribe ");
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 09:39:25.778 21245-21245/? D/chan: ==================doOnSubscribe
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
4.8 doOnDispose()
Method signature
public final Observable<T> doOnDispose(Action onDispose)
What does it do?
This callback is invoked after dispose() is called on the Disposable.
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose ");
}
})
.subscribe(new Observer < Integer > () {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 09:55:48.122 22023-22023/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================doOnDispose
4.9 doOnLifecycle()
Method signature
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
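What does it do?
The first argument's callback is invoked before onSubscribe() is delivered to the observer; you can use it to decide whether to cancel the subscription.
How do you use it?
The second argument's callback does the same job as doOnDispose(). The following example illustrates both: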
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception{
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnLifecycle(new Consumer<Disposable>(){
@Override
public void accept(Disposable disposable) throws Exception{
Log.d(TAG, "==doOnLifecycle accept");
}
},new Action(){
@Override
public void run() throws Exception{
Log.d(TAG, "==doOnLifecycle Action");
}
})
.doOnDispose(
new Action(){
@Override
public void run() throws Exception{
Log.d(TAG, "==doOnDispose Action");
}
})
.subscribe(new Observer<Integer>(){
private Disposable d;
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==onSubscribe");
this.d = d;
}
@Override
public void onNext(Integer integer){
Log.d(TAG, "==onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e){
Log.d(TAG, "==onError");
}
@Override
public void onComplete(){
Log.d(TAG, "==onComplete");
}
});
Output:
05-23 10:20:36.345 23922-23922/? D/chan: ==================doOnLifecycle accept
==================onSubscribe
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action
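When the subscription is disposed inside onNext(), both the doOnDispose() callback and the doOnLifecycle() Action are invoked.
If the subscription is instead disposed in the first doOnLifecycle() callback (for example by calling disposable.dispose() in accept()), the output is: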
05-23 10:32:20.014 24652-24652/com.example.rxjavademo D/chan: ==================doOnLifecycle accept
==================onSubscribe
In this case neither doOnDispose Action nor doOnLifecycle Action is invoked.
4.10 doOnTerminate() & doAfterTerminate()
Method signature
public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)
What does it do?
doOnTerminate() is invoked before onError or onComplete is delivered, whereas doAfterTerminate() is invoked after onError or onComplete has been delivered.
How do you use it?
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(ObservableEmitter<Integer> e)throws Exception{
e.onNext(1);
e.onNext(2);
e.onNext(3);
// e.onError(new NullPointerException()); // if emitted, the sequence would end with onError and the onComplete below would be ignored
e.onComplete();
}
})
.doOnTerminate(new Action(){
@Override
public void run() throws Exception{
Log.d(TAG, "==doOnTerminate");
}
})
.subscribe(new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 10:00:39.503 22398-22398/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
05-23 10:00:39.504 22398-22398/com.example.rxjavademo D/chan: ==================onNext 3
==================doOnTerminate
==================onComplete
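doAfterTerminate() works the same way, except that its callback runs after the terminal event has been delivered, so it is not covered separately.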
4.11 doFinally()
Method signature
public final Observable<T> doFinally(Action onFinally)
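What does it do?
This callback is invoked after the event sequence has ended.
How do you use it?
You may wonder how doFinally() differs from doAfterTerminate().
The difference lies in disposal: once the subscription is disposed, doAfterTerminate() is no longer invoked, whereas doFinally() is always invoked and always comes last in the event sequence.
The following example illustrates this: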
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception{
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doFinally(new Action(){
@Override
public void run() throws Exception{
Log.d(TAG, "==doFinally");
}
})
.doOnDispose(new Action(){
@Override
public void run() throws Exception{
Log.d(TAG, "==doOnDispose");
}
})
.doAfterTerminate(new Action(){
@Override
public void run() throws Exception{
Log.d(TAG, "==doAfterTerminate");
}
})
.subscribe(new Observer<Integer>(){
private Disposable d;
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==onSubscribe");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 10:10:10.469 23196-23196/? D/chan: ==================onSubscribe
05-23 10:10:10.470 23196-23196/? D/chan: ==================onNext 1
==================doOnDispose
==================doFinally
As you can see, when dispose() is called, doAfterTerminate() is not invoked.
Now comment out the dispose() call and look at the output:
05-23 10:13:34.537 23439-23439/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
==================doAfterTerminate
==================doFinally
doAfterTerminate() is now invoked, and doFinally() still comes last in the event sequence.
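The two paths above can be summarized in a small model. This is a plain-JDK sketch, not RxJava's implementation, and the class name FinallySketch is invented for illustration. It assumes the same operator chain as the example and mirrors the two logs: a disposed sequence skips the terminal event (and therefore doAfterTerminate), while the once-only doFinally hook runs on both paths.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-JDK model of doFinally vs. doAfterTerminate; this is not RxJava code.
final class FinallySketch {
    static List<String> run(boolean disposeAfterFirst) {
        List<String> log = new ArrayList<>();
        AtomicBoolean finallyRan = new AtomicBoolean(false);
        // doFinally runs at most once, whichever path ends the sequence.
        Runnable doFinally = () -> {
            if (finallyRan.compareAndSet(false, true)) log.add("doFinally");
        };
        for (int item = 1; item <= 3; item++) {
            log.add("onNext " + item);
            if (disposeAfterFirst) {
                log.add("doOnDispose");
                doFinally.run();
                return log; // disposed: no terminal event, so doAfterTerminate is skipped
            }
        }
        log.add("onComplete");
        log.add("doAfterTerminate"); // only reached when a terminal event was delivered
        doFinally.run();
        return log;
    }

    public static void main(String[] args) {
        run(true).forEach(System.out::println);
        run(false).forEach(System.out::println);
    }
}
```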
4.12 onErrorReturn()
Method signature
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)
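What does it do?
This callback is invoked when an onError() event is received. The value it returns is delivered via onNext(), and the sequence then completes normally.
How do you use it?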
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.onErrorReturn(new Function<Throwable, Integer>(){
@Override
public Integer apply(Throwable throwable) throws Exception{
Log.d(TAG, "==onErrorReturn " + throwable);
return 404;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 18:35:18.175 19239-19239/? D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorReturn java.lang.NullPointerException
==================onNext 404
==================onComplete
4.13 onErrorResumeNext()
Method signature
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)
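What does it do?
When an onError() event is received, switches to a new Observable returned by the function, and the sequence then ends normally.
How do you use it?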
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>(){
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception{
Log.d(TAG, "==onErrorResumeNext" + throwable);
return Observable.just(4, 5, 6);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 18:43:10.910 26469-26469/? D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorResumeNext java.lang.NullPointerException
==================onNext 4
==================onNext 5
==================onNext 6
==================onComplete
4.14 onExceptionResumeNext()
Method signature
public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)
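What does it do?
Essentially the same as onErrorResumeNext(), except that it only catches Exception; other Throwables, such as Error, are passed through to onError().
How do you use it?
First, check whether onExceptionResumeNext() catches an Error: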
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Error("404"));
}
})
.onExceptionResumeNext(new Observable<Integer>(){
@Override
protected void subscribeActual(Observer<? super Integer> observer){
observer.onNext(333);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 22:23:08.873 1062-1062/com.example.louder.rxjavademo D/chan: ==================onSubscribe
05-23 22:23:08.874 1062-1062/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onError
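The output shows that the observer received the onError() event, which demonstrates that onExceptionResumeNext() does not catch an Error.
Now change e.onError(new Error("404")) in the Observable to e.onError(new Exception("404")) and check whether the Exception is caught: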
05-23 22:32:14.563 10487-10487/com.example.louder.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 333
==================onComplete
The output shows that the Exception was caught.
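The two runs above boil down to a type check on the Throwable. The sketch below models that decision in plain JDK code; it is not RxJava itself, and ExceptionResumeSketch is a made-up name. An Exception switches to the fallback sequence, while anything else (such as an Error) is forwarded to onError.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK model of the routing decision; this is not RxJava code.
final class ExceptionResumeSketch {
    static List<String> run(Throwable failure, List<Integer> fallback) {
        List<String> log = new ArrayList<>();
        for (int item : Arrays.asList(1, 2, 3)) log.add("onNext " + item);
        if (failure instanceof Exception) {
            // Exception: continue with the fallback sequence and complete normally
            for (int item : fallback) log.add("onNext " + item);
            log.add("onComplete");
        } else {
            // Error (or any other non-Exception Throwable): passed through unchanged
            log.add("onError");
        }
        return log;
    }

    public static void main(String[] args) {
        run(new Error("404"), Arrays.asList(333)).forEach(System.out::println);
        run(new Exception("404"), Arrays.asList(333)).forEach(System.out::println);
    }
}
```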
4.15 retry()
Method signature
public final Observable<T> retry(long times)
......
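What does it do?
If an error event occurs, the source is resubscribed and the whole event sequence is emitted again; times is the maximum number of retries.
How do you use it?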
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retry(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ==================onSubscribe
05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError
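The log shows three passes over the sequence for retry(2): the initial attempt plus two retries. A minimal plain-JDK model of that counting follows; it is not RxJava code, and RetrySketch is a hypothetical name. It assumes a source that always emits 1, 2, 3 and then fails, as in the example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of retry(times) for a source that always fails; this is not RxJava code.
final class RetrySketch {
    static List<String> run(long times) {
        List<String> log = new ArrayList<>();
        long remaining = times; // retries still allowed
        while (true) {
            // one subscription: three items followed by an error
            for (int item = 1; item <= 3; item++) log.add("onNext " + item);
            if (remaining-- <= 0) {
                log.add("onError"); // retries exhausted: the error reaches the observer
                return log;
            }
        }
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

With times = 2 the model produces nine onNext lines followed by a single onError, the same shape as the log above.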
4.16 retryUntil()
Method signature
public final Observable<T> retryUntil(final BooleanSupplier stop)
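What does it do?
After an error event occurs, this method decides whether to keep retrying: the source is resubscribed until the supplier returns true.
How do you use it?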
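// i is assumed to be an int field of the enclosing class, initialized to 0 (e.g. private int i = 0;)
Observable.create(new ObservableOnSubscribe < Integer > () {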
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retryUntil(new BooleanSupplier(){
@Override
public boolean getAsBoolean() throws Exception{
if(i == 6){
return true;
}
return false;
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
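No output is given for this example, so the following plain-JDK sketch walks through its logic instead. It is a model rather than RxJava code: RetryUntilSketch is an invented name, and it uses java.util.function.BooleanSupplier in place of RxJava's own BooleanSupplier interface. It assumes the counter i starts at 0 and is increased in onNext(), as in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Plain-JDK model of the retryUntil example; this is not RxJava code.
final class RetryUntilSketch {
    private int i = 0; // running sum of received items, as in the snippet above

    List<String> run() {
        List<String> log = new ArrayList<>();
        BooleanSupplier stop = () -> i == 6; // the stop condition from the example
        while (true) {
            for (int item = 1; item <= 3; item++) {
                i += item; // the observer's onNext() accumulates the values
                log.add("onNext " + item);
            }
            // the source has failed: stop retrying once the supplier returns true
            if (stop.getAsBoolean()) {
                log.add("onError");
                return log;
            }
        }
    }

    public static void main(String[] args) {
        new RetryUntilSketch().run().forEach(System.out::println);
    }
}
```

Under these assumptions the first pass already brings i to 1 + 2 + 3 = 6, so the supplier returns true and the error is delivered without any retry. An equality check like this is fragile: if the sum could skip past 6, the condition would never hold, so a comparison such as i >= 6 is the safer choice.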
4.17 retryWhen()
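Method signature
public final Observable<T> retryWhen(final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)
What does it do?
The handler receives an Observable of the exceptions or errors signalled by the source and returns a new Observable. If the returned Observable emits an error, the source stops emitting and the error is delivered to the observer; if it emits a normal item, the source is resubscribed and keeps retrying.
How do you use it?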
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception{
e.onNext("chan");
e.onNext("ze");
e.onNext("de");
e.onError(new Exception("404"));
e.onNext("haha");
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>(){
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable)throws Exception{
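return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>(){
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception{
// Exception.toString() yields "java.lang.Exception: 404" (note the space after the colon)
if(!throwable.toString().equals("java.lang.Exception: 404")){
return Observable.just("可以忽略的异常"); // "ignorable exception": emitting any item triggers a retry
}else{
return Observable.error(new Throwable("终止")); // "terminate": the error is passed to the observer
}
}
});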
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "==================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ==================onSubscribe
05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ==================onNext chan
==================onNext ze
==================onNext de
05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ==================onError java.lang.Throwable: 终止
Changing onError(new Exception("404")) to onError(new Exception("303")) produces the following output:
==================onNext chan
05-24 09:54:08.653 29694-29694/? D/chan: ==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......
The output shows that the events are re-emitted indefinitely.
4.18 repeat()
Method signature
public final Observable<T> repeat(long times)
....
What does it do?
Re-emits the Observable's events; times is the total number of times the sequence is emitted.
How do you use it?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.repeat(2)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
Output:
05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onSubscribe
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 1
===================onNext 2
===================onNext 3
05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onComplete
The output shows that the event sequence was emitted twice.
4.19 repeatWhen()
Method signature
public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
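What does it do?
The handler returns a new Observable whose events decide whether the source's events are emitted again.
How do you use it?
There are three cases. If the new Observable emits an onComplete or onError event, the original Observable does not emit again. If it emits any other event, the original Observable's events are emitted again.
First, try emitting onComplete with the following code: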
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>(){
@Override
public ObservableSource<?> apply(Observable <Object> objectObservable) throws Exception{
return Observable.empty();
//return Observable.error(new Exception("404"));
//return Observable.just(4);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
Output:
05-24 11:44:33.486 9379-9379/com.example.rxjavademo D/chan: ===================onSubscribe
05-24 11:44:33.487 9379-9379/com.example.rxjavademo D/chan: ===================onComplete
Next, the output for an onError event and for other events.
Output when onError is emitted:
05-24 11:46:29.507 9561-9561/com.example.rxjavademo D/chan: ===================onSubscribe
05-24 11:46:29.508 9561-9561/com.example.rxjavademo D/chan: ===================onError
Output when another event is emitted (Observable.just(4), which emits one item and then completes):
05-24 11:48:35.844 9752-9752/com.example.rxjavademo D/chan: ===================onSubscribe
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete
4.20 subscribeOn()
Method signature
public final Observable<T> subscribeOn(Scheduler scheduler)
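What does it do?
Specifies the thread (Scheduler) on which the Observable is subscribed to and emits. Note that if this method is called multiple times, only the first call (the one closest to the source) takes effect.
How do you use it?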
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.d(TAG, "======================onComplete");
}
});
Output:
05-26 10:43:26.964 22530-22530/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:43:26.966 22530-22569/com.example.rxjavademo D/chan: =========================currentThread name: RxNewThreadScheduler-1
05-26 10:43:26.967 22530-22569/com.example.rxjavademo D/chan: ======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
The output shows that the Observable emits on a new thread.
4.21 observeOn()
Method signature
public final Observable<T> observeOn(Scheduler scheduler)
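What does it do?
Specifies the thread (Scheduler) on which the downstream operators and the observer receive events. Every call takes effect for the operators that follow it.
How do you use it?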
Observable.just(1, 2, 3)
.observeOn(Schedulers.newThread())
.flatMap(new Function < Integer, ObservableSource < String >> () {
@Override
public ObservableSource < String > apply(Integer integer) throws Exception {
Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
return Observable.just("chan" + integer);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "======================onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
Log.d(TAG, "======================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.d(TAG, "======================onComplete");
}
});
Output:
05-26 10:58:04.593 25717-25717/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:58:04.594 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
05-26 10:58:04.595 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
05-26 10:58:04.617 25717-25717/com.example.rxjavademo D/chan: ======================onNext Thread name main
======================onNext chan1
======================onNext Thread name main
======================onNext chan2
======================onNext Thread name main
======================onNext chan3
05-26 10:58:04.618 25717-25717/com.example.rxjavademo D/chan: ======================onComplete
The output shows that observeOn() switched threads successfully.
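The underlying idea, producing a value on one thread and handing it to another for consumption, can be illustrated with the JDK alone. The sketch below is only an analogy, not how RxJava schedulers are implemented: ThreadHopSketch and the thread names "producer" and "consumer" are made up, and CompletableFuture stands in for the operator chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy for switching threads between stages; this is not RxJava code.
final class ThreadHopSketch {
    // Returns {name of the thread that produced the value, name of the thread that consumed it}.
    static String[] run() {
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        try {
            return CompletableFuture
                // stage 1 runs on the producer thread (roughly the role of subscribeOn)
                .supplyAsync(() -> Thread.currentThread().getName(), producer)
                // stage 2 is handed over to the consumer thread (roughly the role of observeOn)
                .thenApplyAsync(p -> new String[] {p, Thread.currentThread().getName()}, consumer)
                .join();
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```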
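The table below summarizes the schedulers available in RxJava:
Scheduler | Purpose |
---|---|
Schedulers.computation() | For computational work, such as event loops and callback processing. |
Schedulers.immediate() | Runs work on the current thread (RxJava 1.x only; RxJava 2 removed it, and Schedulers.trampoline() is the closest replacement). |
Schedulers.io() | For I/O-bound work, such as performing blocking I/O asynchronously. |
Schedulers.newThread() | Creates a new thread for each unit of work. |
AndroidSchedulers.mainThread() | The Android UI thread, used for UI operations. |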