zoukankan      html  css  js  c++  java
  • RxJava2详细攻略(二)

    3. 组合操作符

    3.1 concat()

    方法预览

    public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
    

    有什么用?

    可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat()最多只能发送4个事件。

    怎么用?

    Observable.concat(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
    .subscribe(new Observer<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(Integer integer){
            Log.d(TAG, "==onNext" + integer);
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete{
            
        }
    });
    

    打印如下:

    05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1
    ================onNext 2
    05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3
    ================onNext 4
    ================onNext 5
    ================onNext 6
    ================onNext 7
    ================onNext 8
    

    3.2 concatArray()

    方法预览

    public static<T> Observable<T> concatArray(ObservableSource<? extends T> ...sources)
    

    有什么用?

    与concat()作用一样,不过concatArray()可以发送多于4个被观察者。

    Observable.concatArray(Observable.just(1, 2),
    Observable.just(3, 4),
    Observable.just(5, 6),
    Observable.just(7, 8),
    Observable.just(9, 10))
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1
    ================onNext 2
    ================onNext 3
    ================onNext 4
    ================onNext 5
    ================onNext 6
    ================onNext 7
    ================onNext 8
    ================onNext 9
    ================onNext 10
    

    3.3 merge()

    方法预览

    public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,ObservableSource<? extends T> source3,ObservableSource<? extends T> source4)
    

    有什么用?

    这个方法与concat()知识一样,只是concat()是串行发送事件,而merge()并行发送事件。

    怎么用?

    现在来演示concat和merge的区别:

    Observable.merge(
    Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>(){
        @Override
        public String apply(Long aLong)throws Exception{
            return "A" + aLong;
        }
    }),
    Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>(){
        @Override
        public String apply(Long aLong) throws Exception{
            return "B" + aLong;
        }
    }))
        .subscribe(new Observer<String>(){
            public void onSubscribe(Disposable d){
                
            }
            
            @Override
            public void onNext(String s){
                Log.d(TAG, "==onNext" + s);
            }
            
            @Override
            public void onError(Throwable e){
                
            }
            
            @Override
            public void onComplete(){
                
            }
        });
    

    打印结果如下:

    05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
    05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
    05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
    05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
    05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
    05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
    05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
    05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
    05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
    05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
    05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
    05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
    ......
    

    从结果来看,A和B的事件序列都可以发出,将以上的代码换成concat(),看看打印结果:

    05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
    05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
    05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
    05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
    05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
    05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
    ......
    

    从结果可以知道,只有等第一个被观察者发送完事件之后,第二个观察者才会发送事件。

    mergeArray()和merge()的作用是一样的,只是它可以发送4个以上的被观察者。

    3.4 concatArrayDelayError() & mergeArrayDelayError()

    方法预览

    public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
    public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
    

    有什么用?

    在concatArray()和mergeArray()两个方法中,如果其中一个被观察者发送了一个Error事件,那么就会停止发送事件,如果你想onError()事件延迟到所有被观察者都发送完事件再执行的话,就可以使用这两个方法。

    怎么用?

    首先使用concatArray()来验证一下:

    Observable.concatArray(
    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onError(new NumberFormatException());
        }
    }), Observable.just(2, 3, 4))
        .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1
    ===================onError 
    

    从结果可以知道确实中断了,现在换用concatArrayDelayError(),代码如下:

    Observable.concatArrayDelayError(
    Observable.create(new ObservableOnSubscribe<Integer>(){
        @Override
        public void subscribe(ObservableEmitter<Integer> e)throws Exception{
            e.onNext(1);
            e.onError(new NumberFormatException());
        }
    }),
    Observable.just(2, 3, 4))
    .subscribe(new Observable<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(Integer integer){
            Log.d(TAG, "==onNext" + integer);
        }
        
        @Override
        public void onError(Throwable e){
            Log.d(TAG, "==onError");
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    打印结果如下:

    05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1
    ===================onNext 2
    ===================onNext 3
    ===================onNext 4
    ===================onError 
    

    从结果可以看到,onError事件是在所有被观察者发送完事件才发送的。mergeArrayDelayError也是同样的作用。

    3.5 zip()

    方法预览

    public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
    ......
    

    有什么用?

    会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源Observable中最少事件的数量一样。

    怎么用?

    Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
    .map(new Function<Long, String>(){
        @Override
        public String apply(Long aLong) throws Exception{
        String s1 = "A" + aLong;
        Log.d(TAG, "==A 发送的事件" + s1);
        return s1;
    }}),
    Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
        .map(new Function<Long, String>(){
            @Override
            public String apply(Long aLong) throws Exception{
                String s2 = "B" + aLong;
                Log.d(TAG, "==B 发送的事件" + s2);
                return s2;
            }
        }),
        new BiFunction<String, String, String>(){
            @Override
            public String apply(String s, String s2) throws Exception{
                String res = s + s2;
                return res;
            }
        })
        .subscribe(new Observer<String>(){
            @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "===================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
        });
    

    上面代码中有两个Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:

    05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
    05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
    ===================onNext A1B1
    05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
    05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
    ===================onNext A2B2
    05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
    05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
    05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
    05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
    05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
    05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
    05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
    05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
    05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
    ===================onComplete 
    

    可以发现最终接受到的事件数量是5,那么为什么第二个Observable没有发送第六个事件呢?因为在这之前第一个Observable已经发送了onComplete事件,所以第二个Observable不会再发送事件。

    3.6 combineLatest() & combineLatestDelayError()

    方法预览

    public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
    ....... 
    

    有什么用?

    comnbineLatest()的作用与zip()类似,但是combineLatest()发送事件的序列是与发送的时间线有关的,当combineLatest()中所有的Observable都发送了事件,只要其中有一个Observable发送事件,这个事件就会和其他Observable最近发送的事件结合起来发送,这样可能还是比较抽象,看看以下例子代码。

    怎么用?

    Observable.combineLatest(
    Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
        .map(new Function<Long, String>(){
            @Override
            public String apply(Long aLong) throws Exception{
                String s1 = "A" + aLong;
                Log.d(TAG, "==A 发送的事件" + s1);
                return s1;
            }
        }),
        Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
            .map(new Function<Long, String>(){
                @Override
                public String apply(Long aLong) throws Exception{
                    String s2 = "B" + aLong;
                    Log.d(TAG, "==B 发送的事件" + s2);
                    return s2;
                }
            }),
            new BiFunction<String, String, String>(){
             @Override
             public String apply(String s, String s2) throws Exception{
                 String res = s + s2;
                 return res;
             }
            })
            .subscribe(new Observer<String>(){
                @Override
                public void onSubscribe(Disposable d){
                    Log.d(TAG, "==onSubscribe");
                }
                
                @Override
        public void onNext(String s) {
            Log.d(TAG, "===================最终接收到的事件 " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
        });
    

    分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔两秒发送一次事件,来看看打印结果:

    05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe 
    05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
    05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
    05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
    05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A2B1
    05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
    ===================最终接收到的事件 A3B1
    05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
    05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
    05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B1
    05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B2
    05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
    05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B3
    05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
    05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B4
    05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
    05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B5
    ===================onComplete 
    

    分析上述结果可以知道,当发送A1事件之后,因为B并没有发送任何事件,所以根本不会发生结合,当B发送了B1事件之后,就会与A最后发送的事件A2结合成A2B1,这样只要后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。

    因为combineLastestDelayError()就是多了延迟发送onError()功能,所以不做赘述。

    3.7 reduce()

    方法预览

    public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
    

    有什么用?

    与scan()操作符的作用相同,也是将发送数据以一定逻辑聚合起来,这两个的区别在于scan()没处理一次数据就会将事件发送给观察者,而reduce()会将所有数据聚合在一起才会发送事件给观察者。

    怎么用?

    Observable.just(0, 1, 2, 3)
    .reduce(new BiFunction<Integer, Integer, Integer>(){
        @Override
        public Integer apply(Integer integer, Integer integer 2) throws Exception{
            int res = integer + integer2;
            Log.d(TAG, "==integer" + integer);
            Log.d(TAG, "==integer2" + integer2);
            Log.d(TAG, "==res" + res);
            return res;
        }
    })
    .subscribe(new Comsumer<Integer>(){
        @Override
        public void accept(Integer integer) throws Exception{
            Log.d(TAG, "==accept" + integer);
        }
    });
    

    打印结果

    05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0
    ====================integer2 1
    ====================res 1
    ====================integer 1
    ====================integer2 2
    ====================res 3
    ====================integer 3
    ====================integer2 3
    ====================res 6
    ==================accept 6
    

    从结果可以看出,其实就是前两个数据聚合之后,然后再与后一个数据进行聚合,一直到没有数据为止。

    3.8 collect()

    方法预览

    public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
    

    有什么用?

    将数据收集到数据结构当中。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .collect(new Callable<ArrayList<Integer>>(){
        @Override
        public ArrayList<Integer> call() throws Exception{
            return new ArrayList<>();
        }
    },
    new BiConsumer<ArrayList<Integer>, Integer>(){
        @Override
        public void accept(ArrayList<Integer> integers, Integer integer) throws Exception{
            integers.ad(integer);
        }
    })
    .subscribe(new Consumer<ArrayList<Integer>>(){
        @Override
        public void accept(ArrayList<Integer> integers)throws Exception{
            Log.d(TAG, "==accept" + integers);
        }
    });
    

    打印结果:

    05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]
    

    3.9 startWith() & startWithArray()

    方法预览

    public final Observable<T> startWith(T item)
    public final Observable<T> startWithArray(T... items)
    

    有什么用?

    在发送时间之前追加事件,startWith()追加一个事件,startWithArray()可以追加多个事件。追加的事件会先发出。

    怎么用?

    Observable.just(5, 6, 7)
    .startWithArray(2, 3, 4)
    startWith(1)
    .subscribe(new Consumer<Integer>(){
        @Override
        public void accept(Integer integer) throws Exception{
            Log.d(TAG, "==accept" + integer);
        }
    });
    

    打印结果:

    05-22 17:08:21.282 4505-4505/com.example.rxjavademo D/chan: ================accept 1
    ================accept 2
    ================accept 3
    ================accept 4
    ================accept 5
    ================accept 6
    ================accept 7
    

    3.10 count()

    方法预览

    public final Single<Long> count()
    

    有什么用?

    返回被观察者发送事件的数量

    怎么用?

    Observable.just(1, 2, 3)
    .count()
    .subscribe(new Consumer<Long>(){
        @Override
        public void accept(Long aLong) throws Exception{
            Log.d(TAG, "==aLong" + aLong);
        }
    });
    

    打印结果:

    05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 3
    
  • 相关阅读:
    C# 图片与Base64的相互转化
    LeetCode 303. Range Sum Query – Immutable
    LeetCode 300. Longest Increasing Subsequence
    LeetCode 292. Nim Game
    LeetCode 283. Move Zeroes
    LeetCode 279. Perfect Squares
    LeetCode 268. Missing Number
    LeetCode 264. Ugly Number II
    LeetCode 258. Add Digits
    LeetCode 257. Binary Tree Paths
  • 原文地址:https://www.cnblogs.com/hyeri/p/14266745.html
Copyright © 2011-2022 走看看