zoukankan      html  css  js  c++  java
  • RxJava2详细攻略(二)

    3. 组合操作符

    3.1 concat()

    方法预览

    public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
    

    有什么用?

    可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat()最多只能发送4个事件。

    怎么用?

    Observable.concat(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
    .subscribe(new Observer<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(Integer integer){
            Log.d(TAG, "==onNext" + integer);
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete{
            
        }
    });
    

    打印如下:

    05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1
    ================onNext 2
    05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3
    ================onNext 4
    ================onNext 5
    ================onNext 6
    ================onNext 7
    ================onNext 8
    

    3.2 concatArray()

    方法预览

    public static<T> Observable<T> concatArray(ObservableSource<? extends T> ...sources)
    

    有什么用?

    与concat()作用一样,不过concatArray()可以发送多于4个被观察者。

    Observable.concatArray(Observable.just(1, 2),
    Observable.just(3, 4),
    Observable.just(5, 6),
    Observable.just(7, 8),
    Observable.just(9, 10))
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1
    ================onNext 2
    ================onNext 3
    ================onNext 4
    ================onNext 5
    ================onNext 6
    ================onNext 7
    ================onNext 8
    ================onNext 9
    ================onNext 10
    

    3.3 merge()

    方法预览

    public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,ObservableSource<? extends T> source3,ObservableSource<? extends T> source4)
    

    有什么用?

    这个方法与concat()知识一样,只是concat()是串行发送事件,而merge()并行发送事件。

    怎么用?

    现在来演示concat和merge的区别:

    Observable.merge(
    Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>(){
        @Override
        public String apply(Long aLong)throws Exception{
            return "A" + aLong;
        }
    }),
    Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>(){
        @Override
        public String apply(Long aLong) throws Exception{
            return "B" + aLong;
        }
    }))
        .subscribe(new Observer<String>(){
            public void onSubscribe(Disposable d){
                
            }
            
            @Override
            public void onNext(String s){
                Log.d(TAG, "==onNext" + s);
            }
            
            @Override
            public void onError(Throwable e){
                
            }
            
            @Override
            public void onComplete(){
                
            }
        });
    

    打印结果如下:

    05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
    05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
    05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
    05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
    05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
    05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
    05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
    05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
    05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
    05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
    05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
    05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
    ......
    

    从结果来看,A和B的事件序列都可以发出,将以上的代码换成concat(),看看打印结果:

    05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
    05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
    05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
    05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
    05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
    05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
    ......
    

    从结果可以知道,只有等第一个被观察者发送完事件之后,第二个观察者才会发送事件。

    mergeArray()和merge()的作用是一样的,只是它可以发送4个以上的被观察者。

    3.4 concatArrayDelayError() & mergeArrayDelayError()

    方法预览

    public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
    public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
    

    有什么用?

    在concatArray()和mergeArray()两个方法中,如果其中一个被观察者发送了一个Error事件,那么就会停止发送事件,如果你想onError()事件延迟到所有被观察者都发送完事件再执行的话,就可以使用这两个方法。

    怎么用?

    首先使用concatArray()来验证一下:

    Observable.concatArray(
    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onError(new NumberFormatException());
        }
    }), Observable.just(2, 3, 4))
        .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1
    ===================onError 
    

    从结果可以知道确实中断了,现在换用concatArrayDelayError(),代码如下:

    Observable.concatArrayDelayError(
    Observable.create(new ObservableOnSubscribe<Integer>(){
        @Override
        public void subscribe(ObservableEmitter<Integer> e)throws Exception{
            e.onNext(1);
            e.onError(new NumberFormatException());
        }
    }),
    Observable.just(2, 3, 4))
    .subscribe(new Observable<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(Integer integer){
            Log.d(TAG, "==onNext" + integer);
        }
        
        @Override
        public void onError(Throwable e){
            Log.d(TAG, "==onError");
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    打印结果如下:

    05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1
    ===================onNext 2
    ===================onNext 3
    ===================onNext 4
    ===================onError 
    

    从结果可以看到,onError事件是在所有被观察者发送完事件才发送的。mergeArrayDelayError也是同样的作用。

    3.5 zip()

    方法预览

    public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
    ......
    

    有什么用?

    会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源Observable中最少事件的数量一样。

    怎么用?

    Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
    .map(new Function<Long, String>(){
        @Override
        public String apply(Long aLong) throws Exception{
        String s1 = "A" + aLong;
        Log.d(TAG, "==A 发送的事件" + s1);
        return s1;
    }}),
    Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
        .map(new Function<Long, String>(){
            @Override
            public String apply(Long aLong) throws Exception{
                String s2 = "B" + aLong;
                Log.d(TAG, "==B 发送的事件" + s2);
                return s2;
            }
        }),
        new BiFunction<String, String, String>(){
            @Override
            public String apply(String s, String s2) throws Exception{
                String res = s + s2;
                return res;
            }
        })
        .subscribe(new Observer<String>(){
            @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "===================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
        });
    

    上面代码中有两个Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:

    05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
    05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
    ===================onNext A1B1
    05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
    05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
    ===================onNext A2B2
    05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
    05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
    05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
    05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
    05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
    05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
    05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
    05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
    05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
    ===================onComplete 
    

    可以发现最终接受到的事件数量是5,那么为什么第二个Observable没有发送第六个事件呢?因为在这之前第一个Observable已经发送了onComplete事件,所以第二个Observable不会再发送事件。

    3.6 combineLatest() & combineLatestDelayError()

    方法预览

    public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
    ....... 
    

    有什么用?

    comnbineLatest()的作用与zip()类似,但是combineLatest()发送事件的序列是与发送的时间线有关的,当combineLatest()中所有的Observable都发送了事件,只要其中有一个Observable发送事件,这个事件就会和其他Observable最近发送的事件结合起来发送,这样可能还是比较抽象,看看以下例子代码。

    怎么用?

    Observable.combineLatest(
    Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
        .map(new Function<Long, String>(){
            @Override
            public String apply(Long aLong) throws Exception{
                String s1 = "A" + aLong;
                Log.d(TAG, "==A 发送的事件" + s1);
                return s1;
            }
        }),
        Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
            .map(new Function<Long, String>(){
                @Override
                public String apply(Long aLong) throws Exception{
                    String s2 = "B" + aLong;
                    Log.d(TAG, "==B 发送的事件" + s2);
                    return s2;
                }
            }),
            new BiFunction<String, String, String>(){
             @Override
             public String apply(String s, String s2) throws Exception{
                 String res = s + s2;
                 return res;
             }
            })
            .subscribe(new Observer<String>(){
                @Override
                public void onSubscribe(Disposable d){
                    Log.d(TAG, "==onSubscribe");
                }
                
                @Override
        public void onNext(String s) {
            Log.d(TAG, "===================最终接收到的事件 " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
        });
    

    分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔两秒发送一次事件,来看看打印结果:

    05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe 
    05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
    05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
    05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
    05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A2B1
    05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
    ===================最终接收到的事件 A3B1
    05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
    05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
    05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B1
    05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B2
    05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
    05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B3
    05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
    05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B4
    05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
    05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B5
    ===================onComplete 
    

    分析上述结果可以知道,当发送A1事件之后,因为B并没有发送任何事件,所以根本不会发生结合,当B发送了B1事件之后,就会与A最后发送的事件A2结合成A2B1,这样只要后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。

    因为combineLastestDelayError()就是多了延迟发送onError()功能,所以不做赘述。

    3.7 reduce()

    方法预览

    public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
    

    有什么用?

    与scan()操作符的作用相同,也是将发送数据以一定逻辑聚合起来,这两个的区别在于scan()没处理一次数据就会将事件发送给观察者,而reduce()会将所有数据聚合在一起才会发送事件给观察者。

    怎么用?

    Observable.just(0, 1, 2, 3)
    .reduce(new BiFunction<Integer, Integer, Integer>(){
        @Override
        public Integer apply(Integer integer, Integer integer 2) throws Exception{
            int res = integer + integer2;
            Log.d(TAG, "==integer" + integer);
            Log.d(TAG, "==integer2" + integer2);
            Log.d(TAG, "==res" + res);
            return res;
        }
    })
    .subscribe(new Comsumer<Integer>(){
        @Override
        public void accept(Integer integer) throws Exception{
            Log.d(TAG, "==accept" + integer);
        }
    });
    

    打印结果

    05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0
    ====================integer2 1
    ====================res 1
    ====================integer 1
    ====================integer2 2
    ====================res 3
    ====================integer 3
    ====================integer2 3
    ====================res 6
    ==================accept 6
    

    从结果可以看出,其实就是前两个数据聚合之后,然后再与后一个数据进行聚合,一直到没有数据为止。

    3.8 collect()

    方法预览

    public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
    

    有什么用?

    将数据收集到数据结构当中。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .collect(new Callable<ArrayList<Integer>>(){
        @Override
        public ArrayList<Integer> call() throws Exception{
            return new ArrayList<>();
        }
    },
    new BiConsumer<ArrayList<Integer>, Integer>(){
        @Override
        public void accept(ArrayList<Integer> integers, Integer integer) throws Exception{
            integers.ad(integer);
        }
    })
    .subscribe(new Consumer<ArrayList<Integer>>(){
        @Override
        public void accept(ArrayList<Integer> integers)throws Exception{
            Log.d(TAG, "==accept" + integers);
        }
    });
    

    打印结果:

    05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]
    

    3.9 startWith() & startWithArray()

    方法预览

    public final Observable<T> startWith(T item)
    public final Observable<T> startWithArray(T... items)
    

    有什么用?

    在发送时间之前追加事件,startWith()追加一个事件,startWithArray()可以追加多个事件。追加的事件会先发出。

    怎么用?

    Observable.just(5, 6, 7)
    .startWithArray(2, 3, 4)
    startWith(1)
    .subscribe(new Consumer<Integer>(){
        @Override
        public void accept(Integer integer) throws Exception{
            Log.d(TAG, "==accept" + integer);
        }
    });
    

    打印结果:

    05-22 17:08:21.282 4505-4505/com.example.rxjavademo D/chan: ================accept 1
    ================accept 2
    ================accept 3
    ================accept 4
    ================accept 5
    ================accept 6
    ================accept 7
    

    3.10 count()

    方法预览

    public final Single<Long> count()
    

    有什么用?

    返回被观察者发送事件的数量

    怎么用?

    Observable.just(1, 2, 3)
    .count()
    .subscribe(new Consumer<Long>(){
        @Override
        public void accept(Long aLong) throws Exception{
            Log.d(TAG, "==aLong" + aLong);
        }
    });
    

    打印结果:

    05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 3
    
  • 相关阅读:
    含字母的数字排序
    ci中简单实用的权限管理
    时间戳和日期转换
    prop
    定时器
    centos查看防火墙策略是firewall还是iptables
    centos上安装rabbitmq服务器
    springcloud微服务feign消费模式解决 com.netflix.client.ClientException: Load balancer does not have available server for client:xxx
    java正则去掉json字符串key的引号
    centos安装redis
  • 原文地址:https://www.cnblogs.com/hyeri/p/14266745.html
Copyright © 2011-2022 走看看