zoukankan      html  css  js  c++  java
  • RxJava简介

    RxJava简介

    本文为前段时间学习RxJava时留下的历史遗留笔记,仅作纪念,科学的大神教学帖子在这里-> 给 Android 开发者的 RxJava 详解

    通过链式调用序列实现基于事件流的异步编程库。

    核心思想是把一切重复化的(尤其多重嵌套异步)调用逻辑转换为基于事件(数据)流传递过程的观察者模式。

    优势在于便捷安全的多种线程切换策略,构建异步事件逻辑,通过事件流(数据流)的切面分解实现逻辑平面化,解除多重异步回调的深层嵌套(Callback Hell)。

    可在任意复杂场合下保持O(1)的链式结构复杂度,替换AsyncTask / Handler 等异步接口,而后者的结构复杂度往往是O(n)级的嵌套调用。

    主要概念

    Observable

    被观察者,事件发送方,如杂志

    • create(OnSubscribe) 最基本的事件发送构造器,用于定义事件(异步)发送序列(通过异步API回调观察者方法)

    • from(Iterable / T[]) create的便捷变体方法,从数组或迭代器类型中读取数据

    • just(T..) create的便捷变体方法,从可变长参数中读取数据,事件序列为依次onNext发送每个实参,最后onCompleted()结束

    • filter(Func1<T, Boolean>) 过滤事件,返回false的事件将被抛弃

    • map(Func) 事件参数类型映射变换器,可将输入的事件参数类型通过自定义函数解析变换为另一实体类型

    • flatMap(Func) 事件参数类型分解映射变换器,可将输入的单个事件类型解析、分解、变换为一个新的次级Observable对象(包含可发送不定量异步事件),直观上是把每个单一事件映射为每一组新的(异步)事件,用于事件分解或插入新的异步操作

    • lift(Operator) 一切操作器(map、flatMap等)的实际工作函数,RxJava不推荐直接使用,仅限理解工作原理

    • compose(Transformer) 作业编排器,用于将多重Observable变换过程(map、flatMap、lift等组合)通用化为单一可重用作业对象,减少代码重复

    • subscribe(Observer / Action..) 执行订阅,或者称为subscribeBy被订阅更合乎思维逻辑,指定事件接收器

    • doOnSubscribe(Action) 预初始化,基于lift方法在subscribe调用时upstream回溯过程中被执行,运行在subscribeOn指定的线程中

    • doOnNext(Action) Subscriber.onNext执行前进行额外的操作,无返回

    map、flatMap等变换通过lift函数实现,将产生一个包裹前一级observable对象的新observable(及对应的包裹subscriber),进行逐级调用上层OnSubscribe发送事件

    • subscribeOn(Scheduler) 指定发送事件的线程,影响doOnSubscribe以及Observable.create代码块(onSubscribe.call())。基于OnSubscribe.call()进行线程切换。
    • observeOn(Scheduler) 指定Observer(或下级变换处理产生的包裹Observer)执行事件处理逻辑(onNext或事件变换等)的线程,可依据需要多次指定切换每个处理过程所工作的线程。基于lift()在Subscriber的动作(onNext等)上进行线程切换。

    subscribeOn指定的事件发送线程从upstream流向downstream过程中维持不变,直到遇到第一个observeOn为止,若不指定observeOn,则事件变换、消费过程将运行在上次指定的subscribeOn线程中。依据upstream回溯流向,subscribeOn从前一个Operator开始改变上游的执行线程。

    Observer

    Observer(观察者,事件接收、处理方,如读者)
    |- Subscriber(订阅者,Observer接口的抽象类,实际上更常用这个类)

    Observer

    • onNext(T) 在构建Observable时定义的OnSubscribe中指定发送事件所需回调的主要方法
    • onComplete() 所有onNext事件发送完毕(序列处理完毕)时进行回调的通知方法
    • onError(Throwable) 事件序列中途处理异常时回调的通知方法,此后不再发送事件

    Subscriber

    • onStart() 预初始化,subscribe()订阅瞬间触发的通知方法,与subscribe()执行时所在线程相同
    • unsubscribe() 取消订阅,用于onPause() onStop() 等方法中,及时解除引用防止内存泄露

    ActionX

    半成品的观察者,仅拥有一个包含X个参数的call方法,用于单独定义Observer中的onNext / onComplete等方法,通过组合方式进行订阅

    Action0 onCompleteAction = new ..{.. call(){ Log(complete) } };
    Action1<Throwable> onErrorAction = new ..{.. call(Throwable e){ Log(error) } };
    Action1<T> onNextAction = new ..{.. call(T t){ Log(next event) } };
    

    FuncX

    变换函数接口,仅拥有一个包含X个参数的call方法,用于定义输入事件类型的变换结果类型,如从String到URL、Bitmap或Observable类型

    Func1<String, Bitmap>{
        // 从Sring类型的路径参数解析出Bitmap对象
        public Bitmap call(String path) {
            .. return bitmap;
        }
    }
    

    Scheduler

    线程调度器,指定每个环节执行的线程

    Schedulers.immediate() 默认调度器,在当前线程下执行
    Schedulers.newThread() 无条件开启新线程执行
    Schedulers.io() 使用IO线程池执行,线程池大小无上限
    Schedulers.computation() 使用计算型线程池执行,线程池大小为CPU核心数量
    AndroidSchedulers.mainThread() Android的主线程(UI线程)
    

    发布事件流程

    observable.subscribe(observer/subscriber)  // 注册观察者
     -> subscriber.onStart  // 观察者onStart方法在subscribe()相同线程上被立即执行
     -> this.onSubscribe.call(subscriber)  // 调用创建observable时构建的onSubscribe对象的call方法
    
          // onSubscribe.call方法内,在upstream时通过subscribeOn指定的线程执行事件发送
          // onNext内部的事件消费代码则在observeOn指定的线程中执行
          -> subscriber.onNext(event); ..
          -> subscriber.onCompleted();
    

    使用方法小例

    Observer订阅:
    Observable.create(onSubscribe).subscribe(observer);

    ActionX订阅:
    observable.subscribe(onNextAction[, onErrorAction[, onCompleteAction]]);

    doOnSubscribe初始化:

    observable.
    .subscribeOn(Schedulers.io()) // 指定 doOnSubscribe() 在 IO 线程初始化
    .doOnSubscribe(new Action..)
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定在主线程发送事件
    .subscribe(subscriber);
    

    Scheduler调度:

    observable
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(..);
    

    map变换:

    Observable
    .just(pathStr) // 事件类型为String
    .map(new Func1<String, Bitmap>() { .. }) // 事件类型变换为Bitmap
    .subscribe(new Action1<Bitmap>() { .. }); // 接收处理Bitmap类型事件
    

    Scheduler多次调度:

    observable
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(Schedulers.newThread()) // 指定下级map变换的线程为新开线程
    .map(..) // 在新线程中变换
    .observeOn(Schedulers.io()) // 指定下级map变换的线程为IO线程
    .map(..) // 在IO线程中变换
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(..);
    

    compose作业编排:

    Transformer<ParamType, ResultType> processAll = new ..{
        public Observable<ResultType> call(Observable<ParamType> observable) {
            return observable.map1().map2().flatMap1().lift1()..;
        }
    }
    

    // 以下每个observable都将执行processAll定义的变换过程

    observable1.compose(processAll).subscribe(observer1);
    observable2.compose(processAll).subscribe(observer2);
    observable3.compose(processAll).subscribe(observer3);
    observable4.compose(processAll).subscribe(observer4);
    

    相关其他开源库

    • Retrofit异步网络库充分解耦的API,原生内置对RxJava的支持
    • Volley可考虑RxVolley库,相当于RxJava+Volley集成版
    • RxAndroid仅保留AndroidSchedulers,提供一些UI线程等的切换入口
    • RxLifecycle(从RxAndroid分离)提供BindActivity和BindFragment方法,可在Activity或者Fragment结束的时候自动通知被观察者停止发出新的事件。
    • RxBinding(从RxAndroid分离)用于以可扩展的方式绑定View各种事件监听器
    • RxPreferences(从RxAndroid分离)用于监听SharedPreferences的变化
    • RxBus模式,是一种基于RxJava实现的EventBus模式

    参考

    给 Android 开发者的 RxJava 详解

    RxJava学习资料集合

    RxJava官方指南

    Rx组织官方指南:各种事件流操作方法解释列表

    用一张图解释RxJava中的线程控制

  • 相关阅读:
    软件测试理论基础
    使用Pycharm官方统计代码行插件统计代码总行数
    Jmeter的配置文件解析
    python异常整理
    python2与python3的区别
    tomcat的server.xml配置
    异常:Error response from daemon: conflict: unable to delete 6fa48e047721 (cannot be forced)
    前端 -- 定位和z-index
    前端 -- background
    前端 -- 超链接导航栏案例
  • 原文地址:https://www.cnblogs.com/wavky/p/RxJava.html
Copyright © 2011-2022 走看看