zoukankan      html  css  js  c++  java
  • RxJava--Buffer,GroupBy 对比

    Buffer

    • 设定收集n个元素为一组,以下方代码为例,三个为一组,则当组满三个元素时,返回一次List数据
    • 没组满三个元素时,如果调用onComplete,直接发送剩余元素,没调用onComplete,一直等待
       PublishSubject<String> subject = PublishSubject.create();
           Disposable disposable = subject
                   .buffer(3)//获取三个为一组发送
                   .subscribe(new Consumer<List<String>>() {
                       @Override
                       public void accept(List<String> s) throws Exception {
                           StringBuilder content = new StringBuilder();
                           for (String index : s) {
                               content.append(index).append(",");
                           }
                           LogUtils.e("rxJavaBuffer==" + content);
                       }
                   });
           subject.onNext("1");
           subject.onNext("2");
           subject.onNext("3");
           subject.onNext("4");
           subject.onNext("5");
           subject.onNext("6");
           subject.onNext("7");
           subject.onNext("8");
           subject.onNext("9");
           subject.onNext("10");
           subject.onComplete();
    
    复制代码

    GroupBy

    • 很有意思的操作符。先将获取的元素分组(自己分配key),生成对应的GroupedObservable
    • GroupedObservable有点类似HashMap,包含key(自己分配的)和元素
    • 应该注意的是,当每组GroupedObservable首次订阅新的订阅者后,后续同组元素直接将数据发送给新的订阅者。看代码
      PublishSubject<String> subject = PublishSubject.create();
          Disposable disposable = subject
                  .groupBy(new Function<String, String>() {
                      @Override
                      public String apply(String s) throws Exception {
                          // ?  第一步。 数据分类,分配不同的key
                          if (Integer.valueOf(s) < 4) {
                              return "one";
                          } else if (Integer.valueOf(s) < 7) {
                              return "two";
                          }
                          if (Integer.valueOf(s) < 10) {
                              return "three";
                          }
                          return "other";
                      }
                  })
                  .subscribe(new Consumer<GroupedObservable<String, String>>() {
                      @Override
                      public void accept(GroupedObservable<String, String> sub) throws Exception {
                           // ?  第二步。 根据不同的key,绑定新的订阅者。
                           //               如果改组已经订阅了新的订阅者,直接发送给新的订阅者
                          LogUtils.e(sub.getKey());
                          switch (sub.getKey()) {
                              case "one":
                                  sub.subscribe(new Consumer<String>() {
                                      @Override
                                      public void accept(String s) throws Exception {
                                          LogUtils.e("GroupedObservable==one" + s);
                                      }
                                  });
                                  break;
                              case "two":
                                  sub.subscribe(new Consumer<String>() {
                                      @Override
                                      public void accept(String s) throws Exception {
                                          LogUtils.e("GroupedObservable==two" + s);
                                      }
                                  });
                                  break;
                              case "three":
                                  sub.subscribe(new Consumer<String>() {
                                      @Override
                                      public void accept(String s) throws Exception {
                                          LogUtils.e("GroupedObservable==three" + s);
                                      }
                                  });
                                  break;
                              default:
                                  sub.subscribe(new Consumer<String>() {
                                      @Override
                                      public void accept(String s) throws Exception {
                                          LogUtils.e("GroupedObservable==other" + s);
                                      }
                                  });
                                  break;
                          }
                      }
                  });
          subject.onNext("1");
          subject.onNext("2");
          subject.onNext("3");
          subject.onNext("4");
          subject.onNext("5");
          subject.onNext("6");
          subject.onNext("7");
          subject.onNext("8");
          subject.onNext("9");
          subject.onNext("10");
          subject.onComplete();
     
    复制代码

  • 相关阅读:
    angular typescript 引入js文件
    (转)WEB页面导出为Word文档后分页&横向打印的方法
    aspx页面,后端通过Attributes.Add给textbox添加事件时,传参失效问题。
    aspx.designer.cs没有自动生成代码(没有自动注册)
    .net core 在CentOS环境下将微信公众号语音文件amr转化成mp3
    Sign in with Apple 后端验证(C#)
    C# 调用腾讯即时通信 IM
    LINQ入门笔记----LINQ To Object<Take(),TakeWhile(),Skip(),SkipWhile()>
    LINQ入门笔记----LINQ To Object<SelectMany()>
    初识LINQ
  • 原文地址:https://www.cnblogs.com/twodog/p/12134981.html
Copyright © 2011-2022 走看看