zoukankan      html  css  js  c++  java
  • 【知识整理】这可能是最好的RxJava 2.x 入门教程(四)

     

    这可能是最好的RxJava 2.x入门教程系列专栏

    文章链接:

    这可能是最好的RxJava 2.x 入门教程(完结版)【强力推荐】

    这可能是最好的RxJava 2.x 入门教程(一)

    这可能是最好的RxJava 2.x 入门教程(二)

    这可能是最好的RxJava 2.x 入门教程(三)

    这可能是最好的RxJava 2.x 入门教程(四)

    GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples

    为了满足大家的饥渴难耐,GitHub将同步更新代码,主要包含基本的代码封装,RxJava 2.x所有操作符应用场景介绍和实际应用场景,后期除了RxJava可能还会增添其他东西,总之,GitHub上的Demo专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

    一、前言

          最近很多小伙伴私信我,说自己很懊恼,对于RxJava 2.x 系列一看就能明白,但自己写却又写不出来。如果 LZ 能放上实战情景教程就最好不过了。也是哈,单讲我们的操作符,也让我们的教程不温不火,但 LZ 自己选择的路,那跪着也要走完呀。所以,也就让我可怜的小伙伴们忍忍了,操作符马上就讲完了。

    二、正题

    16、Single

    顾名思义,Single 只会接收一个参数,而SingleObserver 只会调用onError 或者onSuccess。

     1 Single.just(new Random().nextInt())
     2                 .subscribe(new SingleObserver<Integer>() {
     3                     @Override
     4                     public void onSubscribe(@NonNull Disposable d) {
     5 
     6                     }
     7 
     8                     @Override
     9                     public void onSuccess(@NonNull Integer integer) {
    10                         mRxOperatorsText.append("single : onSuccess : "+integer+"
    ");
    11                         Log.e(TAG, "single : onSuccess : "+integer+"
    " );
    12                     }
    13 
    14                     @Override
    15                     public void onError(@NonNull Throwable e) {
    16                         mRxOperatorsText.append("single : onError : "+e.getMessage()+"
    ");
    17                         Log.e(TAG, "single : onError : "+e.getMessage()+"
    ");
    18                     }
    19                 });

    输出:

    17、distinct

    去重操作符,简单的作用就是去重。

    1 Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
    2                 .distinct()
    3                 .subscribe(new Consumer<Integer>() {
    4                     @Override
    5                     public void accept(@NonNull Integer integer) throws Exception {
    6                         mRxOperatorsText.append("distinct : " + integer + "
    ");
    7                         Log.e(TAG, "distinct : " + integer + "
    ");
    8                     }
    9                 });

    输出:

    很明显,发射器发送的事件,在接收的时候被去重了。

    18、debounce

    去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。

     

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                    // send events with simulated time wait
                    emitter.onNext(1); // skip
                    Thread.sleep(400);
                    emitter.onNext(2); // deliver
                    Thread.sleep(505);
                    emitter.onNext(3); // skip
                    Thread.sleep(100);
                    emitter.onNext(4); // deliver
                    Thread.sleep(605);
                    emitter.onNext(5); // deliver
                    Thread.sleep(510);
                    emitter.onComplete();
                }
            }).debounce(500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("debounce :" + integer + "
    ");
                            Log.e(TAG,"debounce :" + integer + "
    ");
                        }
                    });

    输出:

    代码很清晰,去除发送间隔时间小于500毫秒的发射事件,所以1 和 3 被去掉了。

    19、defer

    简单地时候就是每次订阅都会创建一个新的Observable,并且如果没有被订阅,就不会产生新的Observable

     1 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
     2             @Override
     3             public ObservableSource<Integer> call() throws Exception {
     4                 return Observable.just(1, 2, 3);
     5             }
     6         });
     7 
     8 
     9         observable.subscribe(new Observer<Integer>() {
    10             @Override
    11             public void onSubscribe(@NonNull Disposable d) {
    12 
    13             }
    14 
    15             @Override
    16             public void onNext(@NonNull Integer integer) {
    17                 mRxOperatorsText.append("defer : " + integer + "
    ");
    18                 Log.e(TAG, "defer : " + integer + "
    ");
    19             }
    20 
    21             @Override
    22             public void onError(@NonNull Throwable e) {
    23                 mRxOperatorsText.append("defer : onError : " + e.getMessage() + "
    ");
    24                 Log.e(TAG, "defer : onError : " + e.getMessage() + "
    ");
    25             }
    26 
    27             @Override
    28             public void onComplete() {
    29                 mRxOperatorsText.append("defer : onComplete
    ");
    30                 Log.e(TAG, "defer : onComplete
    ");
    31             }
    32         });

    输出:

    20、last

    last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

    1 Observable.just(1, 2, 3)
    2                 .last(4)
    3                 .subscribe(new Consumer<Integer>() {
    4                     @Override
    5                     public void accept(@NonNull Integer integer) throws Exception {
    6                         mRxOperatorsText.append("last : " + integer + "
    ");
    7                         Log.e(TAG, "last : " + integer + "
    ");
    8                     }
    9                 });

    输出:

    21、merge

    merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

    1 Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
    2                 .subscribe(new Consumer<Integer>() {
    3                     @Override
    4                     public void accept(@NonNull Integer integer) throws Exception {
    5                         mRxOperatorsText.append("merge :" + integer + "
    ");
    6                         Log.e(TAG, "accept: merge :" + integer + "
    " );
    7                     }
    8                 });

    输出:

    22、reduce

    reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

     1 Observable.just(1, 2, 3)
     2                 .reduce(new BiFunction<Integer, Integer, Integer>() {
     3                     @Override
     4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
     5                         return integer + integer2;
     6                     }
     7                 }).subscribe(new Consumer<Integer>() {
     8             @Override
     9             public void accept(@NonNull Integer integer) throws Exception {
    10                 mRxOperatorsText.append("reduce : " + integer + "
    ");
    11                 Log.e(TAG, "accept: reduce : " + integer + "
    ");
    12             }
    13         });

    输出:

    可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

    23、scan

    scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

     1 Observable.just(1, 2, 3)
     2                 .scan(new BiFunction<Integer, Integer, Integer>() {
     3                     @Override
     4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
     5                         return integer + integer2;
     6                     }
     7                 }).subscribe(new Consumer<Integer>() {
     8             @Override
     9             public void accept(@NonNull Integer integer) throws Exception {
    10                 mRxOperatorsText.append("scan " + integer + "
    ");
    11                 Log.e(TAG, "accept: scan " + integer + "
    ");
    12             }
    13         });

    输出:

    看日志,没毛病。

    24、window

    按照实际划分窗口,将数据发送给不同的Observable

     1 mRxOperatorsText.append("window
    ");
     2         Log.e(TAG, "window
    ");
     3         Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
     4                 .take(15) // 最多接收15个
     5                 .window(3, TimeUnit.SECONDS)
     6                 .subscribeOn(Schedulers.io())
     7                 .observeOn(AndroidSchedulers.mainThread())
     8                 .subscribe(new Consumer<Observable<Long>>() {
     9                     @Override
    10                     public void accept(@NonNull Observable<Long> longObservable) throws Exception {
    11                         mRxOperatorsText.append("Sub Divide begin...
    ");
    12                         Log.e(TAG, "Sub Divide begin...
    ");
    13                         longObservable.subscribeOn(Schedulers.io())
    14                                 .observeOn(AndroidSchedulers.mainThread())
    15                                 .subscribe(new Consumer<Long>() {
    16                                     @Override
    17                                     public void accept(@NonNull Long aLong) throws Exception {
    18                                         mRxOperatorsText.append("Next:" + aLong + "
    ");
    19                                         Log.e(TAG, "Next:" + aLong + "
    ");
    20                                     }
    21                                 });
    22                     }
    23                 });

    输出:

    三、写在最后

          至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符,不是说它们不重要,而是 LZ 也要考虑大家的情况,接下来就会根据实际应用场景来对 RxJava 2.x 发起冲锋。如果想看更多的数据,请移步 GitHub:https://github.com/nanchen2251/RxJava2Examples

  • 相关阅读:
    MySQL STR_TO_DATE函数
    mybatis的一种批量更新方法【我】
    ON DUPLICATE KEY UPDATE单个增加更新及批量增加更新的sql
    Unity寻路的功能总结
    Unity3d大会的部分总结
    支付宝Unity
    [Firefly引擎][学习笔记三][已完结]所需模块封装
    [Firefly引擎][学习笔记二][已完结]卡牌游戏开发模型的设计
    [Firefly引擎][学习笔记一][已完结]带用户验证的聊天室
    [Firefly引擎][学习笔记四][已完结]服务器端与客户端的通讯
  • 原文地址:https://www.cnblogs.com/liushilin/p/7081715.html
Copyright © 2011-2022 走看看