zoukankan      html  css  js  c++  java
  • RxJava2 源码分析

    前言

    很多项目使用流行的Rxjava2 + Retrofit搭建网络框架,Rxjava现在已经发展到Rxjava2,之前一直都只是再用Rxjava,但从来没有了解下Rxjava的内部实现,接下来一步步来分析Rxjava2的源码,Rxjava2分Observable和Flowable两种(无被压和有被压),我们今天先从简单的无背压的observable来分析。源码基于rxjava:2.1.1。

    一、Rxjava如何创建事件源、发射事件、何时发射事件、如何将观察者和被观察者关联起来

    简单的例子

    先来段最简单的代码,直观的了解下整个Rxjava运行的完整流程。

     1 private void doSomeWork() {
     2         Observable<String> observable =  Observable.create(new ObservableOnSubscribe<String>() {
     3             @Override
     4             public void subscribe(ObservableEmitter<String> e) throws Exception {
     5                 e.onNext("a");
     6                 e.onComplete();
     7             }
     8         });
     9         Observer observer = new Observer<String>() {
    10 
    11             @Override
    12             public void onSubscribe(Disposable d) {
    13                 Log.i("lx", " onSubscribe : " + d.isDisposed());
    14             }
    15 
    16             @Override
    17             public void onNext(String str) {
    18                 Log.i("lx", " onNext : " + str);
    19             }
    20 
    21             @Override
    22             public void onError(Throwable e) {
    23                 Log.i("lx", " onError : " + e.getMessage());
    24             }
    25 
    26             @Override
    27             public void onComplete() {
    28                 Log.i("lx", " onComplete");
    29             }
    30         };
    31         observable.subscribe(observer);
    32     }

    上面代码之所以将observable和observer单独声明,最后再调用observable.subscribe(observer);
    是为了分步来分析:

    1. 被观察者 Observable 如何生产事件的
    2. 被观察者 Observable 何时生产事件的
    3. 观察者Observer是何时接收到上游事件的
    4. Observable 与Observer是如何关联在一起的

    Observable

    Observable是数据的上游,即事件生产者
    首先来分析事件是如何生成的,直接看代码 Observable.create()方法。

    1 @SchedulerSupport(SchedulerSupport.NONE)
    2     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {    // ObservableOnSubscribe 是个接口,只包含subscribe方法,是事件生产的源头。
    3         ObjectHelper.requireNonNull(source, "source is null"); // 判空
    4         return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    5     }

    最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));这句代码。继续跟踪进去

     1 /**
     2      * Calls the associated hook function.
     3      * @param <T> the value type
     4      * @param source the hook's input value
     5      * @return the value returned by the hook
     6      */
     7     @SuppressWarnings({ "rawtypes", "unchecked" })
     8     @NonNull
     9     public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    10         Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    11         if (f != null) {
    12             return apply(f, source);
    13         }
    14         return source;
    15     }

    看注释,原来这个方法是个hook function。 通过调试得知静态对象onObservableAssembly默认为null, 所以此方法直接返回传入的参数source。
    onObservableAssembly可以通过静态方法RxJavaPlugins. setOnObservableAssembly ()设置全局的Hook函数, 有兴趣的同学可以自己去试试。 这里暂且不谈,我们继续返回代码。
    现在我们明白了:

    1  Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
    2   ...
    3   ...
    4 })

    相当于:

    1 Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() {
    2   ...
    3   ...
    4 }))

    好了,至此我们明白了,事件的源就是new ObservableCreate()对象,将ObservableOnSubscribe作为参数传递给ObservableCreate的构造函数。
    事件是由接口ObservableOnSubscribe的subscribe方法上产的,至于何时生产事件,稍后再分析。

    Observer

    Observer 是数据的下游,即事件消费者
    Observer是个interface,包含 :

    1     void onSubscribe(@NonNull Disposable d);
    2     void onNext(@NonNull T t);
    3     void onError(@NonNull Throwable e);
    4     void onComplete();

    上游发送的事件就是再这几个方法中被消费的。上游何时发送事件、如何发送,稍后再表。

    subscribe

    重点来了,接下来最重要的方法来了:observable.subscribe(observer);
    从这个方法的名字就知道,subscribe是订阅,是将观察者(observer)与被观察者(observable)连接起来的方法。只有subscribe方法执行后,上游产生的事件才能被下游接收并处理。其实自然的方式应该是observer订阅(subscribe) observable, 但这样会打断rxjava的链式结构。所以采用相反的方式。
    接下来看源码,只列出关键代码

     1 public final void subscribe(Observer<? super T> observer) {
     2         ObjectHelper.requireNonNull(observer, "observer is null");
     3         ......
     4        observer = RxJavaPlugins.onSubscribe(this, observer); // hook ,默认直接返回observer
     5        ......
     6        subscribeActual(observer);  // 这个才是真正实现订阅的方法。
     7        ......
     8     }
     9 
    10 // subscribeActual 是抽象方法,所以需要到实现类中去看具体实现,也就是说实现是在上文中提到的ObservableCreate中
    11 protected abstract void subscribeActual(Observer<? super T> observer);

    接下来我们来看ObservableCreate.java:

     1 public ObservableCreate(ObservableOnSubscribe<T> source) {
     2         this.source = source;  // 事件源,生产事件的接口,由我们自己实现
     3     }
     4 
     5    @Override
     6     protected void subscribeActual(Observer<? super T> observer) {
     7         CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 发射器
     8         observer.onSubscribe(parent);  //直接回调了观察者的onSubscribe
     9 
    10         try {
    11             // 调用了事件源subscribe方法生产事件,同时将发射器传给事件源。 
    12             // 现在我们明白了,数据源生产事件的subscribe方法只有在observable.subscribe(observer)被执行
    13               后才执行的。 换言之,事件流是在订阅后才产生的。
    14             //而observable被创建出来时并不生产事件,同时也不发射事件。
    15           source.subscribe(parent);  
    16         } catch (Throwable ex) {
    17             Exceptions.throwIfFatal(ex);
    18             parent.onError(ex);
    19         }
    20     }

    现在我们明白了,数据源生产事件的subscribe方法只有在observable.subscribe(observer)被执行后才执行的。 换言之,事件流是在订阅后才产生的。而observable被创建出来时并不生产事件,同时也不发射事件。
    接下来我们再来看看事件是如何被发射出去,同时observer是如何接收到发射的事件的
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    CreateEmitter 实现了ObservableEmitter接口,同时ObservableEmitter接口又继承了Emitter接口。
    CreateEmitter 还实现了Disposable接口,这个disposable接口是用来判断是否中断事件发射的。
    从名称上就能看出,这个是发射器,故名思议是用来发射事件的,正是它将上游产生的事件发射到下游的。
    Emitter是事件源与下游的桥梁。
    CreateEmitter 主要包括方法:

    1 void onNext(@NonNull T value);
    2     void onError(@NonNull Throwable error);
    3     void onComplete();
    4     public void dispose() ;
    5     public boolean isDisposed();

    是不是跟observer的方法很像?
    我们来看看CreateEmitter中这几个方法的具体实现:
    只列出关键代码

     1 public void onNext(T t) {
     2          if (!isDisposed()) { // 判断事件是否需要被丢弃
     3              observer.onNext(t); // 调用Emitter的onNext,它会直接调用observer的onNext
     4          }
     5       }
     6       public void onError(Throwable t) {
     7            if (!isDisposed()) {
     8                 try {
     9                     observer.onError(t); // 调用Emitter的onError,它会直接调用observer的onError
    10                 } finally {
    11                     dispose();  // 当onError被触发时,执行dispose(), 后续onNext,onError, onComplete就不会继
    12                                     续发射事件了
    13                 }
    14             }
    15         }
    16 
    17        @Override
    18         public void onComplete() {
    19             if (!isDisposed()) {
    20                 try {
    21                     observer.onComplete(); // 调用Emitter的onComplete,它会直接调用observer的onComplete
    22                 } finally {
    23                     dispose();  // 当onComplete被触发时,也会执行dispose(), 后续onNext,onError, onComplete
    24                                       同样不会继续发射事件了
    25                 }
    26             }
    27         }

    CreateEmitter 的onError和onComplete方法任何一个执行完都会执行dispose()中断事件发射,所以observer中的onError和onComplete也只能有一个被执行。
    现在终于明白了,事件是如何被发射给下游的。当订阅成功后,数据源ObservableOnSubscribe开始生产事件,调用Emitter的onNext,onComplete向下游发射事件,

    Emitter包含了observer的引用,又调用了observer onNext,onComplete,这样下游observer就接收到了上游发射的数据。

    总结

    Rxjava的流程大概是:

    1. Observable.create 创建事件源,但并不生产也不发射事件。
    2. 实现observer接口,但此时没有也无法接受到任何发射来的事件。
    3. 订阅 observable.subscribe(observer), 此时会调用具体Observable的实现类中的subscribeActual方法,
      此时会才会真正触发事件源生产事件,事件源生产出来的事件通过Emitter的onNext,onError,onComplete发射给observer对应的方法由下游observer消费掉。从而完成整个事件流的处理。

         observer中的onSubscribe在订阅时即被调用,并传回了Disposable, observer中可以利用Disposable来随时中断事件流的发射。

    今天所列举的例子是最简单的一个事件处理流程,没有使用线程调度,Rxjava最强大的就是异步时对线程的调度和随时切换观察者线程,未完待续。

    上面分析了Rxjava是如何创建事件源,如何发射事件,何时发射事件,也清楚了上游和下游是如何关联起来的。
    下面着重来分析下Rxjava强大的线程调度是如何实现的。

    二、RxJava的线程调度机制

    简单的例子

     1 private void doSomeWork() {
     2         Observable.create(new ObservableOnSubscribe<String>() {
     3             @Override
     4             public void subscribe(ObservableEmitter<String> e) throws Exception {
     5                 Log.i("lx", " subscribe: " + Thread.currentThread().getName());
     6                 Thread.sleep(2000);
     7                 e.onNext("a");
     8                 e.onComplete();
     9             }
    10         }).subscribe(new Observer<String>() {
    11             @Override
    12             public void onSubscribe(Disposable d) {
    13                 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
    14             }
    15             @Override
    16             public void onNext(String str) {
    17                 Log.i("lx", " onNext: " + Thread.currentThread().getName());
    18             }
    19             @Override
    20             public void onError(Throwable e) {
    21                 Log.i("lx", " onError: " + Thread.currentThread().getName());
    22             }
    23             @Override
    24             public void onComplete() {
    25                 Log.i("lx", " onComplete: " + Thread.currentThread().getName());
    26             }
    27         });
    28     }

    运行结果:

    1 com.rxjava2.android.samples I/lx:  onSubscribe: main
    2 com.rxjava2.android.samples I/lx:  subscribe: main
    3 com.rxjava2.android.samples I/lx:  onNext: main
    4 com.rxjava2.android.samples I/lx:  onComplete: main

    因为此方法笔者是在main线程中调用的,所以没有进行线程调度的情况下,所有方法都运行在main线程中。但我们知道Android的UI线程是不能做网络操作,也不能做耗时操作,所以一般我们把网络或耗时操作都放在非UI线程中执行。接下来我们就来感受下Rxjava强大的线程调度能力。

     1 private void doSomeWork() {
     2         Observable.create(new ObservableOnSubscribe<String>() {
     3             @Override
     4             public void subscribe(ObservableEmitter<String> e) throws Exception {
     5                 Log.i("lx", " subscribe: " + Thread.currentThread().getName());
     6                 Thread.sleep(2000);
     7                 e.onNext("a");
     8                 e.onComplete();
     9             }
    10         }).subscribeOn(Schedulers.io()) //增加了这一句
    11           .subscribe(new Observer<String>() {
    12             @Override
    13             public void onSubscribe(Disposable d) {
    14                 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
    15             }
    16             @Override
    17             public void onNext(String str) {
    18                 Log.i("lx", " onNext: " + Thread.currentThread().getName());
    19             }
    20             @Override
    21             public void onError(Throwable e) {
    22                 Log.i("lx", " onError: " + Thread.currentThread().getName());
    23             }
    24             @Override
    25             public void onComplete() {
    26                 Log.i("lx", " onComplete: " + Thread.currentThread().getName());
    27             }
    28         });
    29     }

    运行结果:

    1 com.rxjava2.android.samples I/lx:  onSubscribe: main
    2 com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
    3 com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
    4 com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

    只增加了subscribeOn这一句代码, 就发生如此神奇的现象,除了onSubscribe方法还运行在main线程(订阅发生的线程)其它方法全部都运行在一个名为RxCachedThreadScheduler-1的线程中。我们来看看rxjava是怎么完成这个线程调度的。

    线程调度subscribeOn

    首先我们先分析下Schedulers.io()这个东东。

    1  @NonNull
    2     public static Scheduler io() {
    3         return RxJavaPlugins.onIoScheduler(IO); // hook function
    4         // 等价于
    5         return IO;
    6     }

    再看看IO是什么, IO是个static变量,初始化的地方是

    1 IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function
    2 // 等价于
    3 IO = callRequireNonNull(new IOTask());
    4 // 等价于
    5 IO = new IOTask().call();

    继续看看IOTask

    1 static final class IOTask implements Callable<Scheduler> {
    2         @Override
    3         public Scheduler call() throws Exception {
    4             return IoHolder.DEFAULT;
    5             // 等价于
    6             return new IoScheduler();
    7         }
    8     }

    代码层次很深,为了便于记忆,我们再回顾一下:

    1 Schedulers.io()等价于 new IoScheduler()
    2 
    3     // Schedulers.io()等价于
    4     @NonNull
    5     public static Scheduler io() {
    6         return new IoScheduler();
    7     }

    好了,排除了其他干扰代码,接下来看看IoScheduler()是什么东东了

    IoScheduler看名称就知道是个IO线程调度器,根据代码注释得知,它就是一个用来创建和缓存线程的线程池。看到这个豁然开朗了,原来Rxjava就是通过这个调度器来调度线程的,至于具体怎么实现我们接着往下看

     1 public IoScheduler() {
     2         this(WORKER_THREAD_FACTORY);
     3     }
     4     
     5     public IoScheduler(ThreadFactory threadFactory) {
     6         this.threadFactory = threadFactory;
     7         this.pool = new AtomicReference<CachedWorkerPool>(NONE);
     8         start();
     9     }
    10 
    11     @Override
    12     public void start() {
    13         CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    14         if (!pool.compareAndSet(NONE, update)) {
    15             update.shutdown();
    16         }
    17     }
    18     
    19     CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    20             this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
    21             this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
    22             this.allWorkers = new CompositeDisposable();
    23             this.threadFactory = threadFactory;
    24 
    25             ScheduledExecutorService evictor = null;
    26             Future<?> task = null;
    27             if (unit != null) {
    28                 evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
    29                 task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
    30             }
    31             evictorService = evictor;
    32             evictorTask = task;
    33         }

    从上面的代码可以看出,new IoScheduler()后Rxjava会创建CachedWorkerPool的线程池,同时也创建并运行了一个名为RxCachedWorkerPoolEvictor的清除线程,主要作用是清除不再使用的一些线程。

    但目前只创建了线程池并没有实际的thread,所以Schedulers.io()相当于只做了线程调度的前期准备。

    OK,终于可以开始分析Rxjava是如何实现线程调度的。回到Demo来看subscribeOn()方法的内部实现:

    1 public final Observable<T> subscribeOn(Scheduler scheduler) {
    2         ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    3         return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    4     }

    很熟悉的代码RxJavaPlugins.onAssembly,上一篇已经分析过这个方法,就是个hook function, 等价于直接return new ObservableSubscribeOn<T>(this, scheduler);, 现在知道了这里的scheduler其实就是IoScheduler。

    跟踪代码进入ObservableSubscribeOn
    可以看到这个ObservableSubscribeOn 继承自Observable,并且扩展了一些属性,增加了scheduler。 各位看官,这不就是典型的装饰模式嘛,Rxjava中大量用到了装饰模式,后面还会经常看到这种wrap类。

    上篇文章我们已经知道了Observable.subscribe()方法最终都是调用了对应的实现类的subscribeActual方法。我们重点分析下subscribeActual:

     1 @Override
     2     public void subscribeActual(final Observer<? super T> s) {
     3         final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
     4 
     5         // 没有任何线程调度,直接调用的,所以下游的onSubscribe方法没有切换线程, 
     6         //本文demo中下游就是观察者,所以我们明白了为什么只有onSubscribe还运行在main线程
     7         s.onSubscribe(parent);
     8 
     9         parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    10     }

    SubscribeOnObserver也是装饰模式的体现, 是对下游observer的一个wrap,只是添加了Disposable的管理。

    接下来分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))

     1 // 这个类很简单,就是一个Runnable,最终运行上游的subscribe方法
     2     final class SubscribeTask implements Runnable {
     3         private final SubscribeOnObserver<T> parent;
     4 
     5         SubscribeTask(SubscribeOnObserver<T> parent) {
     6             this.parent = parent;
     7         }
     8 
     9         @Override
    10         public void run() {
    11             source.subscribe(parent);
    12         }
    13     }
    14     @NonNull
    15     public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    16         // IoSchedular 中的createWorker()
    17         final Worker w = createWorker();
    18         // hook decoratedRun=run;
    19         final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    20         // decoratedRun的wrap,增加了Dispose的管理
    21         DisposeTask task = new DisposeTask(decoratedRun, w);
    22         // 线程调度
    23         w.schedule(task, delay, unit);
    24 
    25         return task;
    26     }

    回到IoSchedular

     1 public Worker createWorker() {
     2         // 工作线程是在此时创建的
     3         return new EventLoopWorker(pool.get());
     4     }
     5     
     6     public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
     7             if (tasks.isDisposed()) {
     8                 // don't schedule, we are unsubscribed
     9                 return EmptyDisposable.INSTANCE;
    10             }
    11             // action 中就包含上游subscribe的runnable
    12             return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    13         }

    最终线程是在这个方法内调度并执行的。

     1 public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
     2         // decoratedRun = run, 包含上游subscribe方法的runnable
     3         Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
     4 
     5         // decoratedRun的wrap,增加了dispose的管理
     6         ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
     7 
     8         if (parent != null) {
     9             if (!parent.add(sr)) {
    10                 return sr;
    11             }
    12         }
    13 
    14         // 最终decoratedRun被调度到之前创建或从线程池中取出的线程,
    15         // 也就是说在RxCachedThreadScheduler-x运行
    16         Future<?> f;
    17         try {
    18             if (delayTime <= 0) {
    19                 f = executor.submit((Callable<Object>)sr);
    20             } else {
    21                 f = executor.schedule((Callable<Object>)sr, delayTime, unit);
    22             }
    23             sr.setFuture(f);
    24         } catch (RejectedExecutionException ex) {
    25             if (parent != null) {
    26                 parent.remove(sr);
    27             }
    28             RxJavaPlugins.onError(ex);
    29         }
    30 
    31         return sr;
    32     }

    至此我们终于明白了Rxjava是如何调度线程并执行的,通过subscribeOn方法将上游生产事件的方法运行在指定的调度线程中。

    1 com.rxjava2.android.samples I/lx:  onSubscribe: main
    2 com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
    3 com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
    4 com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

    从上面的运行结果来看,因为上游生产者已被调度到RxCachedThreadScheduler-1线程中,同时发射事件并没有切换线程,所以发射后消费事件的onNext onErro onComplete也在RxCachedThreadScheduler-1线程中。

    总结

    1. Schedulers.io()等价于 new IoScheduler()。
    2. new IoScheduler() Rxjava创建了线程池,为后续创建线程做准备,同时创建并运行了一个清理线程RxCachedWorkerPoolEvictor,定期执行清理任务。
    3. subscribeOn()返回一个ObservableSubscribeOn对象,它是Observable的一个装饰类,增加了scheduler
    4. 调用subscribe()方法,在这个方法调用后,subscribeActual()被调用,才真正执行了IoSchduler中的createWorker()创建线程并运行,最终将上游Observablesubscribe()方法调度到新创建的线程中运行。

    现在了解了被观察者执行线程是如何被调度到指定线程中执行的,但很多情况下,我们希望观察者(事件下游)处理事件最好在UI线程执行,比如更新UI操作等。下面分析下游何时调度,如何调度由于篇幅问题。

    三、Rxjava如何对观察者线程进行调度

    简单的例子

     1 private void doSomeWork() {
     2         Observable.create(new ObservableOnSubscribe<String>() {
     3             @Override
     4             public void subscribe(ObservableEmitter<String> e) throws Exception {
     5                 Log.i("lx", " subscribe: " + Thread.currentThread().getName());
     6                 e.onNext("a");
     7                 e.onComplete();
     8             }
     9         }).subscribeOn(Schedulers.io())
    10           .observeOn(AndroidSchedulers.mainThread())
    11           .subscribe(new Observer<String>() {
    12             @Override
    13             public void onSubscribe(Disposable d) {
    14                 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
    15             }
    16             @Override
    17             public void onNext(String str) {
    18                 Log.i("lx", " onNext: " + Thread.currentThread().getName());
    19             }
    20             @Override
    21             public void onError(Throwable e) {
    22                 Log.i("lx", " onError: " + Thread.currentThread().getName());
    23             }
    24             @Override
    25             public void onComplete() {
    26                 Log.i("lx", " onComplete: " + Thread.currentThread().getName());
    27             }
    28         });
    29     }

    看看运行结果:

    1 com.rxjava2.android.samples I/lx:  onSubscribe: main
    2 com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
    3 com.rxjava2.android.samples I/lx:  onNext: main
    4 com.rxjava2.android.samples I/lx:  onComplete: main

    从结果可以看出,事件的生产线程运行在RxCachedThreadScheduler-1中,而事件的消费线程则被调度到了main线程中。关键代码是因为这句.observeOn(AndroidSchedulers.mainThread())。 下面我们着重分析下这句代码都做了哪些事情。

    AndroidSchedulers.mainThread()

    先来看看AndroidSchedulers.mainThread()是什么?贴代码

    1  /** A {@link Scheduler} which executes actions on the Android main thread. */
    2     public static Scheduler mainThread() {
    3         return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    4     }

    注释已经说的很明白了,是一个在主线程执行任务的scheduler,接着看

     1 private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
     2             new Callable<Scheduler>() {
     3                 @Override public Scheduler call() throws Exception {
     4                     return MainHolder.DEFAULT;
     5                 }
     6             });
     7             
     8     public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
     9     if (scheduler == null) {
    10         throw new NullPointerException("scheduler == null");
    11     }
    12     Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    13     if (f == null) {
    14         return callRequireNonNull(scheduler);
    15     }
    16     return applyRequireNonNull(f, scheduler);
    17     }

    代码很简单,这个AndroidSchedulers.mainThread()想当于new HandlerScheduler(new Handler(Looper.getMainLooper())),原来是利用AndroidHandler来调度到main线程的。

    我们再看看HandlerScheduler,它与我们上节分析的IOScheduler类似,都是继承自Scheduler,所以AndroidSchedulers.mainThread()其实就是是创建了一个运行在main thread上的scheduler。
    好了,我们再回过头来看observeOn方法。

    observeOn

     1 public final Observable<T> observeOn(Scheduler scheduler) {
     2         return observeOn(scheduler, false, bufferSize());
     3     }
     4     
     5     public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
     6         ObjectHelper.requireNonNull(scheduler, "scheduler is null");
     7         ObjectHelper.verifyPositive(bufferSize, "bufferSize");
     8         return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
     9     }
    10     

    重点是这个new ObservableObserveOn,看名字是不是有种似成相识的感觉,还记得上篇的ObservableSubscribeOn吗? 它俩就是亲兄弟,是继承自同一个父类。

    重点还是这个方法,我们前文已经提到了,Observable的subscribe方法最终都是调用subscribeActual方法。下面看看这个方法的实现:

     1   @Override
     2     protected void subscribeActual(Observer<? super T> observer) {
     3         // scheduler 就是前面提到的 HandlerScheduler,所以进入else分支
     4         if (scheduler instanceof TrampolineScheduler) {
     5             source.subscribe(observer);
     6         } else {
     7             // 创建 HandlerWorker
     8             Scheduler.Worker w = scheduler.createWorker();
     9             // 调用上游Observable的subscribe,将订阅向上传递
    10             source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    11         }
    12     }

    从上面代码可以看到使用了ObserveOnObserver类对observer进行装饰,好了,我们再来看看ObserveOnObserver

    我们已经知道了,事件源发射的事件,是通过observer的onNext,onError,onComplete发射到下游的。所以看看ObserveOnObserver的这三个方法是如何实现的。
    由于篇幅问题,我们只分析onNext方法,onErroronComplete方法有兴趣的同学可以自己分析下。

     1 @Override
     2     public void onNext(T t) {
     3         if (done) {
     4             return;
     5         }
     6         
     7         // 如果是非异步方式,将上游发射的时间加入到队列
     8         if (sourceMode != QueueDisposable.ASYNC) {
     9             queue.offer(t);
    10         }
    11         schedule();
    12     }
    13     
    14     void schedule() {
    15         // 保证只有唯一任务在运行
    16         if (getAndIncrement() == 0) {
    17             // 调用的就是HandlerWorker的schedule方法
    18             worker.schedule(this);
    19         }
    20     }
    21     
    22         @Override
    23         public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    24             if (run == null) throw new NullPointerException("run == null");
    25             if (unit == null) throw new NullPointerException("unit == null");
    26 
    27             if (disposed) {
    28                 return Disposables.disposed();
    29             }
    30 
    31             run = RxJavaPlugins.onSchedule(run);
    32 
    33             ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    34 
    35             Message message = Message.obtain(handler, scheduled);
    36             message.obj = this; // Used as token for batch disposal of this worker's runnables.
    37 
    38             handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    39 
    40             // Re-check disposed state for removing in case we were racing a call to dispose().
    41             if (disposed) {
    42                 handler.removeCallbacks(scheduled);
    43                 return Disposables.disposed();
    44             }
    45 
    46             return scheduled;
    47         }

    schedule方法将传入的run调度到对应的handle所在的线程来执行,这个例子里就是有main线程来完成。 再回去看看前面传入的run吧。

    回到ObserveOnObserver中的run方法:

     1 @Override
     2     public void run() {
     3         // 此例子中代码不会进入这个分支,至于这个drainFused是什么,后面章节再讨论。
     4         if (outputFused) {
     5             drainFused();
     6         } else {
     7             drainNormal();
     8         }
     9     }
    10     
    11     void drainNormal() {
    12         int missed = 1;
    13 
    14         final SimpleQueue<T> q = queue;
    15         final Observer<? super T> a = actual;
    16 
    17         for (;;) {
    18             if (checkTerminated(done, q.isEmpty(), a)) {
    19                 return;
    20             }
    21 
    22             for (;;) {
    23                 boolean d = done;
    24                 T v;
    25 
    26                 try {
    27                     // 从队列中queue中取出事件
    28                     v = q.poll();
    29                 } catch (Throwable ex) {
    30                     Exceptions.throwIfFatal(ex);
    31                     s.dispose();
    32                     q.clear();
    33                     a.onError(ex);
    34                     worker.dispose();
    35                     return;
    36                 }
    37                 boolean empty = v == null;
    38 
    39                 if (checkTerminated(d, empty, a)) {
    40                     return;
    41                 }
    42 
    43                 if (empty) {
    44                     break;
    45                 }
    46                 //调用下游observer的onNext将事件v发射出去
    47                 a.onNext(v);
    48             }
    49 
    50             missed = addAndGet(-missed);
    51             if (missed == 0) {
    52                 break;
    53             }
    54         }
    55     }

    至此我们明白了RXjava是如何调度消费者线程了。

    消费者线程调度流程概括

    Rxjava调度消费者现在的流程,以observeOn(AndroidSchedulers.mainThread())为例。

    1. AndroidSchedulers.mainThread()先创建一个包含handlerScheduler, 这个handler是主线程的handler
    2. observeOn方法创建ObservableObserveOn,它是上游Observable的一个装饰类,其中包含前面创建的SchedulerbufferSize等.
    3. 当订阅方法subscribe被调用后,ObservableObserveOnsubscribeActual方法创建Scheduler.Worker并调用上游的subscribe方法,同时将自身接收的参数'observer'用装饰类ObserveOnObserver装饰后传递给上游。
    4. 当上游调用被ObserveOnObserveronNextonErroronComplete方法时,ObserveOnObserver将上游发送的事件通通加入到队列queue中,然后再调用scheduler将处理事件的方法调度到对应的线程中(本例会调度到main thread)。 处理事件的方法将queue中保存的事件取出来,调用下游原始的observer再发射出去。
    5. 经过以上流程,下游处理事件的消费者线程就运行在了observeOn调度后的thread中。

    总结

    经过前面两节的分析,我们已经明白了Rxjava是如何对线程进行调度的。

    • Rxjava的subscribe方法是由下游一步步向上游进行传递的。会调用上游的subscribe,直到调用到事件源。
      如: source.subscribe(xxx);

    而上游的source往往是经过装饰后的Observable, Rxjava就是利用ObservableSubscribeOnsubscribe方法调度到了指定线程运行,生产者线程最终会运行在被调度后的线程中。但多次调用subscribeOn方法会怎么样呢? 我们知道因为subscribe方法是由下而上传递的,所以事件源的生产者线程最终都只会运行在第一次执行subscribeOn所调度的线程中,换句话就是多次调用subscribeOn方法,只有第一次有效。

    • Rxjava发射事件是由上而下发射的,上游的onNextonErroronComplete方法会调用下游传入的observer的对应方法。往往下游传递的observer对象也是经过装饰后的observer对象。Rxjava就是利用ObserveOnObserver将执行线程调度后,再调用下游对应的onNextonErroronComplete方法,这样下游消费者就运行再了指定的线程内。 那么多次调用observeOn调度不同的线程会怎么样呢? 因为事件是由上而下发射的,所以每次用observeOn切换完线程后,对下游的事件消费都有效,比如下游的map操作符。最终的事件消费线程运行在最后一个observeOn切换后线程中。
    • 另外通过源码可以看到onSubscribe运行在subscribe的调用线程中,这个就不具体分析了。
  • 相关阅读:
    jQuery的标签选择器$('p')、类选择器$('.myClass')、id选择器$('#myId')
    jQuery Validate验证框架与 jQuery ajaxSubmit的联合使用
    23种设计模式(一) 单例模式
    java 常见的几种运行时异常RuntimeException
    Servlet 生命周期、工作原理
    throw与throws的区别
    Apache Shiro java安全框架
    web.xml 中<context-param>与<init-param>的区别与作用
    web.xml 中CharacterEncodingFilter类的学习
    web.xml中的contextConfigLocation在spring中的作用
  • 原文地址:https://www.cnblogs.com/linghu-java/p/9719427.html
Copyright © 2011-2022 走看看