zoukankan      html  css  js  c++  java
  • RxJava2实战---第六章 条件操作符和布尔操作符

    RxJava2实战---第六章 条件操作符和布尔操作符

    RxJava的条件操作符主要包括以下几个:

    • amb():给定多个Observable,只让第一个发射数据的Obsrvable发射全部数据。
    • defaultIfEmpty():发射来自原始Observable的数据,如果原始Observable没有发射数据,则发射一个默认数据
    • skipUntil():丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据。
    • skipWhile():丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable的剩余的数据。
    • takeUntil():发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知。
    • takeWhile:发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据。

    RxJava的布尔操作符主要包括:

    • all():判断是否所有的数据项都满足某个条件。
    • contains():判断Observable是否会发射一个指定的值。
    • exists() and isEmpty():判断Observable是否发射一个值。
    • sequenceEqual():判断两个Observable发射的序列是否相等。

    布尔操作符返回的结果全部为bollean值,条件操作符会根据条件进行数据发射或变换被观察者。

    1 条件操作符

    1.1 amb()

    给定两个或多个Observable,它只发射首先发射数据或者通知的那个Observable的所有数据。

    不管发射的是一项数据还是一个onError或onCompleted通知,amb将忽略和丢弃其它所有Observables的发射物。

    <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
    
    <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
    
            List<Observable<Integer>> list=new ArrayList<>();
            list.add(Observable.just(1,2,3).delay(1,TimeUnit.SECONDS));
            list.add(Observable.just(4,5,6));
            Observable.amb(list)
                  .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          System.out.println(integer);
                      }
                  });
             
             执行结果:
             4
             5
             6        
    
            Observable.ambArray(Observable.just(1,2,3).delay(1,TimeUnit.SECONDS),Observable.just(3,4,5))
                  .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          System.out.println(integer);
                      }
                  });
                  
    执行结果:
    4
    5
    6
    

    1.2 defaultIfEmpty

    发射来自原始Observable 的数据,如果原始Observable 没有发射数据,就发射一个默认值。

           Observable.empty()
                   .defaultIfEmpty(6)
                   .subscribe(new Consumer<Object>() {
                       @Override
                       public void accept(Object o) throws Exception {
                           System.out.println(o);
                       }
                   });
            //lamand表达式
            Observable.empty()
                    .defaultIfEmpty(6)
                    .subscribe(o -> System.out.println(o));
                    
            执行结果:        
            6
    

    defaultIfEmpty和switchIfEmpty的区别是:

    • defaultIfEmpty操作符只能在被观察者不发送数据时发射一个默认的数据
    • switchIfEmpty操作符在被观察者不发送数据时可以发射更多的数据
           Observable.empty()
                   .switchIfEmpty(Observable.just(1,2,3))
                   .subscribe(new Consumer<Object>() {
                       @Override
                       public void accept(Object o) throws Exception {
                           System.out.println(o);
                       }
                   });
            //lamand表达式
            Observable.empty()
                    .switchIfEmpty(Observable.just(1,2,3))
                    .subscribe(o -> System.out.println(o));
                    
                    
    执行结果:
    1
    2
    3
    

    1.3 skipUntil

    丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据

    默认不在任何特定的调度器

           Observable.intervalRange(1,9,0,1,TimeUnit.SECONDS)
                   .skipUntil(Observable.timer(4,TimeUnit.SECONDS))
                   .subscribe(new Consumer<Long>() {
                       @Override
                       public void accept(Long aLong) throws Exception {
                           System.out.println(aLong);
                       }
                   });
    
    
            //lamand表达式
            Observable.intervalRange(1,9,0,1,TimeUnit.SECONDS)
                    .skipUntil(Observable.timer(4,TimeUnit.SECONDS))
                    .subscribe(aLong -> System.out.println(aLong));
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            执行结果:
            5
            6
            7
            8
            9
    

    1.4 skipWhile

    丢弃Observable发射的数据,直到一个指定的条件不成立

    默认不在任何特定的调度器

           Observable.just(1,2,3,4,5)
                   .skipWhile(new Predicate<Integer>() {
                       @Override
                       public boolean test(Integer integer) throws Exception {
                           return integer<=2;
                       }
                   })
                   .subscribe(new Consumer<Integer>() {
                       @Override
                       public void accept(Integer integer) throws Exception {
                           System.out.println(integer);
                       }
                   });
    
    	执行结果:
    	3
    	4
    	5
    
            //lamand表达式
            Observable.just(1,2,3,4,5)
                    .skipWhile(integer -> integer>3)
                    .subscribe(integer -> System.out.println(integer));
    	执行结果:
        1
        2
        3
        4
        5
    

    1.5 takeUntil

    当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据。

    默认不在任何特定的调度器。

    有两个构造函数

    第一个:

    Observable<T> takeUntil(ObservableSource<U> other)

           Observable.intervalRange(1,9,0,1,TimeUnit.SECONDS)
                   .takeUntil(Observable.timer(4,TimeUnit.SECONDS))
                   .subscribe(new Consumer<Long>() {
                       @Override
                       public void accept(Long aLong) throws Exception {
                           System.out.println(aLong);
                       }
                   });
    
            //lamand表达式
            Observable.intervalRange(1,9,0,1,TimeUnit.SECONDS)
                    .takeUntil(Observable.timer(4,TimeUnit.SECONDS))
                    .subscribe(aLong -> System.out.println(aLong));
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    		执行结果:
    		1
    		2
    		3
    		4
    		5
    

    第二个:

    Observable<T> takeUntil(Predicate<? super T> stopPredicate)

           Observable.just(1,2,3,4,5)
                   .takeUntil(new Predicate<Integer>() {
                       @Override
                       public boolean test(Integer integer) throws Exception {
                           return integer>3;
                       }
                   })
                   .subscribe(new Consumer<Integer>() {
                       @Override
                       public void accept(Integer integer) throws Exception {
                           System.out.println(integer);
                       }
                   });
                   
    		执行结果:
            1
            2
            3
            4
    
            //lamand表达式
            Observable.just(1,2,3,4,5)
                    .takeUntil(integer -> integer<3)
                    .subscribe(integer -> System.out.println(integer));
    		
           执行结果:
           1
    

    分析:依次发送1,2,3,4,5 在第一个函数中当发送数据4时,takeUntil内部函数integer>3返回true 所以源数据就不发送了。在第二个函数中,当发射第一个数据1时,takeUntil内部函数integer<3返回true ,所以后面的源数据就都不发送了

    1.6 takeWhile

    发射原始Observable发射的数据,直到一个指定的条件不成立

    默认不在任何特定的调度器

           Observable.just(1,2,3,4,5)
                   .takeWhile(new Predicate<Integer>() {
                       @Override
                       public boolean test(Integer integer) throws Exception {
                           return integer<3;
                       }
                   })
                   .subscribe(new Consumer<Integer>() {
                       @Override
                       public void accept(Integer integer) throws Exception {
                           System.out.println(integer);
                       }
                   });
    
    	执行结果:
    	1
    	2
    
            //lamand表达式
            Observable.just(1,2,3,4,5)
                    .takeWhile(integer -> integer>3)
                    .subscribe(integer -> System.out.println(integer));
    	执行结果:
        因为第一个发射数据1<3   所以后面的数据都丢弃了  
    

    2 布尔操作符

    2.1 all()

    判断Observable发射的所有数据是否都满足某个条件

    all操作符默认不在任何特定的调度器上执行

            Observable.just(1,2,3,4,5)
                    .all(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) throws Exception {
                            return integer<4;
                        }
                    })
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            System.out.println(aBoolean);
                        }
                    });
    
            //lamand表达式
            Observable.just(1,2,3,4,5)
                    .all(integer -> integer<4)
                    .subscribe(aBoolean -> System.out.println(aBoolean));
                    
            执行结果:
            false
    

    2.2 contains

    contains操作符将接收一个特定的值作为一个参数,判定原Observable是否发射该值,若已发射,则创建并返回的Observable将发射true,否则发射false。
    判断在发射的所有数据项中是否包含指定的数据,内部调用的其实是exists

    contains操作符默认不在任何特定的调度器上执行

            Observable.just(1,2,3,4,5)
                    .contains(2)
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            System.out.println(aBoolean);
                        }
                    });
    
            //lamand表达式
            Observable.just(1,2,3,4,5)
                    .contains(2)
                    .subscribe(aBoolean -> System.out.println(aBoolean));
           
           执行结果:
           true
    

    2.4 isEmpty

    isEmpty操作符用于判定原始Observable是否没有发射任何数据。若原Observable未发射任何数据,创建并返回一个发射true的Observable,否则返回一个发射false的Observable。

        public static void main(String[] args) {
           Observable.just(1,2,3,4,5)
                  .isEmpty()
                   .subscribe(new Consumer<Boolean>() {
                       @Override
                       public void accept(Boolean aBoolean) throws Exception {
                           System.out.println(aBoolean);
                       }
                   });
    
    
            //lamand表达式
            Observable.empty()
                    .isEmpty()
                    .subscribe(aBoolean -> System.out.println(aBoolean));
                    
    		执行结果:
            false
            true
    

    2.5 sequenceEqual

    判断两个Observable是否发射相同的数据序列

    默认不在任何特定的调度器上

           Observable.sequenceEqual(
                   Observable.just(1,2,3,4),
                   Observable.just(1,2,3,4)
           ).subscribe(new Consumer<Boolean>() {
               @Override
               public void accept(Boolean aBoolean) throws Exception {
                   System.out.println(aBoolean);
               }
           });
    
            //lamand表达式
            Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3,4))
                    .subscribe(aBoolean -> System.out.println(aBoolean));
                    
    执行结果:
    true
    false
    

    带三个参数:可以传递一个函数用于比较两个数据项是否相同,对于复杂对象的比较,用三个参数的版本更为合适。

    <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,        BiPredicate<? super T, ? super T> isEqual)
    
           Observable.sequenceEqual(
                   Observable.just(1, 2, 3, 4),
                   Observable.just(1, 2, 3, 4),
                   new BiPredicate<Integer, Integer>() {
                       @Override
                       public boolean test(Integer integer, Integer integer2) throws Exception {
                           return integer==integer2;
                       }
                   }
           ).subscribe(new Consumer<Boolean>() {
               @Override
               public void accept(Boolean aBoolean) throws Exception {
                   System.out.println(aBoolean);
               }
           });
    
            //lamand表达式
            Observable.sequenceEqual(
                    Observable.just(1, 2, 3), 
                    Observable.just(2, 3, 4),
                    (integer, integer2) -> (integer+1)==integer2
            ).subscribe(aBoolean -> System.out.println(aBoolean));
            
            执行结果:
            true
            true
    
  • 相关阅读:
    word文档的图片怎么保存到Web编辑器上
    如何在linux下查看gpu信息

    lua注释
    readDouble
    ..在lua中运用
    C++条件分支结构
    C++条件分支结构
    Flink基础(十六):Table API 和 Flink SQL(一)整体介绍
    Flink基础(十四):DS简介(14) 搭建Flink运行流式应用
  • 原文地址:https://www.cnblogs.com/wangjiaghe/p/11890854.html
Copyright © 2011-2022 走看看