zoukankan      html  css  js  c++  java
  • RxJava2实战---第七章 合并操作符和连接操作符

    RxJava2实战---第七章 合并操作符和连接操作符

    RxJava的合并操作符:

    • startWith():在数据序列的开头增加一项数据。
    • merge:将多个Observable合并为一个。
    • mergeDelayError():合并多个Observable,让没有错误的Observable都完成后再发射错误通知。
    • zip():使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。
    • combineLatest():当两个Observable中的任何一个发射一个数据时,通知一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后在发射这个函数的结果。
    • **join(): **无论何时,如果一个Observable发射了一个数据项,就需要在另一个Observable发射的数据项定义的时间窗口内,将两个Observable发射的数据合并发射。
    • switchOnNext():将一个发射Observable的Observable转换成另一个Observable,后者发射这些Observable最近发射的数据。

    RxJava的连续操作符,主要是ConnectableObservable所使用的操作符和Observable所使用的操作符。

    • ConnectableObservable.connect():指示一个可连续的Observable开始发射数据项。
    • Observable.publish():将一个Observable转换为一个可连续的Observable.
    • Observable.replay():确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。
    • ConnectableObservable.reCount():让一个可连续的Observable表现得像一个普通的Observable。

    合并操作符能够同时处理多个被观察者,并发送相应的事件。

    对于连接操作符,有一个很重要的概念connectableObservable。可连接的Observable在被订阅时并不发射数据,只有在它的connect()被调用时才开始发射数据。

    1. 合并操作符

    1.1 startWith

    在数据序列的开头插入一条指定的项

            Observable.just("Hello Java","Hello Kotlin","Hello Android")
                    .startWith("Hello Rx")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(s);     
                        }
                    });
    
            //Lamand表达式
            Observable.just("Hello Java","Hello Kotlin","Hello Android")
                    .startWith("Hello Rx")
                    .subscribe(s -> System.out.println(s));
    
    	执行结果:
    	Hello Rx
        Hello Java
        Hello Kotlin
        Hello Android
    

    startWith操作符支持传递Iterable,同时还有一个startWithArray的操作符

    使用了startWithArray操作符之后,可以再使用startWith操作符。

    多次使用startWith操作符,最后发射的数据也永远排在最前面。

            Observable.just("Hello Java","Hello Kotlin","Hello Android")
                    .startWithArray("Hello Rx","Hello Android")
                    .startWith("Hello World")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
    
            //Lamand表达式
            Observable.just("Hello Java","Hello Kotlin","Hello Android")
                    .startWithArray("Hello Rx","Hello RN")
                    .startWith("Hello World")
                    .subscribe(s -> System.out.println(s));
    
    	执行结果:
    	Hello World
        Hello Rx
        Hello Android
        Hello Java
        Hello Kotlin
        Hello Android
    

    startWith还可以传递一个Observable对象,它会将那个Observable的发射物插在原始Observable发射的数据序列之前,然后把这个当做自己的发射物集合。

            Observable.just("Hello Java","Hello Kotlin","Hello Android")
                    .startWithArray("Hello Rx","Hello Android")
                    .startWith(Observable.just("HellowWorld"))
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
    
            //Lamand表达式
            Observable.just("Hello Java","Hello Kotlin","Hello Android")
                    .startWithArray("Hello Rx","Hello RN")
                    .startWith(Observable.just("HellowWorld"))
                    .subscribe(s -> System.out.println(s));
    
    	执行结果:
    	HellowWorld
        Hello Rx
        Hello Android
        Hello Java
        Hello Kotlin
        Hello Android
    

    1.2 concat/concatArray

    组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

    按发送顺序串行执行

    concat最多连接4个,而concatArray可以连接多个。

            Observable.concatArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                    ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            System.out.println(aLong);
                        }
                    });
            
            //lamand表达式
            Observable.concatArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                    ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                    .subscribe(integer -> System.out.println(integer));
    
    	执行结果:
    	1
    	2
    	3
    	4
    	5
    	6
    

    1.3 concatArrayDelayError/mergeArrayDelayError

    使用concat和merge操作符时,如果遇到其中一个被观察者发出onError事件则会马上终止其他被观察者的事件,如果希望onError事件推迟到其他被观察者都结束后才触发,可以使用对应的concatArrayDelayError或者mergeArrayDelayError操作符

            Observable
                    .concatArrayDelayError(
                    Observable.create(new ObservableOnSubscribe<Integer>() {
                            @Override
                            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                                emitter.onNext(1);
                                emitter.onNext(2);
                                emitter.onError(new NullPointerException());
                                emitter.onNext(3);
                            }
                        }),
                    Observable.just(4,5,6))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            throwable.printStackTrace();
                            System.out.println(throwable.getMessage());
                        }
                    });
    
    执行结果:
        1
        2
        4
        5
        6
        java.lang.NullPointerException
            at com.loan.rxjavademo.RxjavaTest$3.subscribe(RxjavaTest.java:36)
    	null        
    
    
            //lamand表达式
            Observable.concatArrayDelayError(
                    Observable.create(emitter -> {
                        emitter.onNext(1);
                        emitter.onError(new NullPointerException());
                        emitter.onNext(2);
                        emitter.onNext(3);
                    }),Observable.just(4,5,6))
                    .subscribe(integer -> System.out.println(integer),throwable -> {
                        throwable.printStackTrace();
                        System.out.println(throwable.getMessage());
                    });
                    
    	执行结果:
        1
        4
        5
        6
        java.lang.NullPointerException
        null
    

    1.4 merge/mergeArray

    合并多个(最多四个)Observable的发射物,是按照时间线并行的,如果其中一个Observable发射了onError通知终止,则merge操作符生成的Observable也会立即以onError通知终止。

    如果只是两个被观察者合并,则还可以使用mergeWith操作符,Observable.merge(odds,evens)等价于odds.mergeWith(evens)

    按时间线并行执行

    merge最多连接4个,而mergeArray可以连接多个。

            Observable.mergeArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                    ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            System.out.println(aLong);
                        }
                    });
                   
    
            //lamand表达式
             Observable.mergeArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                    ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                    .subscribe(integer -> System.out.println(integer));
                    
            执行结果:
            1
            4
            2
            5
            3
            6
    

    1.5 mergeWith

    合并两个Observable

            Observable.just(1,2,3)
                    .mergeWith(Observable.just(4,5,6))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    		执行结果:
            1
            2
            3
            4
            5
            6
    

    1.6 zip

    通过一个函数将多个Observable的发射物结合到一起,返回一个Observable,使用一个函数按顺序结合两个或多个Observable发射的数据项,然后发射这个函数返回的结果。它按照严格的顺序应用这个函数,只发射与发射数据项最少的那个Observable一样多的数据

    事件组合方式 = 严格按照原先事件序列 进行对位合并
    最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量

    可以接受1-9个参数

            Observable.zip(Observable.just(1, 3, 5), Observable.just(2, 4, 6),
                    new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer integer, Integer integer2) throws Exception {
                            return integer+integer2;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    
            //Lamand表达式
            Observable.zip(Observable.just(1,3,5),Observable.just(2,4,6,8,10),
                    (integer, integer2) -> integer+integer2)
                    .subscribe(integer -> System.out.println(integer));
                    
    		执行结果:
            3
            7
            11
    

    1.7 combineLatest

    combineLatest操作符的行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据,而combineLatest则是当原始的Observable中任意一个发射了数据时就发射一条数据,当原始Observable的任何一个发射了一条数据时,combineLatest使用一个函数结合他们最近发射的数据,然后发射这个函数的返回值。

            Observable.combineLatest(Observable.just(1, 3, 5), Observable.just(2, 4, 6,8,10),
                    new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer integer, Integer integer2) throws Exception {
                            System.out.println("integer:"+integer+"   integer2:"+integer2);
                            return integer+integer2;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    
            //Lamand表达式
            Observable.combineLatest(Observable.just(1,3,5),Observable.just(2,4,6,8,10),
                    (integer, integer2) -> integer+integer2)
                    .subscribe(integer -> System.out.println(integer));
                    
    	执行结果:
        integer:5   integer2:2
        7
        integer:5   integer2:4
        9
        integer:5   integer2:6
        11
        integer:5   integer2:8
        13
        integer:5   integer2:10
        15
    

    1.8 join

    join操作符结合两个Observable发射的数据,基于时间窗口(针对每条数据特定的原则)选择待集合的数据项。将这些时间窗口实现为一些Observables,他们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它就继续结合其他Observable发射的任何数据项。

    join()四个参数的用途:

    • Observable:源Observable需要组合的Observable,这里可以称之为目标Observable.
    • Function:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射数据的有效期。
    • FUnction:接受目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射数据的有效期。
    • BiFunction:接受从源Observable和目标Observable发射的数据,并将这两个数据祝贺后返回。

    join操作符的效果类似于排列组合,把第一个数据源A作为基窗口,它根据自己的节奏不断发射数据元素;第二个数据源B,没发射一个数据,我们都把它和第一个数据源A中已经发射的数据进行一对一匹配。举例来说,如果某一时刻B发射了一个数据”B“,此时A已经发射了a,b,c,d共四个数据,那么合并操作符就是把"B"依次与a,b,c,d配对,得到4组数据:[a,B]、[b,B]、[c,B]、[d,B]。

            Observable<Integer> o1=Observable.just(1,2,3);
            Observable<Integer> o2=Observable.just(4,5,6);
    
            o1.join(o2, new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    System.out.println("apply1:"+integer);
                    return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
                }
            }, new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    System.out.println("apply2:"+integer);
                    return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
                }
            }, new BiFunction<Integer, Integer, String>() {
                @Override
                public String apply(Integer integer, Integer integer2) throws Exception {
                    System.out.println("apply3:  integer:"+integer+"  intteger2:"+integer2);
                    return integer+":"+integer2;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
            
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            
            执行结果:
            apply1:1
            apply1:2
            apply1:3
            apply2:4
            apply3:  integer:1  intteger2:4
            1:4
            apply3:  integer:2  intteger2:4
            2:4
            apply3:  integer:3  intteger2:4
            3:4
            apply2:5
            apply3:  integer:1  intteger2:5
            1:5
            apply3:  integer:2  intteger2:5
            2:5
            apply3:  integer:3  intteger2:5
            3:5
            apply2:6
            apply3:  integer:1  intteger2:6
            1:6
            apply3:  integer:2  intteger2:6
            2:6
            apply3:  integer:3  intteger2:6
            3:6
    

    对上述代码做一点修改:

            Observable<Integer> o1=Observable.just(1,2,3).delay(200,TimeUnit.MILLISECONDS);
            Observable<Integer> o2=Observable.just(4,5,6);
    
            o1.join(o2, new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    System.out.println("apply1:"+integer);
                    return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
                }
            }, new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    System.out.println("apply2:"+integer);
                    return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
                }
            }, new BiFunction<Integer, Integer, String>() {
                @Override
                public String apply(Integer integer, Integer integer2) throws Exception {
                    System.out.println("apply3:  integer:"+integer+"  intteger2:"+integer2);
                    return integer+":"+integer2;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            执行结果:
            apply2:4
            apply2:5
            apply2:6
            apply1:1
            apply3:  integer:1  intteger2:4
            1:4
            apply3:  integer:1  intteger2:5
            1:5
            apply3:  integer:1  intteger2:6
            1:6
            apply1:2
            apply3:  integer:2  intteger2:4
            2:4
            apply3:  integer:2  intteger2:5
            2:5
            apply3:  integer:2  intteger2:6
            2:6
            apply1:3
            apply3:  integer:3  intteger2:4
            3:4
            apply3:  integer:3  intteger2:5
            3:5
            apply3:  integer:3  intteger2:6
            3:6
    

    1.9 reduce

    把被观察者需要发送的事件聚合成1个事件 & 发送

    聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推

    自定义聚合条件,前2个数据聚合得到结果与第三个数据再聚合。以此类推...

    示例:累加计算

            Observable.just(1,2,3,4,5)
                    .reduce(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer integer, Integer integer2) throws Exception {
                            return integer+integer2;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    
    
            //Lamanda表达式
            Observable.just(1,2,3,4,5)
                    .reduce((integer, integer2) -> integer+integer2)
                    .subscribe(integer -> System.out.println(integer));
    
    		执行结果:
    		15
    

    1.10 collect

    将被观察者Observable发送的数据事件收集到一个数据结构里

    collect和reduce操作符类似,不过它是需要自己定义收集的容器和收集逻辑

            Observable.just(1,2,3,4,5)
                    .collect(new Callable<List<Integer>>() {
    
                        @Override
                        public List<Integer> call() throws Exception {
                            return new ArrayList<Integer>();
                        }
                    }, new BiConsumer<List<Integer>, Integer>() {
                        @Override
                        public void accept(List<Integer> integers, Integer integer) throws Exception {
                            integers.add(integer);
                        }
                    })
                    .subscribe(new BiConsumer<List<Integer>, Throwable>() {
                        @Override
                        public void accept(List<Integer> integers, Throwable throwable) throws Exception {
                            System.out.println(integers);
                        }
                    });
                    
                    
               执行结果:
               [1, 2, 3, 4, 5]
    

    2. 连接操作符

    connect和refCount是ConnectableObservable所使用的的操作符。

    connectableObservable继承自Observable,然而它并不是在调用subscribe()的时候发射数据,而是只有对其使用connect草祖父时它才会发射数据,所以可以用来更灵活地控制数据发射的时机。

    2.1 connect

    用来触发connectableObservable发射数据的。我们可以等所有的观察者们都订阅了connectableObservable之后再发射数据

    2.2 publish

    是将普通的Observable转换成connectableObservable

    
            SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");
    
            Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);
    
            ConnectableObservable<Long> connectableObservable=observable.publish();
    
            connectableObservable.subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("subscriber1:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("subscriber1:error:"+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("subscriber1:onComplete");
                }
            });
    
            connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            System.out.println("subscriber2:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.println("subscriber2:error:"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            System.out.println("subscriber2:onComplete");
                        }
                    });
    
            connectableObservable.connect();
    
    
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        执行结果:
        subscriber1:onNext:2  time:16:25:31
        subscriber1:onNext:3  time:16:25:32
        subscriber2:onNext:3  time:16:25:32
        subscriber1:onNext:4  time:16:25:33
        subscriber2:onNext:4  time:16:25:33
        subscriber1:onNext:5  time:16:25:34
        subscriber2:onNext:5  time:16:25:34
        subscriber1:onComplete
        subscriber2:onComplete
    

    2.3 replay

    生成connectableObservable,保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

    replay操作符返回一个connectableObservable对象,并且可以缓存发射过得数据,这样即使有订阅者在发射之后进行订阅,也能收到之前发射过的数据。不过使用replay操作符最好还是先限定缓存的大小,否则缓存的数据太多时会占用很大一块内存。对缓存的控制可以从空间和时间两个方面来实现。

            SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");
    
            Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);
    
            ConnectableObservable<Long> connectableObservable=observable.replay();
            connectableObservable.connect();
    
            connectableObservable.subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("subscriber1:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("subscriber1:error:"+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("subscriber1:onComplete");
                }
            });
    
    
    
            connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            System.out.println("subscriber2:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.println("subscriber2:error:"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            System.out.println("subscriber2:onComplete");
                        }
                    });
    
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            执行结果:
            subscriber1:onNext:0  time:16:39:16
            subscriber1:onNext:1  time:16:39:17
            subscriber1:onNext:2  time:16:39:18
            subscriber2:onNext:0  time:16:39:18
            subscriber2:onNext:1  time:16:39:18
            subscriber2:onNext:2  time:16:39:18
            subscriber1:onNext:3  time:16:39:19
            subscriber2:onNext:3  time:16:39:19
            subscriber1:onNext:4  time:16:39:20
            subscriber2:onNext:4  time:16:39:20
            subscriber1:onNext:5  time:16:39:21
            subscriber2:onNext:5  time:16:39:21
            subscriber1:onComplete
            subscriber2:onComplete
    

    2.4 refCount

    将connectableObservable转换成普通的Observable,同时又保持了Hot Observable的特性。当出现第一个订阅者时,refCount会调用connect()。每个订阅者每次都会接收到同样的数据,但是当所有订阅者都取消订阅(dispose)时,refCount会自动dipose上游的Observable。

    所有的订阅者都取消订阅后,则数据流停止。如果重新订阅则数据流重新开始。如果不是所有的订阅者都取消了订阅,而是只取消了部分。则部分订阅者/观察者重新开始订阅时,不会从头开始数据流。

            SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");
    
            Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);
    
            ConnectableObservable<Long> connectableObservable=observable.publish();
    
            Observable obsRefCount=connectableObservable.refCount();
    
            observable.subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("subscriber1:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("subscriber1:error:"+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("subscriber1:onComplete");
                }
            });
            observable.subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("subscriber2:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("subscriber2:error:"+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("subscriber2:onComplete");
                }
            });
            obsRefCount.subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println("subscriber3:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("subscriber3:error:"+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("subscriber3:onComplete");
                }
            });
    
            obsRefCount.delaySubscription(3, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            System.out.println("subscriber4:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.println("subscriber4:error:"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            System.out.println("subscriber4:onComplete");
                        }
                    });
    
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            执行结果:
            subscriber1:onNext:0  time:16:30:30
            subscriber2:onNext:0  time:16:30:30
            subscriber3:onNext:0  time:16:30:30
            subscriber2:onNext:1  time:16:30:31
            subscriber1:onNext:1  time:16:30:31
            subscriber3:onNext:1  time:16:30:31
            subscriber1:onNext:2  time:16:30:32
            subscriber2:onNext:2  time:16:30:32
            subscriber3:onNext:2  time:16:30:32
            subscriber1:onNext:3  time:16:30:33
            subscriber2:onNext:3  time:16:30:33
            subscriber3:onNext:3  time:16:30:33
            subscriber4:onNext:3  time:16:30:33
            subscriber1:onNext:4  time:16:30:34
            subscriber2:onNext:4  time:16:30:34
            subscriber3:onNext:4  time:16:30:34
            subscriber4:onNext:4  time:16:30:34
            subscriber2:onNext:5  time:16:30:35
            subscriber1:onNext:5  time:16:30:35
            subscriber1:onComplete
            subscriber2:onComplete
            subscriber3:onNext:5  time:16:30:35
            subscriber4:onNext:5  time:16:30:35
            subscriber3:onComplete
            subscriber4:onComplete
    
  • 相关阅读:
    在react-native中dva的使用
    js获取任意一天的0点和23:59:59时间
    IntelliJ IDEA 快捷键(转载收藏)
    Android中对已安装应用的管理实现
    Retrofit的初次使用
    GreenDao的初次使用--号称Android最快的关系型数据库
    RxJava操作符的简单使用
    dagger2的初次使用
    Android-沉浸式状态栏的实现
    Mac之如何查看已用端口
  • 原文地址:https://www.cnblogs.com/wangjiaghe/p/11890865.html
Copyright © 2011-2022 走看看