zoukankan      html  css  js  c++  java
  • Android RxJava2 浅析

    原文地址:http://blog.csdn.net/maplejaw_/article/details/52442065 

    Observable

    在RxJava1.x中,最熟悉的莫过于Observable这个类了,笔者刚使用RxJava2.x时,创建一个Observable后,顿时是懵逼的。因为我们熟悉的Subscriber居然没影了,取而代之的是ObservableEmitter,俗称发射器。此外,由于没有了Subscriber的踪影,我们创建观察者时需使用Observer。而Observer也不是我们熟悉的那个Observer,其回调的Disposable参数更是让人摸不到头脑。

    废话不多说,从会用开始,还记得使用RxJava的三部曲吗? 
    第一步:初始化一个Observable

    1.        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
    2.  
    3.             @Override
    4.             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    5.                 e.onNext(1);
    6.                 e.onNext(2);
    7.                 e.onComplete();
    8.             }
    9.         });

    第二步:初始化一个Observer

    1.         Observer<Integer> observer= new Observer<Integer>() {
    2.  
    3.             @Override
    4.             public void onSubscribe(Disposable d) {
    5.  
    6.             }
    7.  
    8.             @Override
    9.             public void onNext(Integer value) {
    10.  
    11.  
    12.             }
    13.  
    14.             @Override
    15.             public void onError(Throwable e) {
    16.  
    17.             }
    18.  
    19.             @Override
    20.             public void onComplete() {
    21.             }
    22.         }

    第三部:建立订阅关系

    1.     observable.subscribe(observer); //建立订阅关系

    不难看出,与RxJava1.x还是存在着一些区别的。首先,创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete)。其次,创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。你可能纳闷为什么不像RxJava1.x中订阅时返回Disposable,而是选择回调出来呢。官方说是为了设计成Reactive-Streams架构。不过仔细想想这么一个场景还是很有用的,假设Observer需要在接收到异常数据项时解除订阅,在RxJava2.x中则非常简便,如下操作即可。

    1.   Observer<Integer> observer = new Observer<Integer>() {
    2.             private Disposable disposable;
    3.  
    4.             @Override
    5.             public void onSubscribe(Disposable d) {
    6.                 disposable = d;
    7.             }
    8.  
    9.             @Override
    10.             public void onNext(Integer value) {
    11.                 Log.d("JG", value.toString());
    12.                 if (value > 3) {   // >3 时为异常数据,解除订阅
    13.                     disposable.dispose();
    14.                 }
    15.             }
    16.  
    17.             @Override
    18.             public void onError(Throwable e) {
    19.  
    20.             }
    21.  
    22.             @Override
    23.             public void onComplete() {
    24.  
    25.  
    26.  
    27.             }
    28.         };

    此外,RxJava2.x中仍然保留了其他简化订阅方法,我们可以根据需求,选择相应的简化订阅。只不过传入的对象改为了Consumer。`

    1.    Disposable disposable = observable.subscribe(new Consumer<Integer>() {
    2.             @Override
    3.             public void accept(Integer integer) throws Exception {
    4.                   //这里接收数据项
    5.             }
    6.         }, new Consumer<Throwable>() {
    7.             @Override
    8.             public void accept(Throwable throwable) throws Exception {
    9.               //这里接收onError
    10.             }
    11.         }, new Action() {
    12.             @Override
    13.             public void run() throws Exception {
    14.               //这里接收onComplete。
    15.             }
    16.         });

    不同于RxJava1.x,RxJava2.x中没有了一系列的Action/Func接口,取而代之的是与Java8命名类似的函数式接口,如下图: 

    image_1arse84b03rfoo89mr1m53151a9.png

    其中Action类似于RxJava1.x中的Action0,区别在于Action允许抛出异常。

    1. public interface Action {
    2.     /**
    3.      * Runs the action and optionally throws a checked exception
    4.      * @throws Exception if the implementation wishes to throw a checked exception
    5.      */
    6.     void run() throws Exception;
    7. }

    而Consumer即消费者,用于接收单个值,BiConsumer则是接收两个值,Function用于变换对象,Predicate用于判断。这些接口命名大多参照了Java8,熟悉Java8新特性的应该都知道意思,这里也就不再赘述了。

    线程调度

    关于线程切换这点,RxJava1.x和RxJava2.x的实现思路是一样的。这里就简单看下相关源码。

    subscribeOn

    同RxJava1.x一样,subscribeOn用于指定subscribe()时所发生的线程,从源码角度可以看出,内部线程调度是通过ObservableSubscribeOn来实现的。

    1.    public final Observable<T> subscribeOn(Scheduler scheduler) {
    2.         ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    3.         return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    4.     }

    ObservableSubscribeOn的核心源码在subscribeActual方法中,通过代理的方式使用SubscribeOnObserver包装Observer后,设置Disposable来将subscribe切换到Scheduler线程中

    1.     @Override
    2.     public void subscribeActual(final Observer<? super T> s) {
    3.         final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    4.  
    5.         s.onSubscribe(parent); //回调Disposable
    6.  
    7.         parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //设置`Disposable`
    8.             @Override
    9.             public void run() {
    10.                 source.subscribe(parent); //使Observable的subscribe发生在Scheduler线程中
    11.             }
    12.         }));
    13.     }

    observeOn

    observeOn方法用于指定下游Observer回调发生的线程。

    1.     public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    2.          //..
    3.          //验证安全
    4.         return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    5.     }

    主要实现在ObservableObserveOn中的subscribeActual,可以看出,不同于subscribeOn,没有将suncribe操作全部切换到Scheduler中,而是通过ObserveOnSubscriber与Scheduler配合,通过schedule()达到切换下游Observer回调发生的线程,这一点与RxJava1.x实现几乎相同。关于ObserveOnSubscriber的源码这里不再重复描述了,有兴趣的可以查看本人RxJava源码解读这篇文章

    1.     @Override
    2.     protected void subscribeActual(Observer<? super T> observer) {
    3.         if (scheduler instanceof TrampolineScheduler) {
    4.             source.subscribe(observer);
    5.         } else {
    6.             Scheduler.Worker w = scheduler.createWorker();
    7.  
    8.             source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize));
    9.         }
    10.     }

    Flowable

    Flowable是RxJava2.x中新增的类,专门用于应对背压(Backpressure)问题,但这并不是RxJava2.x中新引入的概念。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则会造成点击两次的效果。 
    我们知道,在RxJava1.x中背压控制是由Observable完成的,使用如下:

    1.   Observable.range(1,10000)
    2.             .onBackpressureDrop()
    3.             .subscribe(integer -> Log.d("JG",integer.toString()));

    而在RxJava2.x中将其独立了出来,取名为Flowable。因此,原先的Observable已经不具备背压处理能力。 
    通过Flowable我们可以自定义背压处理策略。 

    image_1arsktnsv1uk810abcki15ljp91m.png

    测试Flowable例子如下:

    1.   Flowable.create(new FlowableOnSubscribe<Integer>() {
    2.  
    3.             @Override
    4.             public void subscribe(FlowableEmitter<Integer> e) throws Exception {
    5.  
    6.                 for(int i=0;i<10000;i++){
    7.                     e.onNext(i);
    8.                 }
    9.                 e.onComplete();
    10.             }
    11.         }, FlowableEmitter.BackpressureMode.ERROR) //指定背压处理策略,抛出异常
    12.                 .subscribeOn(Schedulers.computation())
    13.                 .observeOn(Schedulers.newThread())
    14.                 .subscribe(new Consumer<Integer>() {
    15.                     @Override
    16.                     public void accept(Integer integer) throws Exception {
    17.                         Log.d("JG", integer.toString());
    18.                         Thread.sleep(1000);
    19.                     }
    20.                 }, new Consumer<Throwable>() {
    21.                     @Override
    22.                     public void accept(Throwable throwable) throws Exception {
    23.                         Log.d("JG",throwable.toString());
    24.                     }
    25.                 });

    或者可以使用类似RxJava1.x的方式来控制。

    1.   Flowable.range(1,10000)
    2.                 .onBackpressureDrop()
    3.                 .subscribe(integer -> Log.d("JG",integer.toString()));

    其中还需要注意的一点在于,Flowable并不是订阅就开始发送数据,而是需等到执行Subscription#request才能开始发送数据。当然,使用简化subscribe订阅方法会默认指定Long.MAX_VALUE。手动指定的例子如下:

    1.         Flowable.range(1,10).subscribe(new Subscriber<Integer>() {
    2.             @Override
    3.             public void onSubscribe(Subscription s) {
    4.                 s.request(Long.MAX_VALUE);//设置请求数
    5.             }
    6.  
    7.             @Override
    8.             public void onNext(Integer integer) {
    9.  
    10.             }
    11.  
    12.             @Override
    13.             public void onError(Throwable t) {
    14.  
    15.             }
    16.  
    17.             @Override
    18.             public void onComplete() {
    19.  
    20.             }
    21.         });

    Single

    不同于RxJava1.x中的SingleSubscriber,RxJava2中的SingleObserver多了一个回调方法onSubscribe。

    1. interface SingleObserver<T> {
    2.     void onSubscribe(Disposable d);
    3.     void onSuccess(T value);
    4.     void onError(Throwable error);
    5. }

    Completable

    同Single,Completable也被重新设计为Reactive-Streams架构,RxJava1.x的CompletableSubscriber改为CompletableObserver,源码如下:

    1. interface CompletableObserver<T> {
    2.     void onSubscribe(Disposable d);
    3.     void onComplete();
    4.     void onError(Throwable error);
    5. }

    Subject/Processor

    Processor和Subject的作用是相同的。关于Subject部分,RxJava1.x与RxJava2.x在用法上没有显著区别,这里就不介绍了。其中Processor是RxJava2.x新增的,继承自Flowable,所以支持背压控制。而Subject则不支持背压控制。使用如下:

    1.         //Subject
    2.         AsyncSubject<String> subject = AsyncSubject.create();
    3.         subject.subscribe(-> Log.d("JG",o));//three
    4.         subject.onNext("one");
    5.         subject.onNext("two");
    6.         subject.onNext("three");
    7.         subject.onComplete();
    8.  
    9.        //Processor
    10.         AsyncProcessor<String> processor = AsyncProcessor.create();
    11.         processor.subscribe(-> Log.d("JG",o)); //three
    12.         processor.onNext("one");
    13.         processor.onNext("two");
    14.         processor.onNext("three");
    15.         processor.onComplete();

    操作符

    关于操作符,RxJava1.x与RxJava2.x在命名和行为上大多数保持了一致,部分操作符请查阅文档。

    最后

    RxJava1.x 如何平滑升级到RxJava2.x? 
    由于RxJava2.x变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。地址:https://github.com/akarnokd/RxJava2Interop

  • 相关阅读:
    NOIP 2008 立体图
    JDOJ 1606 数字三角形
    JDOJ 1133 分段公司利润
    NOIP 2011 选择客栈
    NOIP 2014 无线网络发射器选址
    USACO Party Invitations
    NOI 2001 食物链
    NOI 2002 银河英雄传说
    BOI 2003 团伙
    NOIP 2017 奶酪
  • 原文地址:https://www.cnblogs.com/zhujiabin/p/8193028.html
Copyright © 2011-2022 走看看