zoukankan      html  css  js  c++  java
  • RXJava之链式调用(一)

    阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680

    Rxjava中链式调用怎么实现的?

            Observable.just("a")     //Observable1
                    .map(new Func1<String, String>() {  //Observable2   
                        @Override
                        public String call(String s) {
                            System.out.print(Thread.currentThread().getName() + ":first--" + s +"
    ");
                            return s + s;
                        }
                    })
                    .subscribe(new Subscriber<String>() { //代码⑥ Subscriber
                        @Override
                        public void onCompleted() {
                            System.out.print(Thread.currentThread().getName()+"
    ");
                            System.out.print("completed"+"
    ");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.print("error");
                        }
    
                        @Override
                        public void onNext(String s) {
                            System.out.println(s);
                        }
                    });
        }
    

    先说说自己的理解,若把整个链条看成一个整体对象,那么just创建被观察者对象,而subscribe()里的Subscriber作为观察者;若每一步都分开看,just()和subscribe()中间的操作符即是观察者,又是被观察者。

    Observable中每个操作符基本都会创建出一个新的Observable;因此可以解理成后一级的操作符去观察前一个Observable对象;以上例来说,.subscribe的Subscriber所观察的对象就是.map返回的Observable2,而.map的Subscriber所观察的对象就是 Observable.just("a")得到的对象Observable1;

    下面介绍实现代码,整个链式调用真正开始的地方是.subscribe(),我们就从这里开始。省略掉一些代码,只看关键部分如下:

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
          ...
          try {
                // allow the hook to intercept and/or decorate
                hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代码①
                return hook.onSubscribeReturn(subscriber);
            }
            ...
    }
    

    hook.onSubscribeStart(observable, observable.onSubscribe)得到的对象就是observable.onSubscribe,而此处的observable明显就是this,也就是上例中的observable2对象,即把subscriber传入到了observable2里面以供其调用。

    再跟着代码进入observable2(.map操作符)的实现。其主要实现是lift和OperatorMap。如下:

        public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return lift(new OperatorMap<T, R>(func));
        }
    

    lift和OperatorMap各自干了什么事情呢?先看OperatorMap,Func1也作为构造参数传入。关键代码:

        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {  //代码②
            return new Subscriber<T>(o) {
    
                @Override
                public void onCompleted() {
                    o.onCompleted();
                }
    
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
    
                @Override
                public void onNext(T t) {
                    try {
                        o.onNext(transformer.call(t));
                    } catch (Throwable e) {
                        Exceptions.throwOrReport(e, this, t);
                    }
                }
    
            };
        }
    

    这里new出了一个观察者对象Subscriber,它实现了什么功能通过 o.onNext(transformer.call(t));即将例子中的Func1代码执行后将结果传入到下一层。即这里运行了Func1的代码。

    再看lift()操作符,看其返回值也就是我们定义的observable2对象。因此subscribe里的"代码①"的call即是此处observable2里OnSubscribe的call方法;再看call方法,“代码④”部分则是调用到了observable1对象里OnSubscribe的call方法,而“代码③”将Func1操作动作转变为Subscriber,通过call(o)完成对下一级Subscriber的引用。

     public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return new Observable<R>(new OnSubscribe<R>() {
                @Override
                public void call(Subscriber<? super R> o) {
                    try {
                        Subscriber<? super T> st = hook.onLift(operator).call(o); //代码③
                        try {
                            // new Subscriber created and being subscribed with so 'onStart' it
                            st.onStart();
                            onSubscribe.call(st);  //代码④
                        } catch (Throwable e) {
                            // localized capture of errors rather than it skipping all operators 
                            // and ending up in the try/catch of the subscribe method which then
                            // prevents onErrorResumeNext and other similar approaches to error handling
                            Exceptions.throwIfFatal(e);
                            st.onError(e);
                        }
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        // if the lift function failed all we can do is pass the error to the final Subscriber
                        // as we don't have the operator available to us
                        o.onError(e);
                    }
                }
            });
        }
    

    到这里“代码④”执行,即到了observable1对象,也就是例子中 Observable.just("a")所得到对象的OnSubscribe的call()方法,如下:

      public final static <T> Observable<T> just(final T value) {
            return ScalarSynchronousObservable.create(value);
        }
    ScalarSynchronousObservable类代码如下:
    
     public static final <T> ScalarSynchronousObservable<T> create(T t) {
            return new ScalarSynchronousObservable<T>(t);
     }
     protected ScalarSynchronousObservable(final T t) {
            super(new OnSubscribe<T>() {
    
                @Override
                public void call(Subscriber<? super T> s) {
                    /*
                     *  We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
                     *  See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
                     *  The assumption here is that when asking for a single item we should emit it and not concern ourselves with 
                     *  being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will 
                     *  filter it out (such as take(0)). This prevents us from paying the price on every subscription. 
                     */
                    s.onNext(t);  //代码⑤
                    s.onCompleted();
                }
    
            });
            this.t = t;
        }
    

    其中"代码⑤"是关键点,t即是我们just传入的"a",s则是代码④传入的st,它其实是observable2的Subscriber(观察者),相当于observable1持有observable2的引用。通过 s.onNext(t),完成了observable1向下一层的observable2的回调,也就是Func1对象所在的Subscriber(OperatorMap),再通过 o.onNext(transformer.call(t));回到例子中“代码⑥”,至此,整个调用链完成。

    上面的分析比较混乱,重新梳理代码执行流程 :
    1、subscribe里,hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代码①
    2、map里,通过lift()将Func1操作符生成Subserber,Subscriber<? super T> st = hook.onLift(operator).call(o); //代码③
    onSubscribe.call(st); //代码④
    3、just里create(), s.onNext(t); //代码⑤
    4、map里, OperatorMap里对象, o.onNext(transformer.call(t));
    5、subscribe 的Subscriber();

    Observable的所有链式调用,知道两个其两个关键点即可梳理清楚整个数据流传递原理;

    Observable.onSubscribe对象,完成以call方法来向上一层传递;
    Subserber向下一层的Subserber调用;

    原文链接:https://www.jianshu.com/p/b5ca80311746

    阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680

  • 相关阅读:
    【Vegas原创】使用nid更改DBName和sid
    【Vegas2009】3月3日ubuntu上虚拟xp
    【Vegas原创】rebuild em (Database Control)【Windows/Linux版】
    【Vegas原创】ORA01113: file 1 needs media recovery解决
    【Vegas原创】LogMiner使用实践
    【Vegas原创】如何配置 SQL Server 2005 以允许远程连接
    Linux查看电脑硬件配置情况
    【Vegas原创】使用RMAN Duplicate进行本机对本机的复制
    【Vegas原创】修改oracle用户缺省主目录
    【Vegas原创】Oracle DB 截获造成CPU占用率很高的SQL(Windows)
  • 原文地址:https://www.cnblogs.com/Android-Alvin/p/12102896.html
Copyright © 2011-2022 走看看