zoukankan      html  css  js  c++  java
  • RxJava2详细攻略(一)

    0.简介

    RxJava其实就是提供一套异步编程的API,这套API是基于观察者模式的,而且是链式调用的,所以使用RxJava编写的代码的逻辑会非常简介。

    RxJava有三个基本元素:

    1.被观察者(Observable)

    2.观察者(Observer)

    3.订阅(subscribe)

    下面来说说以上三者是如何写作的:
    首先在gradle文件中添加依赖:

    implementation 'io.reactivex.rxjava2:rejava.2.1.4'
    implementation 'io.reactivex.rxjava2.2.0.0'
    

    1.创建被观察者

    Observable observable = Observable.create(new ObservableOnSubscribe<Integer>(){
        @Override
        public void subscribe(ObservableEmitter<Integer> e)throws Exception{
            Log.d(TAG, "==currentThread name: " + Thread.currentThread().getName());
            e.onNext(1);
            e.onNext(2);
            e.onComplete();
        }
    });
    

    2.创建观察者

    Observer observer = new Observer<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            Log.d(TAG, "==onSubscribe");
        }
        
        @Override
        public void onNext(Integer integer){
            Log.d(TAG, "==onNext" + integer);
        }
        
        @Override
        public void onError(Throwable e){
            Log.d(TAG, "==onError");
        }
        
        @Override
        public void onComplete(){
            Log.d(TAG, "==onComplete");
        }
    };
    

    3.订阅

    observable.subscribe(observer);
    

    或者使用链式调用

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "======================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "======================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "======================onError");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "======================onComplete");
        }
    });
    

    被观察者发送的事件有以下几种,总结如下表:

    事件种类 作用
    onNext() 发送该事件时,观察者会回调onNext()方法
    onError() 发送该事件时,观察者会回调onError()方法,当发送该事件之后,其他事件不会继续发送
    onComplete() 发送该事件时,观察者会调用onComplete()方法,当发送该事件之后,其他事件将不会发送
    其实可以将RxJava比喻成一个做果汁的过程,家里有很多种水果(要发送的原始数据),你想榨果汁喝,这时候你就要想究竟要喝什么果汁呢?如果你想喝牛油果雪梨柠檬汁,那你就要把这三种水果混在一起榨汁(使用各种操作符变换你想发送给观察者的数据),榨完后,你就可以喝上你想要的果汁了(把处理好的数据发送给观察者)。

    总结如下图:

    下面就来讲解RxJava各种操作的操作符。

    1.创建操作符

    以下就是讲解创建被观察者的各种操作符

    1.1 create()

    方法预览

    public static <T> Observable <T> create(ObservableOnSubscribe<T> source)
    

    有什么用

    创建一个被观察者

    怎么用

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>(){
        @Override
        public void subscribe(ObservableEmitter<String> e)throws Exception{
            e.onNext("Hello Observer");
            e.onComplete;
        }
    });
    

    以上的代码非常简单,创建ObservableOnSubscribe并重写其subscribe方法,就可以通过ObservableEmitter发射器向观察者发送事件。

    以下创建一个观察者,来验证这个被观察者是否成功创建。

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d("chan","=============onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d("chan","=============onComplete ");
        }
    };
            
    observable.subscribe(observer);
            
    

    打印结果:

    05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
    =============onComplete
    

    1.2 just()

    方法预览

    public static <T> Observable<T> just(T item) 
    ......
    public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
    

    有什么用
    创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

    怎么用

    Observable.just(1,2,3)
    .subscribe(new Observer <Integer>(){
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    })
    

    上面的代码直接使用链式调用,代码也非常简单,这里就不细说了,看看打印结果:

    05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
    =================onNext 1
    =================onNext 2
    =================onNext 3
    =================onComplete 
    

    1.3 From操作符

    1.3.1 fromArray()

    方法预览

    public static <T> Observable<T> fromArray(T---items)
    

    有什么用?

    这个方法和just()类似,只不过fromArray可以传入多于10个的变量,并且可以传入一个数组。

    怎么用

    Integer array[] = {1,2,3,4};
    Observable.fromArray(array)
    .subscribe(new Observer<Integer>(){
         @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    

    代码和just()基本上一样,直接看打印结果:

    05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
    =================onNext 1
    =================onNext 2
    =================onNext 3
    =================onNext 4
    =================onComplete 
    

    1.3.2 fromCallable()

    方法预览

    public static <T> Observable <T> fromCallable(Callable<? extends T> supplier)
    

    有什么用

    这里的 Callable 是 java.util.concurrent 中的 Callable, Callable和 Runnable的用法基本一致,只是它会返回一个结果值,这个结果值就是发送给观察者的。

    怎么用

    Observable.fromCallable(new Callable<Integer>(){
        @Override
        public Integer call() throws Exception{
            return 1;
        }
    })
    .subscribe(new Consumer<Integer>(){
        @Override
        public void accept(Integer integer) throws Exception{
            Log.d(TAG, "==accept" + integer);
        }
    });
    

    打印结果

    05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1
    

    1.3.3 fromFuture()

    方法预览

    public static <T> Observable<T> fromFuture(Future<? extends T> future)
    

    有什么用?

    参数中的Future是 java.util.concurrent 中的Future,Future的作用是增加了cancel()等方法操作Callable,它可以通过get()方法来获取Callable返回的值。

    怎么用

    FutureTask<String> futureTask = new FutureTask<> (new Callable<String>(){
        @Override
        public String call() throws Exception{
            Log.d(TAG, "CallableDemo is Running");
            return "返回结果";
        }
    });
    
    Observable.fromFuture(futureTask)
        .doOnSubscribe(new Consumer<Disposable>(){
            @Override
            public void accept(Disposable disposable)throws Exception{
                futureTask.run();
            }
        })
        .subscribe(new Consumer<String>(){
            @Override
            public void accept(String s) throws Exception{
                Log.d(TAG, "==accept" + s);
            }
        });
    

    doOnSubscribe()的作用就是只有订阅时才会发送事件,具体会在下面讲解。

    打印结果

    05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
    ================accept 返回结果
    

    1.3.4 fromIterable()

    方法预览

    public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
    

    有什么用

    直接发送一个List集合数据给观察者

    怎么用

    List<Integer> list = new ArrayList<>();
    list.add(0);
    list.add(1);
    list.add(2);
    Observable.fromIterable(list)
    .subscribe(new Observable<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            Log.d(TAG, "==onSubscribe");
        }
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    

    打印结果如下:

    05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
    =================onNext 0
    =================onNext 1
    =================onNext 2
    =================onComplete 
    

    1.4 defer()

    方法预览

    public static <T> Observable<T> defer(Callable<> extends ObservableSource<? extends T>> supplier)
    

    有什么用

    这个方法的作用是直到被观察者被订阅后才会创建被观察者。

    怎么用

    //i要定义为成员变量
    Interger i = 100;
    
    Observable<Integer> observable = Observable.deger(new Callable<ObservabelSource<? extends Integer>>){
        @Override
        public ObservableSource<? extends Integer>call()throws Exception{
            return Observable.just(i);
        }
    });
    
    i = 200;
    
    Observer observer = new Observer<Integer>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
         @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    };
    
    observable.subscribe(observer);
    
    i = 300;
    
    observable.subscribe(observer);
    

    打印结果如下:

    05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
    ================onNext 300
    

    因为defer()只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印i最新的值。

    1.5 timer()

    方法预览

    public static Observable<Long> timer(long delay, TimeUnit unit)
    

    有什么用

    当到指定时间后就会发送一个OL的值给观察者。

    怎么用

    Observable.timer(2, TimeUnit.SECONDS)
    .subscribe(new Observer<Long>(){
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "===============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0
    

    1.6 interval()

    方法预览

    public static Observable<Long> interval(long period, TimeUnit unit)
    public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
    

    有什么用
    每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

    怎么用

    Observable.interval(4, TimeUnit.SECONDS)
    .subscribe(new Observer<Long>(){
        @Override
          public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
    05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
    05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
    05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
    05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
    05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
    05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5
    

    从时间就可以看出每隔4秒就会发出一次数字递增1的事件。这里说下interval()第三个方法的initialDelay参数,这个参数的意思是onSubscribe回调之后,再次回调onNext的间隔时间。

    1.7 intervalRange()

    方法预览

    public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    public static Observable<Long>intervalRange(long start, long connt, long initialDelay long period, TimeUnit unit, Scheduler scheduler)
    

    有什么用
    可以指定发送事件的开始值和数量,其他与interval()的功能一样。

    怎么用

    Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
    .subscribe(new Observer<Long>(){
        @Override
            public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
    05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
    05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
    05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
    05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
    05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext
    

    可以看出收到5次onNext事件,并且都是从2开始的。

    1.8 range()

    方法预览

    public static Observable<Integer> range(final int start, final int count)
    

    有什么用

    同时发送一定范围的事件序列

    怎么用

    Observable.range(2, 5)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Integer aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    打印结果:

    05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe 
    ==============onNext 2
    ==============onNext 3
    ==============onNext 4
    ==============onNext 5
    ==============onNext 6
    

    1.9 rangeLong()

    方法预览

    public static Observable<Long> rangeLong(long start, long count)
    

    有什么用

    作用与range()一样,只是数据类型为Long

    怎么用

    用法与range()一样,这里不做赘述。

    1.10 empty()&never()&error()

    方法预览

    public static <T> Observable<T> empty()
    public static <T> Observable<T> never()
    public static <T> Observable<T> error(final Throwable exception)
    

    有什么用

    1. empty():直接发送onComplete()事件
    2. never():不发送任何事件
    3. error():发送onError()事件

    怎么用

    Observable.empty()
    .subscribe(new Observer<Object>(){
          @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe");
        }
    
        @Override
        public void onNext(Object o) {
            Log.d(TAG, "==================onNext");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError " + e);
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete");
        }
    });
    

    打印结果:

    05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
    ==================onComplete
    

    换成never()的打印结果:

    05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe
    

    换成onError()的打印结果:

    05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
    ==================onError java.lang.NullPointerException
    

    2.转换操作符

    2.1 map()

    方法预览

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
    

    有什么用

    map可以将被观察者发送的数据类型转变成其他的类型

    怎么用?

    以下代码将Integer类型的数据转换成String

    Observable.just(1, 2, 3)
    .map(new Function<Integer, String>(){
        @Override
        public String apply(Integer integer)throws Exception{
            return "I'm " + integer;
        }
    })
    .subscribe(new Observer<String>(){
        @Override
        public void onSubscribe(Disposable d){
            Log.e(TAG, "==onSubscribe");
        }
        
        @Override
        public void onNext(String s){
            Log.e(TAG, "==onNext" + s);
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    打印结果:

    05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe
    ===================onNext I'm 1
    ===================onNext I'm 2
    ===================onNext I'm 3
    

    2.2 flatMap()

    方法预览

    public final<R> Observable flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    

    有什么用

    这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。

    怎么用?

    flatMap()其实与map()类似,但是faltMap()返回的是一个Observable。现在用一个例子来说明flatmap()的用法。



    假设有一个Person类,这个类的定义如下:

    public class Person{
        private String name;
        private List<Plan> planList = new ArrayList<>();
        
        public Person(String name, List<Plan> planList){
            this.name = name;
            this.planList = planList;
                public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public List<Plan> getPlanList() {
            return planList;
        }
    
        public void setPlanList(List<Plan> planList) {
            this.planList = planList;
        }
    }
    

    Person类有一个name和planList两个变量,分别代表的是人名和计划清单。



    Plan类的定义如下:

    public class Plan{
        private String time;
        private String content;
        private List<String> actionList = new ArrayList<>();
        
        public Plan(String time, String content){
            this.name = name;
            this.content = content;
        }
        public String getTime() {
            return time;
        }
    
        public void setTime(String time) {
            this.time = time;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    
        public List<String> getActionList() {
            return actionList;
        }
    
        public void setActionList(List<String> actionList) {
            this.actionList = actionList;
        }
    }
    

    现在有一个需求就是要将Person集合中的每个元素中的Plan的action打印出来。首先用map()来实现这个需求:

    Observable.fromIterable(personList)
    .map(new Function <Person, List<Plan>>(){
        @Override
        public List <Plan> apply(Person person) throws Exception{
            return person.getPlanList();
        }
    })
    .subscribe(new Observer<List <Plan>>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(List<Plan> plans){
            for(Plan plan : plans){
                List<String> planActionList = planActionList();
                for(String action: planActionList){
                    Log.d(TAG, "==action" + action);
                }
            }
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    可以看到onNext()用了嵌套for循环来实现,如果代码逻辑复杂起来,可能需要多重循环才能实现。



    现在看下使用flatMap()实现:

    Observable.fromIterable(personList)
    .flatMap(new Function<Person, ObservableSourfe<Plan>> (){
        @Override
        public ObservableSource<Plan> apply(Person person){
            return Observable.formIterable(person.getPlanList());
        }
    })
    .flatMap(new Function<Plan, ObservableSource<String>>(){
        @Override
        public ObservableSource<String> apply(Plan plan) throws Exception{
            return Observable.fromIterable(plan.getActionList());
        }
    })
    .subscribe(new Observer<String>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(String s){
            Log.d(TAG, "==action" + s);
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    从代码可以看出,只需要两个flatMap()就可以完成需求,并且代码逻辑非常清晰。

    2.3 concatMap()

    方法预览

    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
    

    有什么用?

    concatMap()和flatMap()基本上是一样的,只不过concatMap()转发出来的事件是有序的,而flatMap()是无序的。

    怎么用

    Observable.fromIterable(personList)
    .flatMap(new Function<Person, ObservableSource<Plan>>(){
        @Override
        public ObservableSource<Plan> apply(Person person){
            if("chan".equals(person.getName())){
                return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.MILLISECONDS);
        }
        return Observable.fromIterable(person.getPlanList());
        }
    })
    .subscribe(new Observer<Plan>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(Plan plan){
            Log.d(TAG, "==plan" + plan.getContent());
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    为了更好地验证flatMap是无序的,使用了一个delay()方法来延迟,直接看打印结果:

    05-21 13:57:14.031 21616-21616/com.example.rxjavademo D/chan: ==================plan chan 上课
    ==================plan chan 写作业
    ==================plan chan 打篮球
    05-21 13:57:14.041 21616-21641/com.example.rxjavademo D/chan: ==================plan Zede 开会
    ==================plan Zede 写代码
    ==================plan Zede 写文章
    

    本来Zede的事件发送顺序是排在chan事件之前,但是经过延迟后,这两个事件序列发送顺序互换了。



    现在来验证下concatMap()是否是有序的,使用上面同样的代码,只是把flatMap()换成concatMap(),打印结果如下:

    05-21 13:58:42.917 21799-21823/com.example.rxjavademo D/chan: ==================plan Zede 开会
    ==================plan Zede 写代码
    ==================plan Zede 写文章
    ==================plan chan 上课
    ==================plan chan 写作业
    ==================plan chan 打篮球
    

    这就代表concatMap()转换后发送的事件序列是有序的了。

    2.4 buffer()

    方法预览

    public final Observable<List<T>> buffer(int count, int skip)
    

    有什么用?

    从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

    怎么用?

    buffer有两个参数,一个是count,另一个skip。count缓冲区元素的数量,skip就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。这样说可能还是有点抽象,直接看代码:

    Observable.just(1, 2, 3, 4, 5)
    .buffer(2, 1)
    .subscribe(new Observer<List <Integer>>(){
        @Override
        public void onSubscribe(Disposable d){
            
        }
        
        @Override
        public void onNext(List<Integer> integers){
            Log.d(TAG, "==缓冲区大小:" + integers.size());
            for(Integer i: integers){
                Log.d(TAG, "==元素:" + i);
            }
        }
        
        @Override
        public void onError(Throwable e){
            
        }
        
        @Override
        public void onComplete(){
            
        }
    });
    

    打印结果:

    05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================缓冲区大小: 2
    ================元素: 1
    ================元素: 2
    ================缓冲区大小: 2
    ================元素: 2
    ================元素: 3
    ================缓冲区大小: 2
    ================元素: 3
    ================元素: 4
    ================缓冲区大小: 2
    ================元素: 4
    ================元素: 5
    ================缓冲区大小: 1
    ================元素: 5
    

    从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。

    2.5 groupBy()

    方法预览

    public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
    

    有什么用

    将发送的数据进行分组,每个分组都会返回一个被观察者。

    怎么用

    Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
    .groupBy(new Function<Integer, Integer>(){
        @Override
        public Integer apply(Integer integer) throws Exception{
            return integer % 3;
        }
    })
    .subscribe(new Observer<GroupedObservable<Integer, Integer>>(){
        @Override
        public void onSubscribe(Disposable d){
            Log.d(TAG, "==onSubscribe");
        }
        
        @Override
        public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable){
            Log.d(TAG, "==onNext");
            integerIntegerGroupObservable.subscribe(new Observer<Integer>(){
                @Override
                public void onSubscribe(Disposable d){
                    Log.d(TAG, "==GroupedObservable onSubscribe");
                }
                
                @Override
                public void onNext(Integer integer){
                    Log.d(TAG, "==GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + " value" + integer);
                    
                    @Override
                    public void onError(Throwable e){
                        Log.d(TAG, "==GroupedObservable onError");
                    }
                }
                
                @Override
                public void onComplete(){
                    Log.d(TAG, "==GroupedObservable onComplete");
                }
            });
        }
        
        @Override
        public void onError(Throwable e){
            Log.d(TAG, "==onError");
        }
        
        @Override
        public void onComplete(){
            Log.d(TAG, "==onComplete");
        }
    });
    

    在groupBy()方法返回的参数是分组的名字,每返回一个值,那就代表会创建一个组,以上的代码就是将1~10的数据分成3组,查看打印结果:

    05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe 
    05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
    ====================GroupedObservable onSubscribe     ====================GroupedObservable onNext  groupName: 2 value: 5
    ====================GroupedObservable onNext  groupName: 2 value: 2
    ====================onNext 
    ====================GroupedObservable onSubscribe 
    ====================GroupedObservable onNext  groupName: 0 value: 3
    05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
    ====================GroupedObservable onSubscribe 
    ====================GroupedObservable onNext  groupName: 1 value: 4
    ====================GroupedObservable onNext  groupName: 1 value: 1
    ====================GroupedObservable onNext  groupName: 0 value: 6
    ====================GroupedObservable onNext  groupName: 2 value: 8
    ====================GroupedObservable onNext  groupName: 0 value: 9
    ====================GroupedObservable onNext  groupName: 1 value: 7
    ====================GroupedObservable onNext  groupName: 1 value: 10
    05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete 
    ====================GroupedObservable onComplete 
    ====================GroupedObservable onComplete 
    ====================onComplete 
    

    可以看到返回的结果中是有三个组的。

    2.6 scan()

    方法预览

    public final Observable<T> scan(BiFunction<T, T, T> accumulator)
    

    有什么用?

    将数据以一定的逻辑聚合起来

    怎么用?

    Observable.just(1, 2, 3, 4, 5)
    .scan(new BiFunction<Integer, Integer, Integer>(){
        @Override
        public Integer apply(Integer integer, Integer integer2)throws Exception{
            Log.d(TAG, "==apply");
            Log.d(TAG, "==integer" + integer);
            Log.d(TAG, "==integer2" + integer2);
            return integer + integer2;
        }
    })
    .subscribe(new Consumer<Integer>(){
        @Override
        public void accept(Integer integer) throws Exception{
            Log.d(TAG, "==accept" + integer);
        }
    });
    

    打印结果:

    05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
    ====================apply 
    ====================integer 1
    ====================integer2 2
    ====================accept 3
    ====================apply 
    05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
    ====================integer2 3
    ====================accept 6
    ====================apply 
    ====================integer 6
    ====================integer2 4
    ====================accept 10
    ====================apply 
    ====================integer 10
    ====================integer2 5
    ====================accept 15
    

    2.7 window()

    方法预览

    public final Observable<Observable<T>> window(long count)
    

    有什么用?

    发送指定数量的事件时,就将这些事件分为一组。Window中的count的参数就是代表指定的数量,例如将count指定为2,那么每发2个数据就会将这两个数据分成一组。

    怎么用?

    Observable.just(1, 2, 3, 4, 5)
    .window(2)
    .subscribe(new Observer<Observable<Integer>>(){
        @Override
        public void onSubscribe(Disposable d){
            Log.d(TAG, "==onSubscribe");
        }
        
        @Override
        public void onNext(Observable<Integer> integerObservable){
            integerObservable.subscribe(new Observer<Integer>(){
                @Override
                public void onSubscribe(Disposable d){
                    Log.d(TAG, "==integerObservable onSubscribe");
                }
                
                @Override
                public void onNext(Integer integer){
                    Log.d(TAG, "==integerObservable onNext" + integer);
                }
                
                @Override
                public void onError(Throwable e){
                    Log.d(TAG, "==integerObservable onError");
                }
                
                @Override
                public void onComplete(){
                    Log.d(TAG, "==integerObservable onComplete");
                }
            });
        }
        
          @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=====================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=====================onComplete ");
        }
    });
    

    打印结果:

    05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe 
    05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe 
    05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1
    =====================integerObservable onNext 2
    =====================integerObservable onComplete 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 3
    =====================integerObservable onNext 4
    =====================integerObservable onComplete 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 5
    =====================integerObservable onComplete 
    =====================onComplete 
    

    从结果可以发现,window()将1~5的事件分成了3组。

  • 相关阅读:
    Mysql蠕虫复制
    Mysql中如何开启慢查询功能?
    线程的状态以及状态切换
    Java的Unsafe类
    Spring 获取jar内外文件的方式
    RocketMQ学习
    volatile的理解
    快速排序
    JVM的发展史
    nginx安装配置
  • 原文地址:https://www.cnblogs.com/hyeri/p/14262594.html
Copyright © 2011-2022 走看看