正在看rxjava,看到lift,在阅读了源码和网上的一些文章,整理了下思路。下文着重不是直接分析源代码,而是从lift解决什么问题和如何解决角度分析lift应该是做什么/怎么做的问题。具体源码实现请参考rxjava,网上很多文章分析的很详细。
Observable的本质上就是异步获取/加工数据(OnSubscribe的call方法),然后通知observer(Observer的几个方法)的一个框架。每个Observable都有一个OnSubscribe(继承Action1接口)对象。在调用Observable的subscribe方法创建,一旦subscribe后,Observable就开始工作。
举例来说,对于一个Observalbe<JSONObject>的对象,可以看作它最总是发射JSONObject数据,要求下游提供一个Subscriber<JSONObject>(Subscriber实现了Observer)来接收数据,而Subscriber<JSONObject>则放在OnSubscribe的参数。
rxjava中的lift是各种操作符的核心所在,具体操作符提供不同的如map,filter等效果。lift的代码设计比较精细,其实只要理解了上面Observable的本质,lift的实现也就迎刃而解了。
刚才讲Observable要获取/加工数据,那么它是怎么获取/加工数据呢,方式很多,如最基本的例子
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(new String()); } });
这个是最简单的,但是new String()可说是一个“获取数据的例子”,当然这样写毫无意义。而可能的一种实现是网络获取如
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { RemoteApi.getInstance().getCurrentUserName(new Callback<String>(){ public void onSuccess(String username){ subscriber.onNext(username); subscriber.onCompleted(); } public void onFail(int code, String detail){ subscriber.onError(new Exception(detail)); } }); } });
这都很好理解。
而lift本质是一个Observable数据是从另一个Observable获取应该怎么处理呢?一言不合直接先上代码
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); parent.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }
我们来分析下。
Observable1(O1)调用lift返回Observable2(这是个新的对象O2),此时O2要从O1获取数据,O2是消费者,O1是生产者
O2调用另外一个O1获取数据实际上要做3件事
1. 让O1开始获取数据
2. 获取数据后,发射给O2。
3. O2得到数据后,要发射给O2的消费者
先看第1,如何让O1开始获取数据?记得开始我们所讲的Observable,它有个OnSubscribe对象,它的call方法是获取数据的地方。因此,很简单,调用该方法。
不过,“调用该方法”这么简单一句话一般是产品经理的说得,作为一个程序员,当你脑子想到“调用该方法”时候,就需要落实到实现:要用到的对象从哪来,方法参数是什么,方法参数从哪来,是否有返回值,返回值怎么处理等。
在这里,O2就是通过调用O1的OnSubscribe对象的call方法让O1开始工作的。O1的OnSubscribe对象是在创建O2是传入的,代码清晰可见。
OnSubscribe的call对象要接收一个O2的Subscriber对象,这个就是我们关注的第二件事:“获取数据后,通知Observable2”。
而这里O2传给O1 OnSubscribe对象的Subscriber对象从哪来的?这就是lift的参数Operator的作用了。Operator就是负责提供给生产者(O1)监听回调Subscriber的作用,它实际是泛型为Subscriber的Func1的接口。不同的操作符实质是不同的Operator。比如map方法是OperatorMap,filter的是OperatorFilter,observeOn的是OperatorObserveOn(同样形式实现了线程切换,NB吧)
因此一个调用流程是
1. 第一步:O2的OnSubscribe的call -> 第二步:O2用operator构造Subscriber -> 第三步:O2用该Subscriber调用O1的OnSubscribe的call
按照这三步依次上溯,直到最后一个没有parent的Observable
2. 顶级Observable获取数据,调用下游Observable传来的Subscriber发射数据
3. 如果该Subscriber是你写的(通过subscribe方法),这个就结束了;如果是级联Observer,则
4. 上一步的Subscriber是O2的Operator构造出来,这个Subscriber一个任务,就是对收到的数据进行处理,然后在通知O2的下游消费者(因为下游消费者的Subscriber对象会保存在operator返回的Subscriber中)
5. 如此,2,4,反复调用,直至到第三步,一切game over
借用扔物线文章的图片如下