zoukankan      html  css  js  c++  java
  • 响应式编程RxJava1.3.8

    一、响应式编程

      响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。

      Rx的目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,响应式编程的解决方案、观察者设计模式、一个实现异步操作的库。RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。目前已经更新到了3.1.3版本,hystrix使用的还是1.3.8的版本,本文先从1.3.8版本开始介绍他的API,后续会持续更新其他的版本。

      RxJava是通过扩展的观察者模式来实现的,所以你要先了解观察者模式,不懂得可以先看一下我之前写的设计模式。

    二、基本原理

    1、观察者

    Observer

    public interface Observer<T> {
        //完成事件
        void onCompleted();
        //出错事件
        void onError(Throwable e);
        //普通事件
        void onNext(T t);
    
    }

    onNext():普通的事件,用于一般的事件发送。
    onCompleted(): 完成事件,当不再有新的onNext()事件时调用onCompleted方法作为整个事件队列结束的标志,在一个事件队列当中可以有多个onCompleted方法,但是只能触发一次,一旦触发之后,后续的事件会继续发送,但是观察者将不会接受事件。
    onError(): 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同事队列自动终止,与onCompleted()方法相同,onError事件也可以存在多个,但是只会触发一次。

    需要注意的是onCompleteonError存在互斥的关系, 两个方法可以同时存在于事件队列当中,但是一旦触发其中一个方法之后,后续将不再接收事件。

    Subscriber

    public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
        @Override
        public final void unsubscribe() {
            subscriptions.unsubscribe();
        }
        //
        @Override
        public final boolean isUnsubscribed() {
            return subscriptions.isUnsubscribed();
        }
        //
        public void onStart() {
            // do nothing by default
        }
    
    }

    Subscriber是Observer的增强实现类,因此Observer经常被转成Subscriber使用,也多出了一些功能。

    onStart():用于在 subscribe 刚开始,而事件还未发送之前被调用。可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法。

    unsubscribe(): 用于取消订阅,在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

    2、被观察者

    Observable即被观察者,它决定什么时候触发事件以及触发怎样的事件。create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列。

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("jack");
            subscriber.onNext("how are you");
            subscriber.onCompleted();
        }
    });
    //等同于上面的调用流程
    Observable observable = Observable.just("Hello", "jack", "how are you");
    //等同于上面的调用流程
    String[] words = {"Hello", "jack", "how are you"};
    Observable observable = Observable.from(words);

    3、订阅

    observable.subscribe(observer);
    observable.subscribe(subscriber);

    三、操作流程

      被观察者(Observable)通过订阅(Subscribe)按顺序发送事件给观察者(Observer),观察者(Observer)按顺序接收事件&作出对应的响应动作。

    1、create

     private Subscriber getSubscriber(){
            return new Subscriber<String>() {
                //新增onStart方法,用来做一些初始化操作
                @Override
                public void onStart() {
                    super.onStart();
                    System.out.println("on start");
                }
                //被观察者调用onCompleted时触发
                @Override
                public void onCompleted() {
                    System.out.println("on complete");
                }
                //被观察者调用onError时触发
                @Override
                public void onError(Throwable e) {
                    System.out.println("on error");
                }
                //被观察者调用onNext时触发
                @Override
                public void onNext(String s) {
                    System.out.println("on next");
                    System.out.println(s);
                }
            };
        }
        @Test
        public void test1(){
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    System.out.println("call");
                    //定义事件队列
                    subscriber.onNext("aaaa");
                    subscriber.onCompleted();
                }
            });
            Subscriber<String> subscriber = getSubscriber();
            observable.subscribe(subscriber);
        }
    on start
    call
    on next
    aaaa
    on complete

    2、just

      使用just(),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。
        @Test
        public void test2(){
            Subscriber<String> subscriber = getSubscriber();
            Observable<String> observable =  Observable.just("hello","jack");
            observable.subscribe(subscriber);
        }
    on start
    on next
    hello
    on next
    jack
    on complete

    3、from

      from()方法将传入的数组Iterable拆分成具体对象后,自动调用onNext方法依次发送。
        @Test
        public void test3() {
            //定义要发送的事件集合
            List<String> mList = new ArrayList<>();
            mList.add("hello");
            mList.add("jack");
            //定义Observable
            Observable<String> observable = Observable.from(mList);
            //进行订阅,开始发送事件
            Subscriber<String> subscriber = getSubscriber();
            observable.subscribe(subscriber);
        }

    4、defer

      通过defer()方法创建Observable,当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

     @Test
        public void test4() {
            //定义一个被观察者
            Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
                @Override
                public Observable<String> call() {
                    Observable<String> mObservable = Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            subscriber.onNext("事件订阅开始");
                        }
                    });
                    System.out.println("被观察者创建:" + mObservable);
                    return mObservable;
                }
            });
            //订阅事件1,每产生一个订阅就会生成一个新的observable对象
            observable.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("观察者2订阅事件    " + s);
                }
            });
            //订阅事件2,每产生一个订阅就会生成一个新的observable对象
            observable.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("观察者1订阅事件    " + s);
                }
            });
        }
    被观察者创建:rx.Observable@2eda0940
    观察者2订阅事件    事件订阅开始
    被观察者创建:rx.Observable@545997b1
    观察者1订阅事件    事件订阅开始

    这里用到了不完整定义的回调Action,它就相当于我们之前定义的subscriber,但是其只针对onNext事件做处理。

    5、interval

      创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定时长调用onNext()方法。
        @Test
        public void test5() throws InterruptedException {
            Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
            observable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    System.out.println("订阅时长为:"+aLong);
                }
            });
            TimeUnit.SECONDS.sleep(3);
        }

    6、timer

      该方法可以在一定延迟之后发送特定事件。

     @Test
        public void test6() throws InterruptedException {
            //定义被观察者,在2000毫秒后开始发送事件
            Observable<Long> observable = Observable.timer(2000,TimeUnit.MILLISECONDS);
            System.out.println("发送事件开始:"+System.currentTimeMillis());
            observable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    System.out.println("收到订阅结束:"+System.currentTimeMillis());
                }
            });
            TimeUnit.SECONDS.sleep(3);
        }

    四、不完全定义回调

      先来简单看一下subscribe()方法的源码

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
            subscriber.onStart();
            observable.onSubscribe.call(subscriber);
            return Subscription;
        }

    1、首先是传进来的两个参数Subscriber和Observable分别对应我们调用时的subscriber和observable。
    2、订阅开始后,首先执行的是subscriber.onStart()方法,也就是我们观察者的onStart方法,这也就解释了为什么onStart方法只可以在默认线程当中执行,因为我们指定线程只可以指定observable所对应线程以及observable对应直属下级Subscriber所在线程。
    3、接下来调用observable当中的onSubscribe.call方法,我们的整个事件序列就定义在这里,调用call方法开始发送事件。
    4、返回值Subscription,之前在介绍观察者Subscriber时我们可以看到,Subscriber实现了这个接口,用于提供解除订阅以及判断订阅是否连接的方法,在订阅完成之后返回Subscription供外部调用。

    subscribe方法的参数除了SubscriberObserver之外,还有Action类,这就是将要介绍的不完整定义回调

        @Test
        public void test7(){
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("aaaaa");
    //                subscriber.onError(new NullPointerException());
                    subscriber.onCompleted();
                }
            });
            //只对事件序列中的onNext做出响应
            Action1<String> action1 = new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("no next");
                }
            };
            //只对事件序列中的onError做出响应
            Action1<Throwable> action2 = new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    System.out.println("on error"+throwable.getMessage());
                }
            };
            //只对事件序列中的onCompleted做出响应
            Action0 action0 = new Action0() {
                @Override
                public void call() {
                    System.out.println("on complete");
                }
            };
            observable.subscribe(action1,action2,action0);
        }
    //订阅事件,只处理onNext事件
    observable.subscribe(action1);
    //订阅事件,只处理onNext和onError事件
    observable.subscribe(action1,action2);
    //订阅事件,处理onNext、onError和onCompleted事件
    observable.subscribe(action1,action2,action0);

    这里的onError和onCompleted也只会执行其中的一个。

    五、链式调用

        @Test
        public void test8() {
            //定义将要发射的数组
            String[] strs = {"1", "2", "3"};
            //from操作符可以把数组当中的数据逐条发送出去
            Observable.from(strs)
                    //调用flatMap操作符把String类型转换为新的Observable<String>并开始发送事件。
                    .flatMap(new Func1<String, Observable<String>>() {
                        @Override
                        public Observable<String> call(String s) {
                            System.out.println("testRX---map:" + s);
                            //需要把String类型转换成的Observable对象
                            return Observable.create(new Observable.OnSubscribe<String>() {
                                @Override
                                public void call(Subscriber<? super String> subscriber) {
                                    subscriber.onNext("test");
                                }
                            });
                        }
                    }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    //处理完成事件
                    System.out.println("on complete");
                }
    
                @Override
                public void onError(Throwable e) {
                    //处理error事件
                    System.out.println("on error");
                }
    
                @Override
                public void onNext(String s) {
                    //处理普通的onNext事件
                    System.out.println("on next");
                    System.out.println(s);
                }
            });
        }
    testRX---map:1
    on next
    test
    testRX---map:2
    on next
    test
    testRX---map:3
    on next
    test

    六、变换操作符

      变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去。变换操作符有map,flatMap,switchMap,filter,buffer,groupBy等。

    1、map

      map操作符就是通过制定一个Func1对象,将原Observable对象转换为另一个Observable对象并发射。

    @Test
        public void test1() {
            //定义初始事件序列数组
            Integer[] ints = {1, 2, 3};
            //调用from逐个发射数据
            //map方法把Intger类型转换为String类型
            Observable.from(ints).map(new Func1<Integer, String>() {
                @Override
                public String call(Integer i) {
                    //对Integer数据进行处理,转换成String类型返回
                    return i + "号玩家";
                }
            }).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(s + "加入游戏");
                }
            });
        }

    map方法怎么就把一种数据类型转换成了另一种数据类型呢,我们看一下map的源码

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
    
    //这里新建了一个Operator对象,核心功能代码如下
    public final class OperatorMap<T, R> implements Operator<R, T> {
        final Func1<? super T, ? extends R> transformer;
    
        public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            return parent;
        }
    }
    
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
        }
    这里新建一个newSubscriber并与原observable订阅的Subscriber关联起来,left方法本身也返回一个新建的被观察者newObservable。由此变为,当产生事件订阅时,实际上是newObservable订阅了事件,之后而通知原observable开始发送事件,原observable发送的事件发送向newSubscriber,再发送给Subscriber
    2、flatMap
      flatMap也是用来做类型转换,不同于map的是,flatMap转换后得到的是一个Observable对象。
        @Test
        public void test2() throws InterruptedException {
            //interval方法创建Observable对象,每隔1秒发送一个事件
            //经过flatMap方法变化,将long类型的事件变换为一个新的Observable对象发射出去
            Subscription subscription = Observable.interval(1, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<String>>() {
                @Override
                public Observable<String> call(final Long aLong) {
                    return Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            System.out.println("testRX---next:" + aLong);
                            subscriber.onNext(aLong + "_s");
                        }
                    });
                }
                //定义Subscriber对变换后的事件进行接收
            }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("on complete");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("on error");
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("on next");
                }
            });
            TimeUnit.SECONDS.sleep(3);
        }

    看一下源码

    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

    首先在这里它调用了map方法,在前面我们分析过,map方法会生成一个新的Observable对象,并改变事件的订阅顺序,接下来执行到merge方法,merge方法做了什么呢?

     public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
        }
        
    public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
        
        public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }
    
        @Override
        public void call(Subscriber<? super R> o) {
           Subscriber<? super T> st = hook.onLift(operator).call(o);
                     // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            parent.call(st);
        }
    }
        

    这里会把每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

    3、buffer
      buffer操作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。和buffer操作类似的还有window操作符,只不过window操作符发射的是Observable而不是数据列表。
        @Test
        public void test3() {
            Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    for (Integer integer : integers) {
                        System.out.println("buffer" + integer);
                    }
                    System.out.println("---------");
                }
            });
        }
    buffer1
    buffer2
    buffer3
    ---------
    buffer4
    buffer5
    buffer6
    ---------

    4、fliter

      filter()操作符是根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
        @Test
        public void test4(){
            Observable.just(1,2,3,4,5,6).filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(@NonNull Integer integer){
                    if(integer > 3){
                        return true;
                    }
                    return false;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer){
                    System.out.println(integer);
                }
            });
        }

    5、lift

      lift()方法可以变换原Observable生成新的Observable

        @Test
        public void testLift() {
            Observable.from(new Integer[]{1, 2, 3})
                    .lift(new Observable.Operator<String, Integer>() {
                        @Override
                        public Subscriber<? super Integer> call(final Subscriber<? super String> child) {
                            return new Subscriber<Integer>() {
                                @Override
                                public void onCompleted() {
                                    child.onCompleted();
                                }
    
                                @Override
                                public void onError(Throwable e) {
                                    child.onError(e);
                                }
    
                                @Override
                                public void onNext(Integer integer) {
                                    String s = "tom" + integer;
                                    child.onNext(s);
                                }
                            };
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println("hi "+s);
                        }
                    });
        }

    七、线程调度

      默认情况下,在哪个线程调subscribe(),就会在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,则需要 Scheduler(调度器)。在RxJava 中,Scheduler,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景。

    Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    AndroidSchedulers.mainThread():Android 开发专用的 ,它指定的操作将在 Android 主线程运行。

      有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 

    subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。
    observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。

        @Test
        public void test1() {
            Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    System.out.println(Thread.currentThread().getName());
                    subscriber.onNext("aaaa");
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Subscriber<String>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("on complete");
                            System.out.println(Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
                            System.out.println("on next " + s);
                        }
                    });
        }
    RxIoScheduler-2
    on next aaaa
    on complete
    RxNewThreadScheduler-1

    参考文章:https://www.jianshu.com/p/e5d1380c8343

    参考文章:https://blog.csdn.net/alex_xfboy/article/details/89576152

  • 相关阅读:
    css常见小问题(个人的积累总结)
    YQBlogs1.2的解读与使用VS2012&LocalDB版本(跟进贴)
    asp.net mvc 用自定义的RazorViewEngine实现主题的自由切换遇到的问题!
    求解?命令行下操作LocalDb的诸多问题
    CentOS7下docker安装
    IIS 应用程序池设置,避免IIS假死
    .NET 将数据输出到WORD、EXCEL、TXT、HTM
    iframe高度动态自适应
    .net利用SQLBulkCopy进行数据库之间的大批量数据传递
    Datatable.Select()用法简介
  • 原文地址:https://www.cnblogs.com/sglx/p/15773864.html
Copyright © 2011-2022 走看看