zoukankan      html  css  js  c++  java
  • 【知识整理】这可能是最好的RxJava 2.x 入门教程(四)

     

    这可能是最好的RxJava 2.x入门教程系列专栏

    文章链接:

    这可能是最好的RxJava 2.x 入门教程(完结版)【强力推荐】

    这可能是最好的RxJava 2.x 入门教程(一)

    这可能是最好的RxJava 2.x 入门教程(二)

    这可能是最好的RxJava 2.x 入门教程(三)

    这可能是最好的RxJava 2.x 入门教程(四)

    GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples

    为了满足大家的饥渴难耐,GitHub将同步更新代码,主要包含基本的代码封装,RxJava 2.x所有操作符应用场景介绍和实际应用场景,后期除了RxJava可能还会增添其他东西,总之,GitHub上的Demo专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

    一、前言

          最近很多小伙伴私信我,说自己很懊恼,对于RxJava 2.x 系列一看就能明白,但自己写却又写不出来。如果 LZ 能放上实战情景教程就最好不过了。也是哈,单讲我们的操作符,也让我们的教程不温不火,但 LZ 自己选择的路,那跪着也要走完呀。所以,也就让我可怜的小伙伴们忍忍了,操作符马上就讲完了。

    二、正题

    16、Single

    顾名思义,Single 只会接收一个参数,而SingleObserver 只会调用onError 或者onSuccess。

     1 Single.just(new Random().nextInt())
     2                 .subscribe(new SingleObserver<Integer>() {
     3                     @Override
     4                     public void onSubscribe(@NonNull Disposable d) {
     5 
     6                     }
     7 
     8                     @Override
     9                     public void onSuccess(@NonNull Integer integer) {
    10                         mRxOperatorsText.append("single : onSuccess : "+integer+"
    ");
    11                         Log.e(TAG, "single : onSuccess : "+integer+"
    " );
    12                     }
    13 
    14                     @Override
    15                     public void onError(@NonNull Throwable e) {
    16                         mRxOperatorsText.append("single : onError : "+e.getMessage()+"
    ");
    17                         Log.e(TAG, "single : onError : "+e.getMessage()+"
    ");
    18                     }
    19                 });

    输出:

    17、distinct

    去重操作符,简单的作用就是去重。

    1 Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
    2                 .distinct()
    3                 .subscribe(new Consumer<Integer>() {
    4                     @Override
    5                     public void accept(@NonNull Integer integer) throws Exception {
    6                         mRxOperatorsText.append("distinct : " + integer + "
    ");
    7                         Log.e(TAG, "distinct : " + integer + "
    ");
    8                     }
    9                 });

    输出:

    很明显,发射器发送的事件,在接收的时候被去重了。

    18、debounce

    去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。

     

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                    // send events with simulated time wait
                    emitter.onNext(1); // skip
                    Thread.sleep(400);
                    emitter.onNext(2); // deliver
                    Thread.sleep(505);
                    emitter.onNext(3); // skip
                    Thread.sleep(100);
                    emitter.onNext(4); // deliver
                    Thread.sleep(605);
                    emitter.onNext(5); // deliver
                    Thread.sleep(510);
                    emitter.onComplete();
                }
            }).debounce(500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("debounce :" + integer + "
    ");
                            Log.e(TAG,"debounce :" + integer + "
    ");
                        }
                    });

    输出:

    代码很清晰,去除发送间隔时间小于500毫秒的发射事件,所以1 和 3 被去掉了。

    19、defer

    简单地时候就是每次订阅都会创建一个新的Observable,并且如果没有被订阅,就不会产生新的Observable

     1 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
     2             @Override
     3             public ObservableSource<Integer> call() throws Exception {
     4                 return Observable.just(1, 2, 3);
     5             }
     6         });
     7 
     8 
     9         observable.subscribe(new Observer<Integer>() {
    10             @Override
    11             public void onSubscribe(@NonNull Disposable d) {
    12 
    13             }
    14 
    15             @Override
    16             public void onNext(@NonNull Integer integer) {
    17                 mRxOperatorsText.append("defer : " + integer + "
    ");
    18                 Log.e(TAG, "defer : " + integer + "
    ");
    19             }
    20 
    21             @Override
    22             public void onError(@NonNull Throwable e) {
    23                 mRxOperatorsText.append("defer : onError : " + e.getMessage() + "
    ");
    24                 Log.e(TAG, "defer : onError : " + e.getMessage() + "
    ");
    25             }
    26 
    27             @Override
    28             public void onComplete() {
    29                 mRxOperatorsText.append("defer : onComplete
    ");
    30                 Log.e(TAG, "defer : onComplete
    ");
    31             }
    32         });

    输出:

    20、last

    last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

    1 Observable.just(1, 2, 3)
    2                 .last(4)
    3                 .subscribe(new Consumer<Integer>() {
    4                     @Override
    5                     public void accept(@NonNull Integer integer) throws Exception {
    6                         mRxOperatorsText.append("last : " + integer + "
    ");
    7                         Log.e(TAG, "last : " + integer + "
    ");
    8                     }
    9                 });

    输出:

    21、merge

    merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

    1 Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
    2                 .subscribe(new Consumer<Integer>() {
    3                     @Override
    4                     public void accept(@NonNull Integer integer) throws Exception {
    5                         mRxOperatorsText.append("merge :" + integer + "
    ");
    6                         Log.e(TAG, "accept: merge :" + integer + "
    " );
    7                     }
    8                 });

    输出:

    22、reduce

    reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

     1 Observable.just(1, 2, 3)
     2                 .reduce(new BiFunction<Integer, Integer, Integer>() {
     3                     @Override
     4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
     5                         return integer + integer2;
     6                     }
     7                 }).subscribe(new Consumer<Integer>() {
     8             @Override
     9             public void accept(@NonNull Integer integer) throws Exception {
    10                 mRxOperatorsText.append("reduce : " + integer + "
    ");
    11                 Log.e(TAG, "accept: reduce : " + integer + "
    ");
    12             }
    13         });

    输出:

    可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

    23、scan

    scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

     1 Observable.just(1, 2, 3)
     2                 .scan(new BiFunction<Integer, Integer, Integer>() {
     3                     @Override
     4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
     5                         return integer + integer2;
     6                     }
     7                 }).subscribe(new Consumer<Integer>() {
     8             @Override
     9             public void accept(@NonNull Integer integer) throws Exception {
    10                 mRxOperatorsText.append("scan " + integer + "
    ");
    11                 Log.e(TAG, "accept: scan " + integer + "
    ");
    12             }
    13         });

    输出:

    看日志,没毛病。

    24、window

    按照实际划分窗口,将数据发送给不同的Observable

     1 mRxOperatorsText.append("window
    ");
     2         Log.e(TAG, "window
    ");
     3         Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
     4                 .take(15) // 最多接收15个
     5                 .window(3, TimeUnit.SECONDS)
     6                 .subscribeOn(Schedulers.io())
     7                 .observeOn(AndroidSchedulers.mainThread())
     8                 .subscribe(new Consumer<Observable<Long>>() {
     9                     @Override
    10                     public void accept(@NonNull Observable<Long> longObservable) throws Exception {
    11                         mRxOperatorsText.append("Sub Divide begin...
    ");
    12                         Log.e(TAG, "Sub Divide begin...
    ");
    13                         longObservable.subscribeOn(Schedulers.io())
    14                                 .observeOn(AndroidSchedulers.mainThread())
    15                                 .subscribe(new Consumer<Long>() {
    16                                     @Override
    17                                     public void accept(@NonNull Long aLong) throws Exception {
    18                                         mRxOperatorsText.append("Next:" + aLong + "
    ");
    19                                         Log.e(TAG, "Next:" + aLong + "
    ");
    20                                     }
    21                                 });
    22                     }
    23                 });

    输出:

    三、写在最后

          至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符,不是说它们不重要,而是 LZ 也要考虑大家的情况,接下来就会根据实际应用场景来对 RxJava 2.x 发起冲锋。如果想看更多的数据,请移步 GitHub:https://github.com/nanchen2251/RxJava2Examples

  • 相关阅读:
    实用机器学习 跟李沐学AI
    Explicitly drop temp table or let SQL Server handle it
    dotnettransformxdt and FatAntelope
    QQ拼音输入法 禁用模糊音
    (技术八卦)Java VS RoR
    Ruby on rails开发从头来(windows)(七)创建在线购物页面
    Ruby on rails开发从头来(windows)(十三)订单(Order)
    Ruby on rails开发从头来(windows)(十一)订单(Order)
    新员工自缢身亡,华为又站到了风口浪尖
    死亡汽油弹(Napalm Death)乐队的视频和来中国演出的消息
  • 原文地址:https://www.cnblogs.com/liushilin/p/7081715.html
Copyright © 2011-2022 走看看