zoukankan      html  css  js  c++  java
  • Android RxJava使用介绍(四) RxJava的操作符

    本篇文章继续介绍下面类型的操作符

    • Combining Observables(Observable的组合操作符)
    • Error Handling Operators(Observable的错误处理操作符)

    Combining Observables(Observable的组合操作符)

    combineLatest操作符

    combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。

    这两个Observable中随意一个Observable产生的结果,都和还有一个Observable最后产生的结果,依照一定的规则进行合并。

    流程图例如以下:
    这里写图片描写叙述
    调用样例例如以下:

    //产生0,5,10,15,20数列
            Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 5;
                        }
                    }).take(5);
    
            //产生0,10,20,30,40数列
            Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 10;
                        }
                    }).take(5);
    
    
            Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
                @Override
                public Long call(Long aLong, Long aLong2) {
                    return aLong+aLong2;
                }
            }).subscribe(new Subscriber<Long>() {
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.err.println("Error: " + e.getMessage());
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("Next: " + aLong);
                }
            });

    执行结果例如以下:
    Next: 0
    Next: 5
    Next: 15
    Next: 20
    Next: 30
    Next: 35
    Next: 45
    Next: 50
    Next: 60
    Sequence complete.

    join操作符

    join操作符把相似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。可是join操作符能够控制每一个Observable产生结果的生命周期,在每一个结果的生命周期内,能够与还有一个Observable产生的结果依照一定的规则进行合并。流程图例如以下:
    这里写图片描写叙述

    join方法的使用方法例如以下:
    observableA.join(observableB,
    observableA产生结果生命周期控制函数,
    observableB产生结果生命周期控制函数。
    observableA产生的结果与observableB产生的结果的合并规则)

    调用样例例如以下:

    //产生0,5,10,15,20数列
            Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 5;
                        }
                    }).take(5);
    
            //产生0,10,20,30,40数列
            Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 10;
                        }
                    }).take(5);
    
            observable1.join(observable2, new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    //使Observable延迟600毫秒执行
                    return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
                }
            }, new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    //使Observable延迟600毫秒执行
                    return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
                }
            }, new Func2<Long, Long, Long>() {
                @Override
                public Long call(Long aLong, Long aLong2) {
                    return aLong + aLong2;
                }
            }).subscribe(new Subscriber<Long>() {
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.err.println("Error: " + e.getMessage());
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("Next: " + aLong);
                }
            });

    执行结果例如以下:
    Next: 0
    Next: 5
    Next: 15
    Next: 20
    Next: 30
    Next: 35
    Next: 45
    Next: 50
    Next: 60
    Sequence complete.

    groupJoin操作符

    groupJoin操作符很相似于join操作符,差别在于join操作符中第四个參数的传入函数不一致。其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    //产生0,5,10,15,20数列
            Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 5;
                        }
                    }).take(5);
    
            //产生0,10,20,30,40数列
            Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 10;
                        }
                    }).take(5);
    
            observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
                }
            }, new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
                }
            }, new Func2<Long, Observable<Long>, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong, Observable<Long> observable) {
                    return observable.map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong2) {
                            return aLong + aLong2;
                        }
                    });
                }
            }).subscribe(new Subscriber<Observable<Long>>() {
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.err.println("Error: " + e.getMessage());
                }
    
                @Override
                public void onNext(Observable<Long> observable) {
                    observable.subscribe(new Subscriber<Long>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            System.out.println("Next: " + aLong);
                        }
                    });
                }
            });

    执行结果例如以下:
    Next: 0
    Next: 5
    Next: 10
    Next: 15
    Next: 20
    Next: 25
    Next: 30
    Next: 35
    Next: 40
    Next: 45
    Next: 50
    Next: 60
    Next: 55
    Sequence complete.

    merge操作符

    merge操作符是依照两个Observable提交结果的时间顺序,对Observable进行合并,如ObservableA每隔500毫秒产生数据为0,5,10,15,20。而ObservableB每隔500毫秒产生数据0,10,20,30,40。当中第一个数据延迟500毫秒产生,最后合并结果为:0,0,5,10,10,20,15,30,20,40;其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    //产生0,5,10,15,20数列
            Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 5;
                        }
                    }).take(5);
    
            //产生0,10,20,30,40数列
            Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 10;
                        }
                    }).take(5);
    
            Observable.merge(observable1, observable2)
                    .subscribe(new Subscriber<Long>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("Sequence complete.");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.err.println("Error: " + e.getMessage());
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            System.out.println("Next:" + aLong);
                        }
                    });

    执行结果例如以下:
    Next:0
    Next:0
    Next:5
    Next:10
    Next:10
    Next:20
    Next:15
    Next:30
    Next:20
    Next:40
    Sequence complete.

    mergeDelayError操作符

    从merge操作符的流程图能够看出,一旦合并的某一个Observable中出现错误,就会立即停止合并,并对订阅者回调执行onError方法,而mergeDelayError操作符会把错误放到全部结果都合并完毕之后才执行,其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    //产生0,5,10数列,最后会产生一个错误
            Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));
            Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 5;
                        }
                    }).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));
    
            //产生0,10,20,30,40数列
            Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 10;
                        }
                    }).take(5);
    
            Observable.mergeDelayError(observable1, observable2)
                    .subscribe(new Subscriber<Long>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("Sequence complete.");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.err.println("Error: " + e.getMessage());
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            System.out.println("Next:" + aLong);
                        }
                    });

    执行结果例如以下:
    Next:0
    Next:0
    Next:5
    Next:10
    Next:10
    Next:20
    Next:30
    Next:40
    Error: this is end!

    startWith操作符

    startWith操作符是在源Observable提交结果之前。插入指定的某些数据。其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.err.println("Error: " + e.getMessage());
                }
    
                @Override
                public void onNext(Integer value) {
                    System.out.println("Next:" + value);
                }
            });

    执行结果例如以下:
    Next:2
    Next:3
    Next:4
    Next:10
    Next:20
    Next:30
    Sequence complete.

    switchOnNext操作符

    switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,假设在同一个时间内存在两个或多个Observable提交的结果,仅仅取最后一个Observable提交的结果给订阅者,其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    //每隔500毫秒产生一个observable
            Observable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    //每隔200毫秒产生一组数据(0,10,20,30,40)
                    return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong * 10;
                        }
                    }).take(5);
                }
            }).take(2);
    
            Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.err.println("Error: " + e.getMessage());
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("Next:" + aLong);
                }
            });

    执行结果例如以下:
    Next:0
    Next:10
    Next:20
    Next:0
    Next:10
    Next:20
    Next:30
    Next:40
    Sequence complete.

    zip操作符

    zip操作符是把两个observable提交的结果,严格依照顺序进行合并,其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    Observable<Integer> observable1 = Observable.just(10,20,30);
            Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
            Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    return integer + integer2;
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.err.println("Error: " + e.getMessage());
                }
    
                @Override
                public void onNext(Integer value) {
                    System.out.println("Next:" + value);
                }
            });

    执行结果例如以下:
    Next:14
    Next:28
    Next:42
    Sequence complete.

    Error Handling Operators(Observable的错误处理操作符)

    onErrorReturn操作符

    onErrorReturn操作符是在Observable错误发生或异常的时候(即将回调oError方法时),拦截错误并执行指定的逻辑,返回一个跟源Observable同样类型的结果。最后回调订阅者的onComplete方法。其流程图例如以下:
    这里写图片描写叙述
    调用样例例如以下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    if (subscriber.isUnsubscribed()) return;
                    //循环输出数字
                    try {
                        for (int i = 0; i < 10; i++) {
                            if (i == 4) {
                                throw new Exception("this is number 4 error!

    "); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 1004; } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

    执行结果例如以下:
    Next:0
    Next:1
    Next:2
    Next:3
    Next:1004
    Sequence complete.

    onErrorResumeNext操作符

    onErrorResumeNext操作符跟onErrorReturn相似,仅仅只是onErrorReturn仅仅能在错误或异常发生时仅仅返回一个和源Observable同样类型的结果,而onErrorResumeNext操作符是在错误或异常发生时返回一个Observable,也就是说能够返回多个和源Observable同样类型的结果,其流程图例如以下:
    这里写图片描写叙述
    调用样例例如以下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<?

    super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循环输出数字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error。"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { return Observable.just(100,101, 102); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

    执行结果例如以下:
    Next:0
    Next:1
    Next:2
    Next:3
    Next:100
    Next:101
    Next:102
    Sequence complete.

    onExceptionResumeNext操作符

    onExceptionResumeNext操作符和onErrorResumeNext操作符相似。不同的地方在于onErrorResumeNext操作符是当Observable错误发生或异常时触发,而onExceptionResumeNext是当Observable发生异常时才触发。

    这里要普及一个概念就是,java的异常分为错误(error)和异常(exception)两种。它们都是继承于Throwable类。

    错误(error)通常是比較严重的系统问题,比方我们常常遇到的OutOfMemoryError、StackOverflowError等都是错误。错误一般继承于Error类,而Error类又继承于Throwable类,假设须要捕获错误,须要使用try..catch(Error e)或者try..catch(Throwable e)句式。

    使用try..catch(Exception e)句式无法捕获错误

    异常(Exception)也是继承于Throwable类。通常是依据实际处理业务抛出的异常。分为执行时异常(RuntimeException)和普通异常。普通异常直接继承于Exception类。假设方法内部没有通过try..catch句式进行处理。必须通过throws关键字把异常抛出外部进行处理(即checked异常);而执行时异常继承于RuntimeException类,假设方法内部没有通过try..catch句式进行处理,不须要显式通过throws关键字抛出外部。如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是执行时异常。当然RuntimeException也是继承于Exception类,因此是能够通过try..catch(Exception e)句式进行捕获处理的。
    onExceptionResumeNext流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

     Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<?

    super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循环输出数字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Throwable e) { subscriber.onError(e); } } }); observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

    执行结果例如以下:
    Next:0
    Next:1
    Next:2
    Next:3
    Next:100
    Next:101
    Next:102
    Sequence complete.

    retry操作符

    retry操作符是当Observable错误发生或者异常时,又一次尝试执行Observable的逻辑。假设经过n次又一次尝试执行后仍然出现错误或者异常,则最后回调执行onError方法。当然假设源Observable没有错误或者异常出现,则依照正常流程执行。

    其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<?

    super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循环输出数字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Throwable e) { subscriber.onError(e); } } }); observable.retry(2).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

    执行结果例如以下:
    Next:0
    Next:1
    Next:2
    Next:3

    Next:0
    Next:1
    Next:2
    Next:3

    Next:0
    Next:1
    Next:2
    Next:3
    Error: this is number 4 error!

    retryWhen操作符

    retryWhen操作符相似于retry操作符,都是在源observable出现错误或者异常时,又一次尝试执行源observable的逻辑,不同在于retryWhen操作符是在源Observable出现错误或者异常时,通过回调第二个Observable来推断是否又一次尝试执行源Observable的逻辑。假设第二个Observable没有错误或者异常出现。则就会又一次尝试执行源Observable的逻辑,否则就会直接回调执行订阅者的onError方法。其流程图例如以下:
    这里写图片描写叙述

    调用样例例如以下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    System.out.println("subscribing");
                    subscriber.onError(new RuntimeException("always fails"));
                }
            });
    
            observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                        @Override
                        public Integer call(Throwable throwable, Integer integer) {
                            return integer;
                        }
                    }).flatMap(new Func1<Integer, Observable<?

    >>() { @Override public Observable<?

    > call(Integer integer) { System.out.println("delay retry by " + integer + " second(s)"); //每一秒中执行一次 return Observable.timer(integer, TimeUnit.SECONDS); } }); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

    执行结果例如以下:
    subscribing
    delay retry by 1 second(s)
    subscribing
    delay retry by 2 second(s)
    subscribing
    delay retry by 3 second(s)
    subscribing
    Sequence complete.

    好了,先介绍这么多。下回继续介绍其它的操作符。敬请期待!

  • 相关阅读:
    Java实现 LeetCode 807 保持城市天际线 (暴力)
    ASP.NET关于书籍详情和删除的Demo(HttpHandler进行页面静态化[自动生成html网页]+Entity Framework通过类创建数据库+EF删查)
    ASP.NET关于书籍详情和删除的Demo(HttpHandler进行页面静态化[自动生成html网页]+Entity Framework通过类创建数据库+EF删查)...
    ASP.NET关于书籍详情和删除的Demo(HttpHandler进行页面静态化[自动生成html网页]+Entity Framework通过类创建数据库+EF删查)...
    Java实现 LeetCode 806 写字符串需要的行数 (暴力模拟)
    Java实现 LeetCode 806 写字符串需要的行数 (暴力模拟)
    Java实现 LeetCode 806 写字符串需要的行数 (暴力模拟)
    Java实现 LeetCode 805 数组的均值分割 (DFS+分析题)
    .net web api返回结果为json
    httpclient与webapi
  • 原文地址:https://www.cnblogs.com/gavanwanggw/p/7258824.html
Copyright © 2011-2022 走看看