zoukankan      html  css  js  c++  java
  • RxJava2.0学习笔记1 2018年3月23日 星期五

    参考博文:给初学者的RxJava2.0教程-简书     源码 :https://github.com/ssseasonnn/RxJava2Demo

    1 若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃:

    package com.shiqing.rxjava2;
    import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import io.reactivex.*;
    public class MainActivity extends AppCompatActivity { public static final String TAG = "sqrxjava MainActivity"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); // emitter.onComplete();
              //连续两次执行 onError emitter.onError(new Throwable("抛出异常...1")); emitter.onError(new Throwable("抛出异常...2")); } }); Observer observer = new Observer() { @Override public void onSubscribe(Disposable disposable) { Log.d(TAG, "subscribe"); } @Override public void onNext(Object value) { Log.d(TAG, "onNext value " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } }; observable.subscribe(observer); } }

    崩溃日志:

    ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)onComplete()onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

    但是,请注意,并不意味着你可以随意乱七八糟发射事件,需要满足一定的规则:

    • 上游可以发送无限个onNext, 下游也可以接收无限个onNext.
    • 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
    • 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
    • 上游可以不发送onComplete或onError.
    • 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

    注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.

    以上几个规则用示意图表示如下:

    只发送onNext事件
     发送onComplete事件  
     发送onError事件  

     

    2 Disposable 好比两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件,但是上游仍会继续发送剩余的事件。

    subscribe()有多个重载的方法:

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}

    最后一个带有Observer参数的我们已经使用过了,这里对其他几个方法进行说明.

    • 不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
    • 带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
     observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "onNext: " + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG, "onError: " + throwable.getMessage());
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG, "onComplete");
                }
            }, new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.d(TAG, "onSubscribe");
                }
            });

    输出值:

    03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onSubscribe... 
    03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onNext... : 1
    03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onNext... : 2
    03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onNext... : 3
    03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onComplete... 

    4 在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

    • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    • Schedulers.newThread() 代表一个常规的新线程
    • AndroidSchedulers.mainThread() 代表Android的主线程。

    这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

  • 相关阅读:
    三大主流负载均衡软件对比(LVS+Nginx+HAproxy)
    nginx 提示the "ssl" directive is deprecated, use the "listen ... ssl" directive instead
    centos安装nginx并配置SSL证书
    hadoop创建目录文件失败
    The server time zone value 'EDT' is unrecognized or represents more than one time zone.
    脚本启动SpringBoot(jar)
    centos做免密登录
    数据库远程连接配置
    Bash 快捷键
    TCP三次握手四次断开
  • 原文地址:https://www.cnblogs.com/jooy/p/8630238.html
Copyright © 2011-2022 走看看