zoukankan      html  css  js  c++  java
  • RxJava2绑定流程分析之——观察者和被观察者是如何实现绑定的

    一、概述

      本小节从一个简单的当前线程绑定的例子说起,讲解一下RxJava2的被观察者和观察者是如何实现绑定的。搞明白原理以后就能够更好的理解:为什么上游只要执行了onNext方法,下游就能立马接收到。这就是传说中的“知其然,还有知其所以然”。嘎嘎。

    二、最简单的绑定代码

      绑定流程1.使用Observable.create(ObservableOnSubscribe source)方法生成一个Observable对象。然后利用Observable的subscribe(consumer)执行订阅方法。绑定最终会执行ObservableOnSubscrib实例的subscribe(ObservableEmitter emitter)方法。执行了emitter.onNext后就会执行Consumer.accept方法。下面就针对这个流程进行源码分析。

      下面给出一个最简单的绑定例子

    private void currentThreadRxJavaTest() {
            //创建一个观察者对象
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(99);
                }
            });
            //订阅
            Disposable disposable = observable.subscribe(new Consumer<Integer>() {//consumer消费者
                @Override
                public void accept(Integer integer) throws Exception {
                    MyLog.log(integer);
                }
            });
    
        }
    

      上述代码在运行后会打印出:99

    三、源代码分析

      1.创建被观察者源码分析:Observable.create(ObservableOnSubscribe source);

      

       以上源码逻辑:将ObservableOnSubscribe实例传入Observable的create函数。并将其包装成为ObservableCreate。ObservableCreate从而持有了ObservableOnSubscribe实例。RxJavaPlugins.onAssembly对ObservableCreate实例进行hook处理,如果没有处理则直接返回source。RxJavaPlugins.onAssembly的代码如下所示

     到此被观察者Observable的创建就已经完成了。

    2.被观察者与观察者进行绑定:通过Observable.subscribe进行绑定

    subscribe的代码如下

     

     我们重点看下第二个截图:即有LambdaObserver实例的截图。我们发现到这一步创建了一个叫做LambdaObserver的观察者对象,并且此观察者对象持有Consumer接口的实例。需要注意的是Consumer仅仅是一个普通的接口,真正的观察者其实LambdaObserver实例。

    接着,LambdaObserver对象会传入subscribe方法执行下一步操作,如下所示:

     在这段代码中真正重要的只有一行代码subscribeActual(observer).这个方法是Observable类中的抽象方法。其真正的实现是在ObservableCreate(其继承了Observable)类的subscribeActual方法中。如下所示:

    方法参数中的observer其实就是上面的LambdaObserver,source就是Observable.create(source)中的实例。CreateEmitter实现了ObservableEmitter接口。source.subscribe(parent)对应的就是上面简单例子中的ObservableOnSubscribe实例的subscribe方法。

    到此简单的绑定流程就结束了。

    3.被观察者发生改变的时候是如何通知观察者的呢?即:ObservableOnSubscribe.subscribe(ObservableEmitter emitter)方法执行emitter.onNext方法的时候。Observable.subscribe(Consumer consumer)的consumer.accept方法是如何触发的。

    大致流程是这样的:ObservableEmitter是一个接口其真正的实现是CreateEmitter。而CreateEmitter持有观察者LambdaObserver的实例。CreateEmitter调用其onNext方法其实是在调用LambdaObserver的onNext方法。而LambdaObserver的onNext方法其实调用的是consumer的accept方法。如下图LambdaObserver代码所示

     

     其中onNext方法中的onNext其实就是Consumer,也就是Observable.subscribe(Consumer consumer)的参数实例。

    总结:到此RxJava2从创建Observable、绑定Observer到调用发射器从而让accept接收的三个流程线路已经分析完成了。相信大家在开发过程中再遇到类似代码的时候就不仅仅知其然,而且应该知其所以然了。

      

  • 相关阅读:
    应对高并发场景的redis加锁技巧
    Spring中@Transactional事务回滚(含实例具体解说,附源代码)
    计算机网络10--计算机网络体系结构简单介绍
    IIS身份验证的配置
    AMR音频文件格式分析
    IOS版本号被拒的经历
    两分钟读懂《成大事者不纠结》——读书笔记
    同一个TextView设置不同的颜色和大小
    似非而是的程序猿悖论---为什么救火比防火更加吃香?
    OS
  • 原文地址:https://www.cnblogs.com/tony-yang-flutter/p/12322827.html
Copyright © 2011-2022 走看看