一、概述
本小节从一个简单的当前线程绑定的例子说起,讲解一下RxJava2的被观察者和观察者是如何实现绑定的。搞明白原理以后就能够更好的理解:为什么上游只要执行了onNext方法,下游就能立马接收到。这就是传说中的“知其然,还有知其所以然”。嘎嘎。
二、最简单的绑定代码
绑定流程1.使用Observable.create(ObservableOnSubscribe source)方法生成一个Observable对象。然后利用Observable的subscribe(consumer)执行订阅方法。绑定最终会执行ObservableOnSubscrib实例的subscribe(ObservableEmitter emitter)方法。执行了emitter.onNext后就会执行Consumer.accept方法。下面就针对这个流程进行源码分析。
下面给出一个最简单的绑定例子
private void currentThreadRxJavaTest() { //创建一个观察者对象 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(99); } }); //订阅 Disposable disposable = observable.subscribe(new Consumer<Integer>() {//consumer消费者 @Override public void accept(Integer integer) throws Exception { MyLog.log(integer); } }); }
上述代码在运行后会打印出:99
三、源代码分析
1.创建被观察者源码分析:Observable.create(ObservableOnSubscribe source);
以上源码逻辑:将ObservableOnSubscribe实例传入Observable的create函数。并将其包装成为ObservableCreate。ObservableCreate从而持有了ObservableOnSubscribe实例。RxJavaPlugins.onAssembly对ObservableCreate实例进行hook处理,如果没有处理则直接返回source。RxJavaPlugins.onAssembly的代码如下所示
到此被观察者Observable的创建就已经完成了。
2.被观察者与观察者进行绑定:通过Observable.subscribe进行绑定
subscribe的代码如下
我们重点看下第二个截图:即有LambdaObserver实例的截图。我们发现到这一步创建了一个叫做LambdaObserver的观察者对象,并且此观察者对象持有Consumer接口的实例。需要注意的是Consumer仅仅是一个普通的接口,真正的观察者其实LambdaObserver实例。
接着,LambdaObserver对象会传入subscribe方法执行下一步操作,如下所示:
在这段代码中真正重要的只有一行代码subscribeActual(observer).这个方法是Observable类中的抽象方法。其真正的实现是在ObservableCreate(其继承了Observable)类的subscribeActual方法中。如下所示:
方法参数中的observer其实就是上面的LambdaObserver,source就是Observable.create(source)中的实例。CreateEmitter实现了ObservableEmitter接口。source.subscribe(parent)对应的就是上面简单例子中的ObservableOnSubscribe实例的subscribe方法。
到此简单的绑定流程就结束了。
3.被观察者发生改变的时候是如何通知观察者的呢?即:ObservableOnSubscribe.subscribe(ObservableEmitter emitter)方法执行emitter.onNext方法的时候。Observable.subscribe(Consumer consumer)的consumer.accept方法是如何触发的。
大致流程是这样的:ObservableEmitter是一个接口其真正的实现是CreateEmitter。而CreateEmitter持有观察者LambdaObserver的实例。CreateEmitter调用其onNext方法其实是在调用LambdaObserver的onNext方法。而LambdaObserver的onNext方法其实调用的是consumer的accept方法。如下图LambdaObserver代码所示
其中onNext方法中的onNext其实就是Consumer,也就是Observable.subscribe(Consumer consumer)的参数实例。
总结:到此RxJava2从创建Observable、绑定Observer到调用发射器从而让accept接收的三个流程线路已经分析完成了。相信大家在开发过程中再遇到类似代码的时候就不仅仅知其然,而且应该知其所以然了。