zoukankan      html  css  js  c++  java
  • RXJAVA之变换操作

    RXJAVA提供了以下变换操作,对Observable的消息进行变换操作:

    1.window

    定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。

    Observable<String> observable = Observable.just("123", "456","789","abc");

        observable.window(3).subscribeWith(new Observer<Observable<String>>(){

    @Override

    public void onComplete() {

    System.out.println("complete");

    }

    @Override

    public void onError(Throwable arg0) {

    System.out.println("error");

     

    }

    @Override

    public void onNext(Observable<String> arg0) {

    arg0.subscribeWith(new Observer<String>(){

    @Override

    public void onSubscribe(Disposable d) {

    System.out.println("onSubscribe");

    }

     

    @Override

    public void onNext(String t) {

    System.out.println(t);

    }

     

    @Override

    public void onError(Throwable e) {

    System.out.println("error");

    }

     

    @Override

    public void onComplete() {

    System.out.println("complete");

    }});

     

    }

    @Override

    public void onSubscribe(Disposable arg0) {

    System.out.println("onSubscribe");

     

    }

    });

        }

    输出结果

    onSubscribe

    onSubscribe

    123

    456

    789

    complete

    onSubscribe

    abc

    complete

    complete

    2.map

    变换接收到的数据,重新发放出去。map函数只有一个参数,参数一般是Func1,Func1的<I,O>I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据。

    Observable.just("123", "456","789").map(new Function<String,Integer>(){

    @Override

    public Integer apply(String t) throws Exception {

    return Integer.parseInt(t);

    }}).subscribeWith(new Observer<Integer>(){

     

    @Override

    public void onSubscribe(Disposable d) {

    System.out.println("onSubscribe");

    }

     

    @Override

    public void onNext(Integer t) {

    System.out.println(t);

    }

     

    @Override

    public void onError(Throwable e) {

    System.out.println("onError");

    }

     

    @Override

    public void onComplete() {

    System.out.println("onComplete");

    }});

        }

     输出结果

    onSubscribe

    123

    456

    789

    onComplete

    3.flatmap 

    将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。

    Observable<String> observable = Observable.just("123", "456","789","abc");

        observable.flatMap(new Function<String,Observable<String>>(){

     

    @Override

    public Observable<String> apply(String t) throws Exception {

    return Observable.just(t+"flatmap");

    }}

        ).subscribeWith(new Observer<String>(){

     

    @Override

    public void onSubscribe(Disposable d) {

    System.out.println("onSubscribe");

    }

     

    @Override

    public void onNext(String t) {

    System.out.println(t);

    }

     

    @Override

    public void onError(Throwable e) {

    System.out.println("onError");

    }

     

    @Override

    public void onComplete() {

    System.out.println("onComplete");

    }});

        }

    输出结果

    onSubscribe

    123flatmap

    456flatmap

    789flatmap

    abcflatmap

    onComplete

  • 相关阅读:
    J2SE总结
    OSI模型与TCP/IP协议族
    poppler交叉编译
    摆脱技术思维,转向产品思维——寻找“万能”IDC的苦恼
    面向自由职业者和小型企业的开源开票工具
    3星|《中国做对了什么》:十几年前的文章集了,依旧不过时
    2星|《巴菲特致股东的信》:标题党,实际是1996年一次研讨会的发言记录,没有致股东的信
    3星|《不会被机器替代的人》:人在被服务的时候,更喜欢面对面跟人打交道,而不是跟机器打交道
    3星|《提高职场执行力》:执行力难关的根源是对话的无效性
    2星|《工业X.0》:物联网的资料汇编
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/7591803.html
Copyright © 2011-2022 走看看