一:RxJava执行流程:
RxJava简单使用
private final String tag = getClass().getSimpleName(); //数据源,被观察对象 Observable<String> obser = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(tag,"emit 1"); emitter.onNext("t1"); Log.d(tag,"emit 2"); emitter.onNext("t2"); Log.d(tag,"emit 3"); emitter.onNext("t3"); Log.d(tag,"emit "); emitter.onComplete(); } }); private void observer_test(){ Observer<String> dnObser = new Observer<String>() {//观察者,处理对应事件 @Override public void onSubscribe(Disposable d) { Log.d(tag,"onSubscribe"); } @Override public void onNext(String s) { Log.d(tag,"onNext "+s); } @Override public void onError(Throwable e) { Log.e(tag,"onError"); } @Override public void onComplete() { Log.d(tag,"onComplete"); } }; obser.subscribe(dnObser); }); }
从例子中看出RxJava主要组成:
Observable:被观察者,被观察者本身
ObservableOnSubscribe:通知观察者执行哪些行为
Observer:观察者,通过实现对应方法做具体处理
订阅过程处理:
@Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //开始调用ObservableOnSubscribe subscribe subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
protected abstract void subscribeActual(Observer<? super T> observer);
查看subscribe方法为抽象方法,具体实现为ObservableCreate,从Observabel的create方法可以知道
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
查看Observable内的源码
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {//这里调用ObservableOnSubscribe 的subscribe方法,开始执行事件流程
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
二:数据转换
收到Observable的消息之前我们有可能会对数据流进行处理,例如map()、flatMap()、fllter()等方法,
这里使用了map()方法,它接收了observeable的数据并将通过该方法将数据进行转换后的新数据发出去,即做了中间转化
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
rxjava2使用了Function接口提供转换功能,
public interface Function<T, R> { //将T类型数据转化为R处理 @NonNull R apply(@NonNull T t) throws Exception; }
具体操作交给ObservableMap内部类MapObserver处理
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { ....... U v; try { //调用Function apply处理 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; }//将转换后的类型再传递给原Obsever actual.onNext(v); } ...... } }
MapObserver实现Observer,持有传入的Observer,通过Function的mapper.apply(t)进行转换后再传递给原observer onNext()
三:任务调度(scheduler)
通过使用subscribeOn()、observeOn()方法传入对应的Scheduler去指定每个操作应该运行在何种线程之中
Observable.create(...)
...
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
...
.subscribeOn(Schedulers.newThread())
...
.subscribeOn(Schedulers.computation())
...
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(...)
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
创建了一个新的Observable
,并为新的Observable
创建了新的计划表ObservableSubscribeOn对象,新的计划表保存了原始Observable
对象和调度器scheduler
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //调用了Scheduler的shedule方法,创建Runable内部执行原obseverable sbscribe parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); } ... }
以IOScheduler为例
Scheduler schdule
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //调用Work 切换执行线程 w.schedule(new Runnable() { @Override public void run() { try { decoratedRun.run(); } finally { w.dispose(); } } }, delay, unit); return w; }
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); } public int size() { return pool.get().allWorkers.size(); } static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } 。。。。 @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } 。。。 }
我们从缓存池里拿到需要的worker并作了一层封装成为EventLoopWorker:最后调用NewThreadWorker 的scheduleActual
看NewThreadWorker实现:
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor;//线程执行器 ... public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } ...... //通过Execotor来执行上面传递过来的Runable对象,达到在不同类型线程来执行调用Observer方法 public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { Future<?> f; if (delayTime <= 0) { f = executor.submit(decoratedRun);//提交到线程池执行 } else { f = executor.schedule(decoratedRun, delayTime, unit); } return Disposables.fromFuture(f); } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; } }
再看看observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer<? super T> actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue<T> queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { ... actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true; schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); } ... void schedule() {//任务调度,交给线程池回调Runable if (getAndIncrement() == 0) { worker.schedule(this); } } @Override
public void run() {//回调处理,代理调用原Observer方法
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
.......
a.onNext(v);
......
}
} ... } }
这里通过ObservableObserveOn代理,实现Observer observeOn线程切换处理
未完待续。。。