zoukankan      html  css  js  c++  java
  • RxJava2.0的使用详解

    RxJava2.0的使用详解


    1,初识RxJava
    RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序
    RxJava是一个基于事件订阅的异步执行的一个类库,目前比较火的一些技术框架!

    参考资料:
    Github上RxJava的项目地址:
    https://github.com/ReactiveX/RxJava
    技术文档Api:
    http://reactivex.io/RxJava/javadoc/

    RxAndroid,用于 Android 开发:
    https://github.com/ReactiveX/RxAndroid
    简书博客推荐:
    http://www.jianshu.com/p/ba61c047c230
    1.1使用前所添加的依赖(build.gradle):
    compile 'io.reactivex.rxjava2:rxjava:2.1.3'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'


    1.2作用:
    RxJava的目的就是异步。
    RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性


    1.3概念:
    RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解.
    Observable:在观察者模式中称为“被观察者”;
    Observer:观察者模式中的“观察者”,可接收Observable发送的数据;
    subscribe:订阅,观察者与被观察者,通过Observable的subscribe()方法进行订阅;
    Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分 内容是2.0新增的,后续文章再介绍。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable.


    1.4观察者模式的理解:
    A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应.
    在程序的观察者模式,观察者不需要时刻盯着被观察者,而是采用注册或者称为订阅的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我!
    RxJava 有四个基本概念:
    Observable (被观察者)、
    Observer (观察者)、
    subscribe (订阅)
    Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即 发出事件来通知 Observer。
    关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将 Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。
    注意:Observer是个接口,Observable是个类。

    RxJava中定义的事件方法:
    onNext(),普通事件,按照队列依次进行处理.
    onComplete(),事件队列完结时调用该方法
    onError(),事件处理过程中出现异常时,onError()触发,同时队列终止,不再有事件发出.
    onSubscribe(),RxJava 2.0 中新增的,传递参数为Disposable,可用于切断接收事件
    让Observable (被观察者)开启子线程执行耗操作,完成耗时操作后,触发回调,通知Observer (观察者)进行主线程UI更新

    2,简单使用步骤:
    步骤:
    创建数据发射源,上游Observable
    创建数据接收处,下游Observer
    数据源关联接收处,上游衔接下游!


    3,Observable
    数据发射源,可观察的,被观察的,
    Observable有两种形式启动形式:
    1热启动Observable任何时候都会发送消息,即使没有任何观察者监听它。
    2冷启动Observable只有在至少有一个订阅者的时候才会发送消息

    Observable的几种创建方式:

    01,just()方式
    使用just( ),将创建一个Observable并自动调用onNext( )发射数据。
    也就是通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。

    02,fromIterable()方式
    使用fromIterable(),遍历集合,发送每个item.多次自动调用onNext()方法,每次传入一个item.
    注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable() 方法。

    03,defer()方式
    当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable.
    通过Callable中的回调方法call(),决定使用以何种方式来创建这个Observable对象,当订阅后,发送事件.

    04,interval( )方式
    创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。按照固定时间间隔来调用onNext()方法。

    05,timer( )方式
    通过此种方式创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟指定时间后,调用onNext()方法。

    06,range( )方式,range(x,y)
    创建一个发射特定整数序列的Observable,第一个参数x为起始值,第二个y为发送的个数,如果y为0则不发送,y为负数则抛异常。
    range(1,5)
    上述表示发射1到5的数。即调用5次Next()方法,依次传入1-5数字。

    07,repeat( )方式
    创建一个Observable,该Observable的事件可以重复调用。


    部分方法介绍:
    表示下游不关心任何事件,你上游尽管发你的数据
    Disposable subscribe()

    表示下游只关心onNext事件,其他不管
    Disposable subscribe(Consumer<? super T> onNext)

    表示下游只关心onNext事件,onError事件
    Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

    表示只关心onNext事件,onError事件,onComplete事件
    Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

    表示处理所有事件
    subscribe(Observer<? super T> observer)

    4,ObservableEmitter
    Emitter是发射器的意思,就是用来发出事件的,它可以发出三种类型的事件
    通过调用onNext(T value),发出next事件
    通过调用onComplete(),发出complete事件
    通过调用onError(Throwable error),发出error事件

    注意事项:
    onComplete和onError唯一并且互斥
    发送多个onComplete, 第一个onComplete接收到,就不再接收了.
    发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
    不可以随意乱七八糟发射事件,需要满足一定的规则:
    上游可以发送无限个onNext, 下游也可以接收无限个onNext.
    当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
    上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
    上游可以不发送onComplete或onError.
    最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError

    5,Disposable
    一次性,它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.
    在RxJava中,用它来切断Observer(观察者)与Observable(被观察者)之间的连接,当调用它的dispose()方法时, 它就会将Observer(观察者)与Observable(被观察者)之间的连接切断, 从而导致Observer(观察者)收不到事件。
    注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件
    我们让上游依次发送1,2,3,complete,4,在下游收到第二个事件之后, 切断水管, 看看运行结果

    Disposable的对象通过观察者获得,具体分为两种方式


    1,Observer接口
    Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    //此方法接收到Disposable的实例!
    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
    };
    通过创建Observer(观察者)接口,重写onSubscribe方法,当订阅后,建立与Observable(被观察者)的联系后,在onSubscribe(Disposable d)方法中便可以获得Disposable对象。


    2.Consumer等其他函数式接口
    Disposable disposable = Observable.just("你好").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {

    }
    });
    当调用Observable的subscribe()方法后直接返回一个Disposable 对象


    6,线程控制——Scheduler
    在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

    Schedulers.immediate():
    直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。

    Schedulers.newThread():
    总是启用新线程,并在新线程执行操作。

    Schedulers.io(): I/O
    操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。

    Schedulers.computation():
    计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。


    AndroidSchedulers.mainThread(),
    Android专用线程,指定操作在主线程运行。

    如何切换线程呢?RxJava中提供了两个方法:
    subscribeOn() 和 observeOn() ,
    两者的不同点在于:

    subscribeOn(): 指定subscribe()订阅所发生的线程,或者叫做事件产生的线程。

    observeOn(): 指定Observer所运行在的线程,即onNext()执行的线程。或者叫做事件消费的线程。


    7,以Consumer为例,我们可以实现简便式的观察者模式
    Observable.just("hello").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    System.out.println(s);
    }
    });
    其中Consumer中的accept()方法接收一个来自Observable的单个值。Consumer就是一个观察者。其他函数式接口可以类似应用


    8,RxJava中的操作符
    01,操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,
    最终将最简洁的数据传递给Observer对象.
    每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
    以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象

    02,比较常用的操作符:

    map()操作符
    map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
    举例:
    Observable<Integer> observable = Observable
    .just("hello")
    .map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
    return s.length();
    }
    });

    flatMap()操作符
    flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想 要的数据形式。它可以返回任何它想返回的Observable对象。
    举例:
    Observable.just(list)
    .flatMap(new Function<List<String>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(List<String> strings) throws Exception {
    return Observable.fromIterable(strings);
    }
    });

    filter()操作符
    filter()操作符根据它的test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。
    最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
    举例:
    Observable
    .just(list)
    .flatMap(new Function<List<String>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(List<String> strings) throws Exception {
    return Observable.fromIterable(strings);
    }
    }).filter(new Predicate<Object>() {
    @Override
    public boolean test(Object s) throws Exception {
    String newStr = (String) s;
    if (newStr.charAt(5) - '0' > 5) {
    return true;
    }
    return false;
    }
    }).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
    System.out.println((String)o);
    }
    });

    take()操作符
    输出最多指定数量的结果.(接收指定数量的结果)
    举例:
    Observable.just(new ArrayList<String>(){
    {
    for (int i = 0; i < 8; i++) {
    add("data"+i);
    }
    }
    }).flatMap(new Function<List<String>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(List<String> strings) throws Exception {
    return Observable.fromIterable(strings);
    }
    }).take(5).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object s) throws Exception {
    DemonstrateUtil.showLogResult(s.toString());
    }
    });

    doOnNext()

    允许我们在每次输出一个元素之前做一些额外的事情
    举例:
    Observable.just(new ArrayList<String>(){
    {
    for (int i = 0; i < 6; i++) {
    add("data"+i);
    }
    }
    }).flatMap(new Function<List<String>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(List<String> strings) throws Exception {
    return Observable.fromIterable(strings);
    }
    }).take(5).doOnNext(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
    DemonstrateUtil.showLogResult("额外的准备工作!");
    }
    }).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object s) throws Exception {
    DemonstrateUtil.showLogResult(s.toString());
    }
    });


    8,Flowable的理解
    Flowable是一个被观察者,与Subscriber(观察者)配合使用,解决Backpressure问题
    Backpressure(背压)。所谓背压,即生产者的速度大于消费者的速度带来的问题。

    什么情况下才会产生Backpressure问题?
    1.如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件。
    所以在这种情况下,根本不存在Backpressure问题。即同步情况下,Backpressure问题不存在。
    2.如果生产者和消费者不在同一线程的情况下,如果生产者的速度大于消费者的速度,就会产生Backpressure问题。
    即异步情况下,Backpressure问题才会存在。

    现象演示说明:
    被观察者是事件的生产者,观察者是事件的消费者.假如生产者无限生成事件,而消费者以很缓慢的节奏来消费事件,会造成事件无限堆积,形成背压,最后造成OOM!
    Flowable悠然而生,专门用来处理这类问题。
    Flowable是为了应对Backpressure而产生的。Flowable是一个被观察者,
    与Subscriber(观察者)配合使用,解决Backpressure问题。
    注意:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。
    即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。

    处理Backpressure问题的策略,或者来解决Backpressure问题

    BackpressureStrategy.ERROR
    如果缓存池溢出,就会立刻抛出MissingBackpressureException异常
    request()用来向生产者申请可以消费的事件数量,这样我们便可以根据本身的消费能力进行消费事件.
    虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM
    at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:33)
    在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。
    无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。
    当然如果本身并没有这么多事件需要发送,则不会存128个事件。
    应用举例:

    BackpressureStrategy.BUFFER
    是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存更多的数据.
    消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存.
    注意:
    这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。
    BUFFER要慎用

    BackpressureStrategy.DROP
    顾名思义,当消费者处理不了事件,就丢弃!
    例如,当数据源创建了200个事件,先不进行消费临时进行缓存实际缓存128个,我们第一次申请消费了100个,再次申请消费100个,
    那么实际只消费了128个,而其余的72个被丢弃了!

    BackpressureStrategy.LATEST
    LATEST与DROP功能基本一致,当消费者处理不了事件,就丢弃!
    唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
    例如,当数据源创建了200个事件,先不进行消费临时进行缓存,我们第一次申请消费了100个,再次申请消费100个,
    那么实际只消费了129个,而其余的71个被丢弃了,但是第200个(最后一个)会被消费.

    BackpressureStrategy.MISSING
    生产的事件没有进行缓存和丢弃,下游接收到的事件必须进行消费或者处理!

    在RxJava中会经常遇到一种情况就是被观察者发送消息十分迅速以至于观察者不能及时的响应这些消息
    举例:
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    while (true){
    e.onNext(1);
    }
    }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
    Thread.sleep(2000);
    System.out.println(integer);
    }
    });
    被观察者是事件的生产者,观察者是事件的消费者。上述例子中可以看出生产者无限生成事件,而消费者每2秒才能消费一个事件,这会造成事件无限堆积,最后造成OOM。
    Flowable就是由此产生,专门用来处理这类问题

    代码实现:

      1 public class RxJavaDemo1Activity extends AppCompatActivity implements View.OnClickListener {
      2 
      3     protected Button btnSend1;
      4     protected Button btnSend2;
      5     protected Button btnSend3;
      6     protected Button btnSend4;
      7     protected Button btnSend5;
      8     protected Button btnSend6;
      9 
     10     @Override
     11     protected void onCreate(Bundle savedInstanceState) {
     12         super.onCreate(savedInstanceState);
     13         super.setContentView(R.layout.activity_rx_java_demo1);
     14         initView();
     15     }
     16 
     17     @Override
     18     public void onClick(View view) {
     19         if (view.getId() == R.id.btn_send1) {
     20             test1();//普通使用
     21         } else if (view.getId() == R.id.btn_send2) {
     22             test2();//链式调用
     23         } else if (view.getId() == R.id.btn_send3) {
     24             test3();//发送中,中断.
     25         } else if (view.getId() == R.id.btn_send4) {
     26             test4();//只关心onnext事件的操作
     27         } else if (view.getId() == R.id.btn_send5) {
     28             test5();//几种被观察者的创建方式
     29         } else if (view.getId() == R.id.btn_send6) {
     30             test6();//常用的操作符
     31         }
     32     }
     33 
     34     private void test6() {
     35         DialogUtil.showListDialog(this, "rxjava的操作符号使用", new String[]{
     36                 "0map()操作符",
     37                 "1flatMap()操作符",
     38                 "2filter()操作符",
     39                 "3take()操作符",
     40                 "4doOnNext()操作符",
     41         }, new DialogInterface.OnClickListener() {
     42             @Override
     43             public void onClick(DialogInterface dialog, int which) {
     44                 switch (which) {
     45                     case 0:
     46                         map0();
     47                         break;
     48                     case 1:
     49                         map1();
     50                         break;
     51                     case 2:
     52                         map2();
     53                         break;
     54                     case 3:
     55                         map3();
     56                         break;
     57                     case 4:
     58                         map4();
     59                         break;
     60                 }
     61             }
     62         });
     63     }
     64 
     65     private void map4() {
     66         Observable.just(new ArrayList<String>(){
     67             {
     68                 for (int i = 0; i < 6; i++) {
     69                     add("data"+i);
     70                 }
     71             }
     72         }).flatMap(new Function<List<String>, ObservableSource<?>>() {
     73             @Override
     74             public ObservableSource<?> apply(List<String> strings) throws Exception {
     75                 return Observable.fromIterable(strings);
     76             }
     77         }).take(5).doOnNext(new Consumer<Object>() {
     78             @Override
     79             public void accept(Object o) throws Exception {
     80                 DemonstrateUtil.showLogResult("额外的准备工作!");
     81             }
     82         }).subscribe(new Consumer<Object>() {
     83             @Override
     84             public void accept(Object s) throws Exception {
     85                 DemonstrateUtil.showLogResult(s.toString());
     86             }
     87         });
     88     }
     89 
     90     private void map3() {
     91         Observable.just(new ArrayList<String>(){
     92             {
     93                 for (int i = 0; i < 8; i++) {
     94                     add("data"+i);
     95                 }
     96             }
     97         }).flatMap(new Function<List<String>, ObservableSource<?>>() {
     98             @Override
     99             public ObservableSource<?> apply(List<String> strings) throws Exception {
    100                 return Observable.fromIterable(strings);
    101             }
    102         }).take(10).subscribe(new Consumer<Object>() {
    103             @Override
    104             public void accept(Object s) throws Exception {
    105                 DemonstrateUtil.showLogResult(s.toString());
    106             }
    107         });
    108     }
    109 
    110     private void map2() {
    111         Observable
    112                 .just(new ArrayList<String>(){
    113                     {
    114                         for (int i = 0; i < 5; i++) {
    115                             add("data"+i);
    116                         }
    117                     }
    118                 })
    119                 .flatMap(new Function<List<String>, ObservableSource<?>>() {
    120                     @Override
    121                     public ObservableSource<?> apply(List<String> strings) throws Exception {
    122                         return Observable.fromIterable(strings);
    123                     }
    124                 }).filter(new Predicate<Object>() {
    125             @Override
    126             public boolean test(Object s) throws Exception {
    127                 String newStr = (String) s;
    128                 if (newStr.contains("3")){
    129                     return true;
    130                 }
    131                 return false;
    132             }
    133         }).subscribe(new Consumer<Object>() {
    134             @Override
    135             public void accept(Object o) throws Exception {
    136                 DemonstrateUtil.showLogResult((String)o);
    137             }
    138         });
    139     }
    140 
    141     private void map1() {
    142         Observable.just(new ArrayList<String>(){
    143             {
    144                 for (int i = 0; i < 3; i++) {
    145                     add("data"+i);
    146                 }
    147             }
    148         }).flatMap(new Function<List<String>, ObservableSource<?>>() {
    149             @Override
    150             public ObservableSource<?> apply(List<String> strings) throws Exception {
    151                 return Observable.fromIterable(strings);
    152             }
    153         }).subscribe(new Observer<Object>() {
    154             @Override
    155             public void onSubscribe(Disposable d) {
    156 
    157             }
    158 
    159             @Override
    160             public void onNext(Object o) {
    161                 DemonstrateUtil.showLogResult("flatMap转换后,接收到的"+o);
    162             }
    163 
    164             @Override
    165             public void onError(Throwable e) {
    166 
    167             }
    168 
    169             @Override
    170             public void onComplete() {
    171 
    172             }
    173         });
    174     }
    175 
    176     private void map0() {
    177         Observable.just("hellorxjava")
    178                 .map(new Function<String, Integer>() {
    179                     @Override
    180                     public Integer apply(String s) throws Exception {
    181                         return s.length();
    182                     }
    183                 }).subscribe(new Observer<Integer>() {
    184             @Override
    185             public void onSubscribe(Disposable d) {
    186 
    187             }
    188 
    189             @Override
    190             public void onNext(Integer integer) {
    191                 DemonstrateUtil.showLogResult("接收到被转换的数据结果:"+integer);
    192             }
    193 
    194             @Override
    195             public void onError(Throwable e) {
    196 
    197             }
    198 
    199             @Override
    200             public void onComplete() {
    201 
    202             }
    203         });
    204     }
    205 
    206     private void test5() {
    207         DialogUtil.showListDialog(this, "rxjava的其他操作", new String[]{
    208                 "0just()方式创建Observable",
    209                 "1fromIterable()方式创建Observable",
    210                 "2defer()方式创建Observable",
    211                 "3interval( )方式创建Observable",
    212                 "4timer( )方式创建Observable",
    213                 "5range( )方式创建Observable",
    214                 "6repeat( )方式创建Observable",
    215         }, new DialogInterface.OnClickListener() {
    216             @Override
    217             public void onClick(DialogInterface dialog, int which) {
    218                 switch (which) {
    219                     case 0:
    220                         other0();
    221                         break;
    222                     case 1:
    223                         other1();
    224                         break;
    225                     case 2:
    226                         other2();
    227                         break;
    228                     case 3:
    229                         other3();
    230                         break;
    231                     case 4:
    232                         other4();
    233                         break;
    234                     case 5:
    235                         other5();
    236                         break;
    237                     case 6:
    238                         other6();
    239                         break;
    240                 }
    241             }
    242         });
    243     }
    244 
    245     private void other6() {
    246         Observable.just(123).repeat().subscribe(new Observer<Integer>() {
    247             @Override
    248             public void onSubscribe(Disposable d) {
    249 
    250             }
    251 
    252             @Override
    253             public void onNext(Integer integer) {
    254                 DemonstrateUtil.showLogResult("重复integer" + integer);
    255             }
    256 
    257             @Override
    258             public void onError(Throwable e) {
    259 
    260             }
    261 
    262             @Override
    263             public void onComplete() {
    264 
    265             }
    266         });
    267     }
    268 
    269     private void other5() {
    270         Observable.range(1, 5).subscribe(new Observer<Integer>() {
    271             @Override
    272             public void onSubscribe(Disposable d) {
    273 
    274             }
    275 
    276             @Override
    277             public void onNext(Integer integer) {
    278                 DemonstrateUtil.showLogResult("连续收到:" + integer);
    279             }
    280 
    281             @Override
    282             public void onError(Throwable e) {
    283 
    284             }
    285 
    286             @Override
    287             public void onComplete() {
    288 
    289             }
    290         });
    291     }
    292 
    293     private void other4() {
    294         Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    295             @Override
    296             public void onSubscribe(Disposable d) {
    297 
    298             }
    299 
    300             @Override
    301             public void onNext(Long aLong) {
    302                 DemonstrateUtil.showLogResult("延迟5s后调用了:onNext");
    303             }
    304 
    305             @Override
    306             public void onError(Throwable e) {
    307 
    308             }
    309 
    310             @Override
    311             public void onComplete() {
    312 
    313             }
    314         });
    315     }
    316 
    317     private void other3() {
    318         Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    319             @Override
    320             public void onSubscribe(Disposable d) {
    321 
    322             }
    323 
    324             @Override
    325             public void onNext(Long aLong) {
    326                 DemonstrateUtil.showLogResult("数字是:" + aLong);
    327                 //DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this,"数字是:"+aLong);
    328             }
    329 
    330             @Override
    331             public void onError(Throwable e) {
    332 
    333             }
    334 
    335             @Override
    336             public void onComplete() {
    337 
    338             }
    339         });
    340     }
    341 
    342     private void other2() {
    343         Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
    344             @Override
    345             public ObservableSource<? extends String> call() throws Exception {
    346                 return Observable.just("hello,defer");
    347             }
    348         });
    349 
    350         //上游衔接下游!
    351         observable.subscribe(new Observer<String>() {
    352             @Override
    353             public void onSubscribe(Disposable d) {
    354 
    355             }
    356 
    357             @Override
    358             public void onNext(String s) {
    359                 DemonstrateUtil.showLogResult(s);
    360                 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s);
    361             }
    362 
    363             @Override
    364             public void onError(Throwable e) {
    365 
    366             }
    367 
    368             @Override
    369             public void onComplete() {
    370 
    371             }
    372         });
    373     }
    374 
    375     private void other1() {
    376         Observable.fromIterable(new ArrayList<String>() {
    377             {
    378                 for (int i = 0; i < 5; i++) {
    379                     add("Hello," + i);
    380                 }
    381             }
    382         }).subscribe(new Observer<String>() {
    383             @Override
    384             public void onSubscribe(Disposable d) {
    385 
    386             }
    387 
    388             @Override
    389             public void onNext(String s) {
    390                 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s);
    391                 DemonstrateUtil.showLogResult(s);
    392             }
    393 
    394             @Override
    395             public void onError(Throwable e) {
    396 
    397             }
    398 
    399             @Override
    400             public void onComplete() {
    401 
    402             }
    403         });
    404     }
    405 
    406     private void other0() {
    407         Observable.just("hello,you hao!").subscribe(new Observer<String>() {
    408             @Override
    409             public void onSubscribe(Disposable d) {
    410 
    411             }
    412 
    413             @Override
    414             public void onNext(String s) {
    415                 DemonstrateUtil.showLogResult(s);
    416                 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s);
    417             }
    418 
    419             @Override
    420             public void onError(Throwable e) {
    421 
    422             }
    423 
    424             @Override
    425             public void onComplete() {
    426 
    427             }
    428         });
    429     }
    430 
    431     private void test4() {
    432         Observable.create(new ObservableOnSubscribe<Integer>() {
    433             @Override
    434             public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    435                 DemonstrateUtil.showLogResult("emitter 1");
    436                 emitter.onNext(1);
    437 
    438                 DemonstrateUtil.showLogResult("emitter 2");
    439                 emitter.onNext(2);
    440 
    441                 DemonstrateUtil.showLogResult("emitter 3");
    442                 emitter.onNext(3);
    443 
    444                 DemonstrateUtil.showLogResult("complete");
    445                 emitter.onComplete();
    446 
    447                 DemonstrateUtil.showLogResult("emitter 4");
    448                 emitter.onNext(4);
    449             }
    450         }).subscribe(new Consumer<Integer>() {
    451             @Override
    452             public void accept(Integer integer) throws Exception {
    453                 DemonstrateUtil.showLogResult("accept:" + integer);
    454             }
    455         });
    456     }
    457 
    458     private void test3() {
    459         Observable.create(new ObservableOnSubscribe<Integer>() {
    460             @Override
    461             public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    462                 DemonstrateUtil.showLogResult("emitter 1");
    463                 emitter.onNext(1);
    464 
    465                 DemonstrateUtil.showLogResult("emitter 2");
    466                 emitter.onNext(2);
    467 
    468                 DemonstrateUtil.showLogResult("emitter 3");
    469                 emitter.onNext(3);
    470 
    471                 DemonstrateUtil.showLogResult("complete");
    472                 emitter.onComplete();
    473 
    474                 DemonstrateUtil.showLogResult("emitter 4");
    475                 emitter.onNext(4);
    476             }
    477         }).subscribe(new Observer<Integer>() {
    478             private Disposable mDisposable;
    479             private int i;
    480 
    481             @Override
    482             public void onSubscribe(Disposable d) {
    483                 DemonstrateUtil.showLogResult("subscribe");
    484                 mDisposable = d;
    485             }
    486 
    487             @Override
    488             public void onNext(Integer value) {
    489                 DemonstrateUtil.showLogResult("onNext:" + value);
    490                 i++;
    491                 if (i == 2) {
    492                     DemonstrateUtil.showLogResult("dispose:" + value);
    493                     mDisposable.dispose();
    494                     DemonstrateUtil.showLogResult("isDisposed : " + mDisposable.isDisposed());
    495                 }
    496             }
    497 
    498             @Override
    499             public void onError(Throwable e) {
    500                 DemonstrateUtil.showLogResult("error:");
    501             }
    502 
    503             @Override
    504             public void onComplete() {
    505                 DemonstrateUtil.showLogResult("complete");
    506             }
    507         });
    508 
    509     }
    510 
    511     private void test2() {
    512         //链式调用
    513         Observable.create(new ObservableOnSubscribe<Integer>() {
    514             @Override
    515             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    516                 e.onNext(1);
    517                 e.onNext(2);
    518                 e.onNext(3);
    519             }
    520         }).subscribe(new Observer<Integer>() {
    521 
    522             @Override
    523             public void onSubscribe(Disposable d) {
    524                 DemonstrateUtil.showLogResult("onSubscribe");
    525             }
    526 
    527             @Override
    528             public void onNext(Integer integer) {
    529                 DemonstrateUtil.showLogResult("onNext-->integer" + integer);
    530             }
    531 
    532             @Override
    533             public void onError(Throwable e) {
    534                 DemonstrateUtil.showLogResult("onError");
    535             }
    536 
    537             @Override
    538             public void onComplete() {
    539                 DemonstrateUtil.showLogResult("onComplete");
    540             }
    541         });
    542     }
    543 
    544     private void test1() {
    545 
    546         //创建上游,数据发射源!
    547         //ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候,
    548         // ObservableOnSubscribe的subscribe()方法会自动被调用,事件序列就会依照设定依次触发
    549         //ObservableEmitter,发射器,触发事件.
    550         Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    551 
    552             @Override
    553             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    554                 e.onNext(1);
    555                 e.onNext(2);
    556                 e.onNext(3);
    557             }
    558         });
    559 
    560         //创建下游,数据接收处!
    561         Observer<Integer> observer = new Observer<Integer>() {
    562 
    563             @Override
    564             public void onSubscribe(Disposable d) {
    565                 DemonstrateUtil.showLogResult("onSubscribe");
    566             }
    567 
    568             @Override
    569             public void onNext(Integer integer) {
    570                 DemonstrateUtil.showLogResult("onNext--integer" + integer);
    571             }
    572 
    573             @Override
    574             public void onError(Throwable e) {
    575                 DemonstrateUtil.showLogResult("onError");
    576             }
    577 
    578             @Override
    579             public void onComplete() {
    580                 DemonstrateUtil.showLogResult("onComplete");
    581             }
    582         };
    583 
    584         //数据源连接接收处,上游衔接下游!
    585         //只有当上游和下游建立连接之后, 上游才会开始发送事件
    586         observable.subscribe(observer);
    587     }
    588 
    589     private void initView() {
    590         btnSend1 = (Button) findViewById(R.id.btn_send1);
    591         btnSend1.setOnClickListener(RxJavaDemo1Activity.this);
    592         btnSend2 = (Button) findViewById(R.id.btn_send2);
    593         btnSend2.setOnClickListener(RxJavaDemo1Activity.this);
    594         btnSend3 = (Button) findViewById(R.id.btn_send3);
    595         btnSend3.setOnClickListener(RxJavaDemo1Activity.this);
    596         btnSend4 = (Button) findViewById(R.id.btn_send4);
    597         btnSend4.setOnClickListener(RxJavaDemo1Activity.this);
    598         btnSend5 = (Button) findViewById(R.id.btn_send5);
    599         btnSend5.setOnClickListener(RxJavaDemo1Activity.this);
    600         btnSend6 = (Button) findViewById(R.id.btn_send6);
    601         btnSend6.setOnClickListener(RxJavaDemo1Activity.this);
    602     }
    603 }
      1 public class RxJavaDemo2Activity extends AppCompatActivity implements View.OnClickListener {
      2 
      3     protected Button btn;
      4     protected ImageView iv;
      5 
      6     @Override
      7     protected void onCreate(Bundle savedInstanceState) {
      8         super.onCreate(savedInstanceState);
      9         super.setContentView(R.layout.activity_rx_java_demo2);
     10         initView();
     11     }
     12 
     13     @Override
     14     public void onClick(View view) {
     15         if (view.getId() == R.id.btn) {
     16             DialogUtil.showListDialog(this, "rxJava操作!", new String[]{
     17                     "0发送事件io线程并变换主线程接收",
     18                     "1子线程发送事件主线程接收",
     19                     "2默认线程发送事件默认线程接收",
     20             }, new DialogInterface.OnClickListener() {
     21                 @Override
     22                 public void onClick(DialogInterface dialog, int which) {
     23                     switch (which) {
     24                         case 0:
     25                             show0();
     26                             break;
     27                         case 1:
     28                             show1();
     29                             break;
     30                         case 2:
     31                             show2();
     32                             break;
     33                     }
     34                 }
     35             });
     36         }
     37     }
     38 
     39     private void show2() {
     40         Observable.create(new ObservableOnSubscribe<Integer>() {
     41             @Override
     42             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
     43                 DemonstrateUtil.showLogResult("发送的线程名称:" + Thread.currentThread().getName());
     44                 DemonstrateUtil.showLogResult("发送的线程id:" + Thread.currentThread().getId());
     45 
     46                 DemonstrateUtil.showLogResult("发送的数据:" + 1);
     47                 e.onNext(1);
     48             }
     49         }).subscribe(new Consumer<Integer>() {
     50             @Override
     51             public void accept(Integer integer) throws Exception {
     52                 DemonstrateUtil.showLogResult("接收的线程:" + Thread.currentThread().getName());
     53                 DemonstrateUtil.showLogResult("接收的线程id:" + Thread.currentThread().getId());
     54                 DemonstrateUtil.showLogResult("接收到的数据:-integer:" + integer);
     55             }
     56         });
     57     }
     58 
     59     private void show1() {
     60         Observable.create(new ObservableOnSubscribe<Integer>() {
     61             @Override
     62             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
     63                 DemonstrateUtil.showLogResult("发送的线程名称:" + Thread.currentThread().getName());
     64                 DemonstrateUtil.showLogResult("发送的线程id:" + Thread.currentThread().getId());
     65 
     66                 DemonstrateUtil.showLogResult("发送的数据:" + 1);
     67                 e.onNext(1);
     68             }
     69         }).subscribeOn(Schedulers.newThread())
     70                 .observeOn(AndroidSchedulers.mainThread())
     71                 .subscribe(new Consumer<Integer>() {
     72                     @Override
     73                     public void accept(Integer integer) throws Exception {
     74                         DemonstrateUtil.showLogResult("接收的线程:" + Thread.currentThread().getName());
     75                         DemonstrateUtil.showLogResult("接收的线程id:" + Thread.currentThread().getId());
     76                         DemonstrateUtil.showLogResult("接收到的数据:-integer:" + integer);
     77                     }
     78                 });
     79     }
     80 
     81     private void show0() {
     82         Observable.create(new ObservableOnSubscribe<Integer>() {
     83             @Override
     84             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
     85                 DemonstrateUtil.showLogResult("所在的线程:", Thread.currentThread().getName());
     86                 DemonstrateUtil.showLogResult("发送的数据:", 1 + "");
     87                 e.onNext(1);
     88             }
     89         }).subscribeOn(Schedulers.io())
     90                 .observeOn(AndroidSchedulers.mainThread())
     91                 .subscribe(new Consumer<Integer>() {
     92                     @Override
     93                     public void accept(Integer integer) throws Exception {
     94                         DemonstrateUtil.showLogResult("所在的线程:", Thread.currentThread().getName());
     95                         DemonstrateUtil.showLogResult("接收到的数据:", "integer:" + integer);
     96                     }
     97                 });
     98     }
     99 
    100     private void initView() {
    101         btn = (Button) findViewById(R.id.btn);
    102         btn.setOnClickListener(RxJavaDemo2Activity.this);
    103         iv = (ImageView) findViewById(R.id.iv);
    104     }
    105 }
      1 public class RxJavaDemo3Activity extends AppCompatActivity implements View.OnClickListener {
      2 
      3     protected Button btnBackpressure;
      4     private Flowable mFlowable;
      5     private Subscriber mSubscriber;
      6     private Subscription mSubscription;
      7     private Flowable flowableLATEST;
      8     private Subscriber subscriberLatest;
      9     private Subscription subscriptionLatest;
     10 
     11     @Override
     12     protected void onCreate(Bundle savedInstanceState) {
     13         super.onCreate(savedInstanceState);
     14         super.setContentView(R.layout.activity_rx_java_demo3);
     15         initView();
     16         init4();
     17         init6();
     18     }
     19 
     20     private void init6() {
     21         flowableLATEST = Flowable.create(new FlowableOnSubscribe<Integer>() {
     22             @Override
     23             public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
     24 
     25                 for (int i = 1; i<=200; i++) {
     26                     emitter.onNext(i);
     27                     DemonstrateUtil.showLogResult("LATEST生产onNext:"+i);
     28                 }
     29             }
     30         }, BackpressureStrategy.LATEST);
     31 
     32         //mSubscription = s;
     33         subscriberLatest = new Subscriber<Integer>() {
     34             @Override
     35             public void onSubscribe(Subscription s) {
     36                 subscriptionLatest = s;
     37                 s.request(100);
     38             }
     39 
     40             @Override
     41             public void onNext(Integer integer) {
     42                 DemonstrateUtil.showLogResult("Latest消费onNext:" + integer);
     43             }
     44 
     45             @Override
     46             public void onError(Throwable t) {
     47                 DemonstrateUtil.showLogResult("onError");
     48                 DemonstrateUtil.showLogResult(t.getMessage());
     49                 DemonstrateUtil.showLogResult(t.toString());
     50             }
     51 
     52             @Override
     53             public void onComplete() {
     54                 DemonstrateUtil.showLogResult("onComplete");
     55             }
     56         };
     57     }
     58 
     59     private void init4() {
     60         mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
     61             @Override
     62             public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
     63 
     64                 for (int i = 1; i <= 200; i++) {
     65                     emitter.onNext(i);
     66                     DemonstrateUtil.showLogResult("生产onNext:"+i);
     67                 }
     68             }
     69         }, BackpressureStrategy.DROP);
     70 
     71         //mSubscription = s;
     72         mSubscriber = new Subscriber<Integer>() {
     73             @Override
     74             public void onSubscribe(Subscription s) {
     75                 mSubscription = s;
     76                 s.request(100);
     77             }
     78 
     79             @Override
     80             public void onNext(Integer integer) {
     81                 DemonstrateUtil.showLogResult("消费onNext:" + integer);
     82             }
     83 
     84             @Override
     85             public void onError(Throwable t) {
     86                 DemonstrateUtil.showLogResult("onError");
     87                 DemonstrateUtil.showLogResult(t.getMessage());
     88                 DemonstrateUtil.showLogResult(t.toString());
     89             }
     90 
     91             @Override
     92             public void onComplete() {
     93                 DemonstrateUtil.showLogResult("onComplete");
     94             }
     95         };
     96 
     97     }
     98 
     99 
    100     @Override
    101     public void onClick(View view) {
    102         if (view.getId() == R.id.btn_backpressure) {
    103             DialogUtil.showListDialog(this, "Flowable的理解使用", new String[]{
    104                     "0事件堆积现象",
    105                     "1正常使用策略ERROR!",
    106                     "2使用策略ERROR出现的异常!",
    107                     "3使用策略BUFFER,更大的缓存池",
    108                     "4使用策略DROP,事件关联100",
    109                     "5使用策略DROP,再申请100",
    110                     "6使用策略LATEST,事件关联100",
    111                     "7使用策略LATEST,再申请100",
    112                     "8使用策略MISSING",
    113             }, new DialogInterface.OnClickListener() {
    114                 @Override
    115                 public void onClick(DialogInterface dialog, int which) {
    116                     switch (which) {
    117                         case 0:
    118                             show0();
    119                             break;
    120                         case 1:
    121                             show1();
    122                             break;
    123                         case 2:
    124                             show2();
    125                             break;
    126                         case 3:
    127                             show3();
    128                             break;
    129                         case 4:
    130                             show4();
    131                             break;
    132                         case 5:
    133                             show5();
    134                             break;
    135                         case 6:
    136                             show6();
    137                             break;
    138                         case 7:
    139                             show7();
    140                             break;
    141                         case 8:
    142                             show8();
    143                             break;
    144                     }
    145                 }
    146             });
    147         }
    148     }
    149 
    150     private void show8() {
    151         Flowable.create(new FlowableOnSubscribe<Integer>() {
    152             @Override
    153             public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    154                 for (int i = 0; i < 200; i++) {
    155                     DemonstrateUtil.showLogResult("MISSING-生成emitter" + i);
    156                     emitter.onNext(i);
    157                 }
    158             }
    159         }, BackpressureStrategy.MISSING).subscribeOn(Schedulers.io())
    160                 .observeOn(AndroidSchedulers.mainThread())
    161                 .subscribe(new Subscriber<Integer>() {
    162                     @Override
    163                     public void onSubscribe(Subscription s) {
    164                         //mSubscription = s;
    165                         //s.request(0);
    166                     }
    167 
    168                     @Override
    169                     public void onNext(Integer integer) {
    170                         DemonstrateUtil.showLogResult("MISSING-消费onNext" + integer);
    171                     }
    172 
    173                     @Override
    174                     public void onError(Throwable t) {
    175                         DemonstrateUtil.showLogResult("onError" + t.getMessage());
    176                         DemonstrateUtil.showLogResult("onError" + t.toString());
    177                         t.printStackTrace();
    178                     }
    179 
    180                     @Override
    181                     public void onComplete() {
    182                         DemonstrateUtil.showLogResult("onComplete");
    183                     }
    184                 });
    185     }
    186 
    187     private void show7() {
    188         subscriptionLatest.request(100);
    189     }
    190 
    191     private void show6() {
    192         flowableLATEST.subscribeOn(Schedulers.io())
    193                 .observeOn(AndroidSchedulers.mainThread())
    194                 .subscribe(subscriberLatest);
    195     }
    196 
    197     private void show5() {
    198         //128-100-100=  -72.
    199         mSubscription.request(100);
    200     }
    201 
    202     private void show4() {
    203         mFlowable.subscribeOn(Schedulers.io())
    204                 .observeOn(AndroidSchedulers.mainThread())
    205                 .subscribe(mSubscriber);
    206     }
    207 
    208 
    209     private void show3() {
    210         Flowable.create(new FlowableOnSubscribe<Integer>() {
    211             @Override
    212             public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    213                 for (int i = 0; i < 200; i++) {
    214                     DemonstrateUtil.showLogResult("emitter" + i);
    215                     emitter.onNext(i);
    216                 }
    217             }
    218         }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
    219                 .observeOn(AndroidSchedulers.mainThread())
    220                 .subscribe(new Subscriber<Integer>() {
    221                     @Override
    222                     public void onSubscribe(Subscription s) {
    223                         //mSubscription = s;
    224                         //s.request(0);
    225                     }
    226 
    227                     @Override
    228                     public void onNext(Integer integer) {
    229                         DemonstrateUtil.showLogResult("onNext" + integer);
    230                     }
    231 
    232                     @Override
    233                     public void onError(Throwable t) {
    234                         DemonstrateUtil.showLogResult("onError" + t.getMessage());
    235                         DemonstrateUtil.showLogResult("onError" + t.toString());
    236                         t.printStackTrace();
    237                     }
    238 
    239                     @Override
    240                     public void onComplete() {
    241                         DemonstrateUtil.showLogResult("onComplete");
    242                     }
    243                 });
    244     }
    245 
    246     private void show2() {
    247         Flowable.create(new FlowableOnSubscribe<Integer>() {
    248             @Override
    249             public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    250                 for (int i = 0; i < 200; i++) {
    251                     DemonstrateUtil.showLogResult("emitter" + i);
    252                     emitter.onNext(i);
    253                 }
    254             }
    255         }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
    256                 .observeOn(AndroidSchedulers.mainThread())
    257                 .subscribe(new Subscriber<Integer>() {
    258                     @Override
    259                     public void onSubscribe(Subscription s) {
    260                         //mSubscription = s;
    261                         //s.request(0);
    262                     }
    263 
    264                     @Override
    265                     public void onNext(Integer integer) {
    266                         DemonstrateUtil.showLogResult("onNext" + integer);
    267                     }
    268 
    269                     @Override
    270                     public void onError(Throwable t) {
    271                         DemonstrateUtil.showLogResult("onError" + t.getMessage());
    272                         DemonstrateUtil.showLogResult("onError" + t.toString());
    273                         t.printStackTrace();
    274                     }
    275 
    276                     @Override
    277                     public void onComplete() {
    278                         DemonstrateUtil.showLogResult("onComplete");
    279                     }
    280                 });
    281     }
    282 
    283     private void show1() {
    284         Flowable.create(new FlowableOnSubscribe<Integer>() {
    285             @Override
    286             public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    287                 for (int i = 0; i < 127; i++) {//128---   0--->126
    288                     DemonstrateUtil.showLogResult("emitter " + i);
    289                     emitter.onNext(i);
    290                 }
    291                 DemonstrateUtil.showLogResult("emitter complete");
    292                 emitter.onComplete();
    293             }
    294         }, BackpressureStrategy.ERROR) //增加了一个参数,设置处理策略.
    295                 .subscribeOn(Schedulers.io())
    296                 .observeOn(AndroidSchedulers.mainThread())
    297                 .subscribe(new Subscriber<Integer>() {
    298                     @Override
    299                     public void onSubscribe(Subscription s) {
    300                         DemonstrateUtil.showLogResult("onSubscribe");
    301                         //用来向生产者申请可以消费的事件数量,这样我们便可以根据本身的消费能力进行消费事件.
    302                         s.request(Long.MAX_VALUE);
    303                     }
    304 
    305                     @Override
    306                     public void onNext(Integer integer) {
    307                         DemonstrateUtil.showLogResult("onNext: " + integer);
    308                     }
    309 
    310                     @Override
    311                     public void onError(Throwable t) {
    312                         DemonstrateUtil.showLogResult("onError: " + t.getMessage());
    313                         DemonstrateUtil.showLogResult("onError: " + t.toString());
    314                         t.printStackTrace();
    315                     }
    316 
    317                     @Override
    318                     public void onComplete() {
    319                         DemonstrateUtil.showLogResult("onComplete: ");
    320                     }
    321                 });
    322     }
    323 
    324     private void show0() {
    325         Observable.create(new ObservableOnSubscribe<Integer>() {
    326             @Override
    327             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    328                 while (true) {
    329                     for (int i = 0; i < 129; i++) {
    330                         e.onNext(1);
    331                     }
    332                 }
    333             }
    334         }).subscribeOn(Schedulers.io())
    335                 .observeOn(AndroidSchedulers.mainThread())
    336                 .subscribe(new Consumer<Integer>() {
    337                     @Override
    338                     public void accept(Integer integer) throws Exception {
    339                         Thread.sleep(5000);
    340                         DemonstrateUtil.showLogResult("接受到" + integer);
    341                     }
    342                 });
    343     }
    344 
    345     private void initView() {
    346         btnBackpressure = (Button) findViewById(R.id.btn_backpressure);
    347         btnBackpressure.setOnClickListener(RxJavaDemo3Activity.this);
    348     }
    349 }
  • 相关阅读:
    Java中内部类中使用外面变量为什么final修饰?
    Java正则表达式
    Java内部类复习
    MyEclipse建立SpringMVC入门HelloWorld项目
    java中的System类
    java 中的Scanner
    Freemarker判断是否为空
    HQL多种查询实现
    查询功能实现
    EF生成 类型“System.Data.Entity.DbContext”在未被引用的程序集中定义
  • 原文地址:https://www.cnblogs.com/SongYongQian/p/7978653.html
Copyright © 2011-2022 走看看