RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。
RxJava的简单使用
一、mavan的pom.xml中增加rxjava的依赖
这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>1.3.0</version> </dependency>
二、测试的用类
import rx.Observable; import rx.Subscriber; /** * @author huhx */ public class RxJavaTest { public static void main(String[] args) { Observable<String> observable = Observable.just("One", "Two", "Three"); observable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable item) { System.out.println("onError"); } @Override public void onNext(String item) { System.out.println("Item is " + item); } }); } }
三、打印的结果如下:
Item is One Item is Two Item is Three onCompleted
RxJava源码的简单分析
根据上述的代码,我们简单分析一下程序的流程。
一、首先Observable.just("One", "Two", "Three")代码:
just是一个工厂方法,用心构建一个Observable对象。
public static <T> Observable<T> just(T t1, T t2, T t3) { return from((T[])new Object[] { t1, t2, t3 }); }
from方法的代码如下:
public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return unsafeCreate(new OnSubscribeFromArray<T>(array)); }
二、我们重点是看unsafeCreate方法:
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。
static { init(); // 初始化了很多RxJavaHooks跟事件有着的变量 }
onCreate方法中,这个没有怎么看懂后续再会。
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }
三、对于observable.subscribe()代码
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
Observable的subscribe方法代码如下:
1 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 2 if (subscriber == null) { 3 throw new IllegalArgumentException("subscriber can not be null"); 4 } 5 if (observable.onSubscribe == null) { 6 throw new IllegalStateException("onSubscribe function can not be null."); 7 } 8 9 subscriber.onStart(); 10 11 if (!(subscriber instanceof SafeSubscriber)) { 12 subscriber = new SafeSubscriber<T>(subscriber); 13 } 14 15 try { 16 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 17 return RxJavaHooks.onObservableReturn(subscriber); 18 } catch (Throwable e) { 19 Exceptions.throwIfFatal(e); 20 if (subscriber.isUnsubscribed()) { 21 RxJavaHooks.onError(RxJavaHooks.onObservableError(e)); 22 } else { 23 try { 24 subscriber.onError(RxJavaHooks.onObservableError(e)); 25 } catch (Throwable e2) { 26 Exceptions.throwIfFatal(e2); 27 RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); 28 RxJavaHooks.onObservableError(r); 29 throw r; // NOPMD 30 } 31 } 32 return Subscriptions.unsubscribed(); 33 } 34 }
四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法
对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。
public void call(Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array)); }
setProducer方法执行的代码如下:
1 public void setProducer(Producer p) { 2 long toRequest; 3 boolean passToSubscriber = false; 4 synchronized (this) { 5 toRequest = requested; 6 producer = p; 7 if (subscriber != null) { 8 // middle operator ... we pass through unless a request has been made 9 if (toRequest == NOT_SET) { 10 // we pass through to the next producer as nothing has been requested 11 passToSubscriber = true; 12 } 13 } 14 } 15 // do after releasing lock 16 if (passToSubscriber) { 17 subscriber.setProducer(producer); 18 } else { 19 // we execute the request with whatever has been requested (or Long.MAX_VALUE) 20 if (toRequest == NOT_SET) { 21 producer.request(Long.MAX_VALUE); 22 } else { 23 producer.request(toRequest); 24 } 25 } 26 }
这里会走到21行的代码,跟进去FromArrayProducer的request方法:
public void request(long n) { if (n < 0) { throw new IllegalArgumentException("n >= 0 required but it was " + n); } if (n == Long.MAX_VALUE) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { fastPath(); } } else if (n != 0) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { slowPath(n); } } }
对于fastPath()的代码如下:
void fastPath() { final Subscriber<? super T> child = this.child; for (T t : array) { if (child.isUnsubscribed()) { return; } child.onNext(t); } if (child.isUnsubscribed()) { return; } child.onCompleted(); }
执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。