zoukankan      html  css  js  c++  java
  • Operators一句话介绍(RxJava版)

    Cold Observables

    在第一个subscriber订阅后才执行事件发送的Observables,默认普通Observables都是这个类型
    Cold Observables对于每个订阅的subscriber执行一次事件发送过程的重演,每次事件实体将重新生成,尤其对于每次随机生成的数值将不保证保持一致性
    参考:Observable vs ConnectableObservable

    Hot Observables

    从创建一刻开始立即发送事件,此后进行订阅的subscribers仅能接收在订阅后发送的事件
    Hot Observables发送的事件实体对象在所有subscribers之间进行共享

    Connectable Observables

    connect()调用时进行事件发送动作,不论是否有subscriber执行了订阅。实际上是把该Observable转换为Hot Observable,发送的事件实体对象在所有subscribers之间进行共享

    Creating

    • create / just(T..) / from(Iterable/Array) 常用Observable构建方式
    • defer(Func0) 延迟生成Observable,每次subscribe时生成新的Observable
    • range(start, len) 发送从start开始的数字序列,长度为len,值区间为[start, start+len-1]
    • repeat(n) 非直接构造器,将上个构造好的Observable重复n次发送(前一次Observable发送onCompleted后再启动下一次的重订阅),不传参时表示无限repeat
    • repeatWhen(Func1<Observable<Void>, Observable) 通过函数判断是否进行下一次重订阅,可参考retryWhen(Func1)解释
    • interval(time) 周期性发送一个序号数字(从0开始的正整数序列顺序)
    • timer(time) 在指定时间后发送数字0

    Transforming

    • map(Func1) / cast(class) 将单个事件转型为另一类型事件或据此构造新的次级Observable并汇入单一事件流
    • flatMap / flatMapIterable 依据单个事件构造新的次级Observable或Iterable并汇入单一事件流,flatMap不保证发送顺序 (允许交叉发送)
    • concatMap / switchMap concatMap是有序化版本的flatMap,switchMap是强时效版本的flatMap(下个次级Observable开始发送事件时,前面的次级Observable将自动被终止发送)
    • buffer(count/time) 按数量、时间缓存前后多个事件为一个个小组,再按小组为单位逐次发送
    • window(count/time) 类似于buffer,但缓存的小组以子Observable形式作为事件发送 (需要二次subscribe分发)
    • groupBy(KeyFunc, ValueFunc) 通过函数获得每个事件的Key值进行分组,并将事件转换为Value类型,返回发送GroupedObservable类型事件的包裹Observable (需要二次subscribe分发)
    • scan(Func2(x1, x2)) 前一次实际发送的事件x1与本次该发送的源事件x2通过函数计算,其结果作为本次实际发送的的事件,第0个初始事件不经过函数直接发送

    Filtering

    • filter(Func1) 通过函数判断事件是否可发送
    • throttleWithTimeout(time) / debounce(time) 每次事件发送时启动独立时效计时器,计时结束前有新事件发送则抛弃旧事件并重新启动计时,计时结束后事件未被抛弃才执行实际发出
    • debounce(Func1) 以函数返回的临时Observable的(虚拟)事件发送完毕为时间标记点进行事件有效计时判断。本次事件产生的临时Observable发送结束前未有新的临时Observable产生并发送,则本次事件执行实际发出
    • throttleFirst(time) 发送每个时间周期片段内第一个发送的事件
    • sample(time) / throttleLast(time) 发送每个时间周期片段内最后一个发送的事件
    • distinct 去除事件集合中的所有重复项
    • distinctUntilChanged 去除事件集合中连续重复部分的重复项 (允许ABAB交叉发送)
    • skip(count) / skipLast() 抛弃前面/后面n个事件
    • take(count) / takeLast(count) 仅发送前面/后面n个事件
    • first(Func1) / last(Func1) 仅发送首个、最后一个满足函数条件的事件
    • elementAt(index) 仅发送指定下标位置的事件
    • firstOrDefault() / lastOrDefault() / elementAtOrDefault() 在事件数量不足时,发送一个默认代替事件
    • ofType(class) 仅发送指定子类型的事件
    • ignoreElements() 删除所有事件的发送,仅发送 onCompleted / onError 结束通知

    Combining

    • startWith(T.../Iterable/Observable) 在事件发送前追加优先发送的事件
    • merge(Observable...) 最简单的事件组合操作符,将任意多个Observable发送的事件混合到单一事件流中(类似flatMap),所有Observable发送的事件类型必须相同,允许交叉发送,任一成员Observable发送 onError 时,将终止其他所有Observable的事件发送
    • meregeDelayError(Observable...) onError 事件会缓存到merge结束后发送的版本
    • concat 每个Observable按加入顺序发送的merge版本
    • switchOnNext(Observable<Observable>) 将一个发送多个次级Observable的父Observable转换为单一事件流的Observable(类似flatMap),当下一个子Observable被父Observable发送出来时,前一个子Observable将被终止发送事件 (可视为Observable构造器)
    • zip(Observable..., FuncN) 组合任意多个Observable的事件,每个成员Observable发送的第n次事件进行组合,通过FuncN函数合成出本次(第n次)发送的事件数据,事件合成、发送时机取决于最后一个发送第n次事件的成员Observable,任一成员Observable发送完毕时触发整体发送 onCompleted 事件 (可视为Observable构造器)
    • zipWith(Iterable/Observable, Func2) 组合另一个Observable
    • combineLatest(Observable..., FuncN) 组合任意多个Observable最后一次发送的事件(每个Observable至少发送一次事件才有效),当任一成员Observable发送新事件时,将触发一次FuncN函数将其他各Observable最后一次发送的事件进行组合,返回本次需要实际发送的事件对象
    • join(Observable, leftTimer, rightTimer, resultFunc) 当前Observable与目标Observable进行组合, leftTimer / rightTimer 函数产生一个用于判定时间窗口长度的临时Observable,两个Observable发送的事件在各自窗口期内遇到对方Observable发送新事件,则触发 resultFunc 函数进行两个事件的组合,产生本次发送的事件对象,同一窗口期内遇多次事件发送,则分别进行多次组合发送
    • groupJoin() 返回group组合的Observable对象作为事件的join版本

    Error Handling

    • onErrorReturn(Func1) 发生错误时,发出一个替身事件取代异常抛出
    • onErrorResumeNext(Observable) 发生错误时,使用另一个替身Observable取代当前Observable继续发送事件
    • onExceptionResumeNext(Observable) 发生Exception类型错误时,使用另一个替身Observable取代当前Observable继续发送事件,否则将错误正常抛出到 onError 方法
    • retry(count/Func2) 通过次数或函数判断是否需要重启Observable(重订阅),不满足retry条件则将错误正常抛出到 onError 方法,不传参时表示无条件retry
    • retryWhen(Func1<Observable<Throwable> errorOb, Observable notiOb>) 通过函数判断是否进行下一次重启Observable(重订阅),函数需要返回一个以errorOb为根的Observable对象notiOb,若errorOb本次发送的Throwable事件(从源Observable的onError截获)触发notiOb发送 onNext 事件(发送事件类型无关)则触发本次retry,若notiOb发送 onCompleted / onError 事件则不再触发retry,此时将错误正常抛出到 onError 方法
      可考虑通过errorOb.flatMap进行throwable->Observable的转换,或通过zipWith(Observable.range)实现次数限制等,必须以errorOb作为返回的源Observable,直接返回errorOb时与简单调用retry()效果相同

    参考:对RxJava中.repeatWhen()和.retryWhen()操作符的思考

    Utility

    • delay(time) / delaySubscription(time) 延迟事件发送 / 订阅的过程
    • timeout(time) 对每个发送的事件设置超时计时器,计时结束时下个事件未被发出,则抛出超时异常等操作
    • timeInterval() 将发送的 onNext 事件替换为仅包含距离上次事件发送的时间间隔TimeInterval类型事件
    • timeStamp() 将发送的 onNext 事件与时间戳一起打包成Timestamped类型事件
    • do..(ActionX) 在Observable的生命周期的各个阶段添加回调监听动作
      • doOnEach = doOnNext + doOnError + doOnCompleted
      • doOnNext onNext 发生的时候触发回调
      • doOnError onError 发生的时候触发回调
      • doOnCompleted onCompleted 发生的时候触发回调
      • doOnTerminate doOnError / doOnCompleted 后触发,完成后触发subscriber相关回调
      • finallyDo Observable结束后触发回调
      • doOnSubscribe subscribe 执行时触发回调
      • doOnUnsubscribe unsubscribe / onError / onCompleted 的时候触发回调
        doOnSubscribe -> doOnEach -> doOnNext -> subscriber.onNext -> doOnEach -> doOnCompleted -> doOnUnsubscribe
        doOnEach -> doOnError -> doOnTerminate -> subscriber.onError -> finallyDo
    • materialize() 将每个事件(包括 onCompleted / onError )分别打包为一个Notification类型事件进行发送
    • dematerialize() materialize的反向操作
    • subscribOn/observerOn 线程指定
    • using(Func0<Resource> create, Func1<Resource, Observable> timer, Action1<Resource> dispose) 创建临时性资源,通过timer返回的Observable的终止时间来控制资源有效期,资源到期后回调dispose方法进行销毁
    • single / singleOrDefault 如果Observable发送超过一个事件,则抛出异常或发送指定的事件
    • serialize 强制Observable的所有事件以同步方式发送(杜绝同时发送多个事件的可能性)
    • cache 缓存所有发送过的事件实体对象,用于在将来添加订阅的Subscribers中回放(使用旧的已发送过的事件对象,与ConnectableObservable的replay有区别),默认情况下不使用cache,则新添加订阅的Subscriber会启动新的Observable事件发送过程,使用同一个Observable对象以相同的过程发送新创建的事件对象

    Conditional and Boolean 条件限定

    • all(Func1) 判断所有发送的事件是否符合指定条件,并最终返回发送单一个Boolean类型事件
    • contains(T) 判断是否发送过指定事件
    • exists / isEmpty 判断Observable是否发送过事件
    • sequenceEqual(Observable, Observable) 判断两个Observable发送的事件序列(包括结束状态)是否完全一致
    • amb(Observable...) / ambWith(Observable) 选取率先发送第一个事件的Observable作为源Observable进行事件发送,其他Observable将被抛弃
    • defaultIfEmpty() Observable没有发送事件时,则发送一个指定默认事件
    • skipUntil(Observable mark) 当mark没有发送事件前,忽略源Observable发出的一切事件
    • skipWhile(Func1) 当函数返回false前,忽略Observable发出的一切事件
    • takeUntil / takeWhileskipUntil / skipWhile 相反

    Aggregate 集合

    • concat 每个Observable按加入顺序发送的merge版本
    • count 统计Observable一共会进行发射的事件总数,事件会被抛弃忽略,仅在最后发送一个统计结果事件
    • reduce(Func2(x1, x2))scan相同的运算方式,但仅发送最终的单个结果事件
    • collect(Func0 collector, Action2 collection) 事件收集器,实际运作方式与reduce类似,通过collection方法将每个发送的事件收集到collector方法产生的集合对象,并仅在最后发送这个收集完毕的集合对象事件
    • toList / toSortedList / toMap / toMultimap 将所有发送的事件对象收集为List、Map等集合对象,并在最后进行一次集合事件发送

    Blocking

    阻塞型Observable,通过一系列Operators对事件数据的发射进行阻塞。
    可通过 Observable.toBlocking()BlockingObservable.from() 将普通Observable转换为BlockingObservable

    • forEach 对每个发射的事件应用一个函数,该函数会被阻塞直到Observable完成
    • first 阻塞第一个事件的发射,直到Observable发射下一个事件(或结束事件)
    • firstOrDefault 如果Observable没有发射下一个事件(或结束事件),则发射一个默认事件
    • last / lastOrDefault 阻塞最后一个事件的发射,直到Observable发射一个结束事件
    • mostRecent 返回一个指向最近发射的事件序列的iterable
    • next 返回一个指向下一个发射的事件的iterable,阻塞获取直到Observable执行发射动作
    • latest 返回一个iterable,阻塞获取直到Observable发射了一个iterable没有返回的值,然后返回这个值
    • single / singleOrDefault 如果Observable发送超过一个事件,则抛出异常或发送指定的事件
    • toFuture Observable转换为Future对象
    • toIterable / getIterator Observable发送的所有事件转换为单一Iterable / Iterator对象

    Connectable Observable

    connect()调用时进行事件发送动作的Observable

    • publish 将一个Observable转换为ConnectableObservable
    • replay 将一个Observable转换为ConnectableObservable,对于事件发送后新加入订阅的Observer进行过去的事件发送过程的重放(发出新创建的事件实体对象,区别于cache),保证任何时间加入订阅的Observer都能接收到完整的整个事件序列,可传入一函数对源Observable转换为一个事件变形的新Observable,可参数指定buffer大小以及timeout时长
    • connect 令一个ConnectableObservable开始进行事件发送
    • refCount 令一个ConnectableObservable以普通Observable方式发送事件(不再以connect方法为发送开始标记),但仍保持所有Observer之间共享事件实体对象(保持与ConnectableObservable的链接,但至少需要保留一个Observer订阅)
    • share 相当于publish+refCount应用到一个Observable(至少需要存在一个Observer订阅),事件实体将在所有Observer之间进行共享

    String Observable

    StringObservable类提供的基于String类型事件的额外Operators

    • byLine 把源String事件进行统合再处理,通过换行符作为新String事件的分割位点,转换为每一独立行为单位的新事件序列
    • decode 将多字节字符流转换为字节数组事件类型的Observable
    • encode 将字符串事件Observable转换为字节数组Observable
    • from 将字符流或Reader对象转换为字节数组或String类型Observable
    • stringConcat 将String事件类型Observable的所有String事件数据进行整合,以单一String形式整体发送
    • joinstringConcat方式串联源Observable的所有String事件,但以指定的String作为切分标记
    • split 将String事件类型Observable的所有String事件进行整合,并重新以正则表达式进行切分

    其他辅助库

    • rxjava-async 用于生成异步事件发送的Observables
    • rxjava-joins 提供额外的Observables组合机制,如 and(), then(), when()
    • rxjava-computation-expressions 提供额外的条件判断Operators
    • rxjava-math 提供一系列用于数值型事件的简单数学统计、分析结果Operators

    参考:

  • 相关阅读:
    python两个类之间变量和函数的调用
    ubuntu远程桌面设置
    ROS节点分布式运行方法
    pandaboard串口通信调试
    linux下查看cpu使用情况
    树莓派LED指示灯说明
    python多线程实践小结
    关系模型关系模型
    栈和队列的应用
    栈和队列
  • 原文地址:https://www.cnblogs.com/wavky/p/RxJava-Operators.html
Copyright © 2011-2022 走看看