zoukankan      html  css  js  c++  java
  • RxJava 以及 Android 中的通用线程解决方案、并发与线程安全

    关于RxJava如今是熟到发紫了,所以对于它底层的动作机制的了解是迫在眉睫了,费话不多说,直接开始。

    这里还是以之前获取个人github仓库列表为例,用retrofit+rxjava,也是实际项目中用得最多的,先来回顾一下当时【https://www.cnblogs.com/webor2006/p/10502230.html】在研究retrofit时所定义的API:

    这里新建一个工程,深入之前先来学会RxJava的基本使用,先来声明API,首先得增加retrofit的支持,如下:

    然后API定义如下:

    具体使用:

    运行:

    以上不多说,只是回顾一下retrofit的用法,接下来改用RxJava的方式,先增加相关的依赖:

    ,首先API返回的由Call就得变为Single了,如下:

    而在我的公司项目中貌似返回的Observable这个对像,如下:

    关于这俩的区别在之后再说,先默认使用Single,修改了API之后,接下来使用一下:

    此时编译运行一下:

    报错了,貌似是无法从Call转换成Single对像,因为API如今是返回Single了:

    所以,此时需要增加一个CallAdapter的配置,用来支持进行转换,如下:

    然后再运行:

    失败了。。这是为啥呢?我们知道其实RxJava是需要手动来切换线程的,所以先增加它,先不管其内部实现:

    成功啦,目前由于在回调中木有操作View,如果在回调中操作View会怎么样呢,试试:

    再编译运行:

    很显然是由于是非UI线程更新View异常了,所以此时对于回调所在线程是在非UI线程里的,所以需要将其切换到主线程里,这里需要增加另外一个库了:

    编译运行:

    其中还有一个回调方法:

    请求前的回调,所以可以在请求前来在界面中增加一个请求提示,如下:

    其中这个回调有一个Disposable参数,它的意义简单说可以理解成取消订阅,具体可以这样用它:

    好,对于利用Rxjava来实现网络请求的简单使用就到这,接下来则重点是来理解这个框架,当然不是先来直接分析这个代码,而是将其拆解,从最基础的使用上来理解,如下:

    Single和Observable:

    运行一下:

    顺间就显示了,因为是本地的数据,对于纯小白来说看到这样的写法还是挺难理解的,其订阅关系是:

    接下来就彻底来挼清这个简单代码的运作机理,先来看下它:

    很明显是来对参数做判空,简单瞅下它的实现,很简单:

    继续:

    这句涉及到两个陌生的东东:SingleJust、RxJavaPlugins.onAssembly(),下面先来看下RxJavaPlugins.onAssembly()的细节:

    看到泛型就晕。。也只能硬着头皮细细揣摩:

    其中Function是传一个类型的类,返回另一个类型,熟悉Java8的亲们应该比较熟悉:

    如果onSingleAssembly不为空则就执行转换了,而它是用户可以配置的,其实也就是是否要对Single对象进行进一步加工,默认肯定是不需要,所以,对于这句话可以简单理解:

    所以此just()方法重点看到它就成了,所以点进去看一下SingleJust,如下:

    那再看一下Single,发现是一个3000多行超级大的抽象类:

    所以里面的细节就不必现跟了,总之记着SingleJust里面重写了subscribeActual()方法:

    这个方法会在调用它时最终被调用的:

    所以点击进去看是不是这样:

    很显然最终会调用SingleJust中的subscribeActual()方法嘛,所以要理解这个回调是如何调用的:

    就需要看SingleJust的subscribeActual()方法的具体实现细节啦,如下:

    秒懂不,直接就调用了我们传的回调对象嘛,以上简单的示例虽说简单,但是完整的阐述了RxJava的一个订阅实现细节,也就是其比较核心的原理,很重要,再简单挼下:

    此时会在里面生成一个SingleJust对像:

    当调用它时,实际会调用SingleJust.subscribeActual()方法,而subscribeActual()方法最终又会回调我们的这个对象的回调方法,如下:

    至此,整个这个示例原理就彻底理清楚了,接下来继续深入RxJava,此时就涉及到操作符相关的东东了,关于什么是操作符,对于熟悉Java8的亲们也是不会陌生滴,就不过多解释啦,这里以map转换符为例,如下:

    此时运行效果一样,这里的map做了啥事件,就已经牵扯到RxJava的结构啦,下面用图来剖析下:

    而在我们调用Single.subscribe()方法时,其调用其实是这样子滴:

    好,那此时如果再加上一个map的转换操作符后,具体又是如何运转的呢?

    具体是如何转换的呢?此时从图中切回到代码:

    等于创建了一个新的SingleMap,也就是表象上看着还是Single对象,其实内部已经发生对象的转换了,当然需要将当前的Single对像给传进来:

    而此时又回到图画一下当map()之后的形态变化:

    接下来就瞅一下SingleMap干了啥?

    其中source则是原Single对像,调用了它的subscribe方法,但是!!此时的SingleObserver对像变化了,变为了MapSingleObserver啦,此时的图片形态就变成了:

    接下来则具体看下它里面是如何工作的:

    接着执行onSuccess()方法:

    当然还可以再多增加其它的操作符,总的来说源和目标肯定只有一个,中间的都是形成了一个链,发送会往源传,得到结果之后则从源往目标传,理解这个理念非常之重要。

    说了Single,也谈一下Observable,Rxjava1时还木有Single,其实Single就是Observable的简化版本,它的回调就只观注开始和结束,而对于Observable而言,它的中间过程也会回调,如下:

    其中的onNext()可能会被调用多次,了解一下区别既可。

    Disposable:

    在之前咱们已经稍微提到过它,其实是可以做取消订阅用的,下面用一个例子来实践一下,这里采用Observable来进行每秒间隔发事件,如下:

    运行看一下效果:

    此时增加一个按钮,在运行期间将其停止掉,这时就可以利用Disposable了,如下:

    运行:

    接下来很想知道Observable.subscribe()方法到底做了啥导致在onSubscribe()方法中得到了一个什么样的Disposable,才能知道Disposable是怎么工作的,不同的Observable它的Disposable是不一样的,所以咱们先来看一下目前这个Observable是一个什么样的Observable,看它:

    点开看一下interval():

    下面来细看一下:

    此时就得查看一下IntervalObserver了:

    也就是说:

    调用的也就是IntervalObserver.dispose()方法,所以得研究一下它的具体实现:

    然后看一下具体实现:

    而对于IntervalObserver中的run()方法就会根据这个状态来做判断,如下:

    没有中间操作符的Disposable工作方式都是这样的,没有一个传递的过程,接下来咱们来看一下有map操作的Disposable又是咋样的:

    然后点击查看源码:

    也就是最终:

    调用的是MapObserver.dispose()方法,但是!!貌似在MapObserver中木有找到dispose()方法呀:

    这就需要往父类进行查找喽,果真有,如下:

    其中的s是?

    那不就是源,也就是最终会调用源的dispose()方法,如下:

    总结Disposable也就是两种场景:

    1、原始的Disposable则是停掉自己的任务。

    2、如果带有操作符的,则先停掉自己,然后再停掉原始的Disposable。

    线程切换Scheduler:

    关于线程切换有几处,咱们一个个来剖析,这里还是回到Single的程序来,首先来瞅下它:

    点击进行:

    其中核心是:

    也就是我们调用subscribeOn所传的参数:

    点进去瞅下:

    具体它的方法先不分析,可以看到参数是需要一个Runnable,而由于SubscribeOnObserver实现了Runnable方法,所以当然可以将它传进去喽:

    而它里面的run方法的实现:

    也就是说:

    此时我们的调用就会在指定的线程来做了,如下:

    用图来说明一下这个subscribeOn()的过程:

    那如果写多个subscribeOn()呢?

    其实最终会用第一行的切线程,图例表示其形态:

    而对于subscribeOn()之后的Disposable则为:

    也就是subscribeOn()指定的线程是用来决定subscribe()的订阅操作的,那接下来看一下observeOn(),如下:

    此时就看一下它的回调中瞅下是否有切线程的东东:

    那如果有多个observeOn会有啥结果呢?由于是管下面的,可能会是这样:

    那么就有东东可以利用啦,怎么利用?假如再插入一个操作符:

    也就是使用observeOn()多次切是会生效的,而不像subscribeOn()使用多次是没啥效果的,那有了observeOn()的灵活性,subscribeOn()是不是可以不用了?当然不行,因为subscribeOn()是管订阅的,需要配合着使用。总的来说:subscribeOn()是先切线程再进行订阅,而observeOn()是先订阅,而每次回调会切线程处理

    接下来再研究一下它们的原理:

    首先来瞅一下Schedulers.io(),不过有一个跟它类似的api需要先看它:

    先说一下它们俩的区别:

    Schedulers.newThread():每次都会新开一个线程。

    Schedulers.io():里面用到了线程池,不是每次都新开线程。

    好,先来了解下Schedulers.newThread()是做了啥?

    直接看NEW_THREAD:

     而切换线程在上面的分析中主要是这句代码:

    所以咱们来瞅一下它里面有木有scheduleDirect()这个方法,发现木有。。那就肯定是在它的父类找呗,找找:

    确实是有:

    下面来简单分析一下:

    然后核心代码:

    点进去瞅一下:

    抽象方法,肯定得看子类:

    一层套一层,继续:

    其实可以看出Scheduler就是包装Worker的东东,而Worker是包装了executor的东东,咱们再回过头来看:

    就会调到子类:

    再回过头来瞅一下DisposeTask:

    好,了解了Schedulers.newThread()机制之后, 就可以来理解Schedulers.io()了,其实会有一个IoScheduler对像,如下:

    然后它里面也有一个createWork()方法,瞅一下:

    就不往里跟了,很明显确实io()就是带有缓存的,不是每次都new一个线程。

    好接下来就看最后一个主线程的scheduler啦:

    点击进来:

    然后可以大致看一下切换细节:

    至此!!关于Rxjava的核心机制原理都已经剖析完毕,还是挺麻烦!!

  • 相关阅读:
    Haskell Interactive Development in Emacs
    Access Java API in Groovy Script
    手工设置Eclipse文本编辑器的配色
    Color Theme of Emacs
    Gnucash的投资记录
    Special Forms and Syntax Sugars in Clojure
    Use w3m as Web Browser
    SSE指令集加速之 I420转BGR24
    【图像处理】 增加程序速度的方法
    TBB 入门笔记
  • 原文地址:https://www.cnblogs.com/webor2006/p/10545699.html
Copyright © 2011-2022 走看看