zoukankan      html  css  js  c++  java
  • Android主流框架——RxJava(原理分析)

    基本接口定义

    package com.example.rxjavapractice.my.rxjava
    
    import io.reactivex.functions.Function
    
    
    /**
     * Base type of the simplified observable chain.
     *
     * Concrete sources and operators extend this class and implement
     * [subscribeActual]; callers always go through [subscribe].
     */
    abstract class Observable<T> {

        /**
         * Subscribes [observer] to this observable by delegating to the
         * subclass-specific [subscribeActual].
         */
        fun subscribe(observer: Observer<T>) {
            subscribeActual(observer)
        }

        /**
         * Performs the actual subscription work for the concrete observable type.
         */
        abstract fun subscribeActual(observer: Observer<T>)

        companion object {
            /**
             * Creates an observable whose events are produced by
             * [observableOnSubscribe]; the callback is handed to an
             * [ObservableCreate], which manages it.
             */
            fun <T> create(
                observableOnSubscribe: ObservableOnSubscribe<T>
            ): ObservableCreate<T> = ObservableCreate(observableOnSubscribe)
        }
    }
    
    /**
     * Receiver of the events emitted by an observable chain.
     */
    interface Observer<T> {
        /** Called when the subscription is established. */
        fun onSubscribe()

        /** Called for each emitted value [t]. */
        fun onNext(t: T)

        /** Called when the stream signals an error. */
        fun onError()

        /** Called when the stream signals completion. */
        fun onComplete()
    }
    
    /**
     * Callback invoked when the observable is subscribed to.
     *
     * Implementations push their events through the supplied [Emitter].
     */
    interface ObservableOnSubscribe<T> {
        /** Receives the [emitter] through which events are sent. */
        fun subscribe(emitter: Emitter<T>)
    }
    
    /**
     * Sink through which an event source pushes values and errors.
     */
    interface Emitter<T> {
        /** Emits the value [t]. */
        fun onNext(t: T)

        /** Signals an error. */
        fun onError()
    }

    订阅过程

    package com.example.rxjavapractice.my.rxjava
    
    import io.reactivex.disposables.Disposable
    import io.reactivex.internal.disposables.DisposableHelper
    import java.util.concurrent.atomic.AtomicReference
    
    /**
     * 订阅的流程:
     *  Observable(source).subscribe(observer) ->
     *  ObservableCreate(source).subscribeActual(observer) ->
     *      EmitterCreate(observer) ->
     *      observer.onSubscribe() ->
     *      source.subscribe(emitterCreate) ->
     *   EmitterCreate.onNext() ->
     *   Observer.onNext()
     */
    
    /**
     * Observable returned by `Observable.create`.
     *
     * Because it extends [Observable], a call to `subscribe` on the parent
     * class lands in this class's [subscribeActual]:
     * `Observable.subscribe -> ObservableCreate.subscribeActual`.
     */
    class ObservableCreate<T>(
        private val source: ObservableOnSubscribe<T>
    ) : Observable<T>() {

        /**
         * Wraps [observer] in an [EmitterCreate], notifies the observer that it
         * has been subscribed, then lets [source] start emitting through the
         * emitter.
         */
        override fun subscribeActual(observer: Observer<T>) {
            // Step 1: the emitter is the bridge between the source and the observer.
            val emitter = EmitterCreate(observer)

            // Step 2: subscription callback, fired before any event is emitted.
            observer.onSubscribe()

            // Step 3: hand the emitter to the source so it can fire events.
            source.subscribe(emitter)
        }
    }
    
    /**
     * [Emitter] implementation used by [ObservableCreate] to deliver events to
     * the wrapped [Observer] and to stop delivery once disposed.
     *
     * The disposed state lives in the inherited [AtomicReference]:
     * [dispose] asks [DisposableHelper] to mark that reference as disposed,
     * and [isDisposed] inspects the value currently stored in it.
     *
     * @param observer the observer that receives the emitted events
     */
    class EmitterCreate<T>(
        private val observer: Observer<T>
    ) : AtomicReference<Disposable>(), Emitter<T>, Disposable {

        /**
         * Forwards [t] to the observer unless this emitter has been disposed.
         */
        override fun onNext(t: T) {
            if (!isDisposed()) {
                observer.onNext(t)
            }
        }

        /**
         * Forwards the error signal to the observer unless already disposed.
         * The emitter is always disposed afterwards, even if the observer's
         * callback throws, so no further events are delivered.
         */
        override fun onError() {
            if (!isDisposed()) {
                try {
                    observer.onError()
                } finally {
                    dispose()
                }
            }
        }

        /**
         * Marks this emitter as disposed.
         */
        override fun dispose() {
            DisposableHelper.dispose(this)
        }

        /**
         * Returns `true` once [dispose] has been called.
         */
        override fun isDisposed(): Boolean {
            // DisposableHelper.isDisposed(Disposable) compares its argument with
            // the DISPOSED sentinel. Passing `this` would compare the emitter
            // itself and always yield false, so the stored reference is checked.
            return DisposableHelper.isDisposed(get())
        }
    }

    Map等操作符原理

    package com.example.rxjavapractice.my.rxjava
    
    import io.reactivex.functions.Function
    
    /**
     * `map` operator for [Observable]: wraps the receiver in an
     * [ObservableMap] that applies [mapper] to the values passing through.
     */
    fun <T, U> Observable<T>.map(mapper: Function<T, U>): Observable<U> =
        ObservableMap(this, mapper)
    
    /**
     * Observable returned by `Observable.map`.
     *
     * Subscription flow:
     *      ObservableMap.subscribeActual ->
     *      ObservableCreate.subscribeActual ->
     *      ObservableOnSubscribe.subscribe(EmitterCreate)
     *
     * Each `subscribeActual` wraps the observer it receives before passing it
     * upstream, so observers are nested layer by layer from downstream to
     * upstream, with the most downstream observer innermost.
     */
    class ObservableMap<T, U>(
        private val source: Observable<T>,
        private val function: Function<T, U>
    ) : Observable<U>() {

        /**
         * Wraps [observer] in a [MapObserver] and subscribes that wrapper to
         * the upstream [source].
         */
        override fun subscribeActual(observer: Observer<U>) {
            val mappingObserver = MapObserver(observer, function)
            source.subscribe(mappingObserver)
        }
    }
    
    /**
     * Observer installed by [ObservableMap] between the upstream source and
     * the downstream observer.
     *
     * Event flow:
     *      EmitterCreate.onNext(T) ->
     *      MapObserver.onNext(T) ->
     *      Observer.onNext(U)
     *
     * Values are converted with [mapper] before being passed on; the
     * subscription, error and completion signals are passed on unchanged so
     * the downstream observer sees the full lifecycle of the stream.
     *
     * @param actual the next observer in the chain
     * @param mapper converts each upstream value of type T into a U
     */
    class MapObserver<T, U>(
        private val actual: Observer<U>,
        private val mapper: Function<T, U>
    ) : Observer<T> {

        /** Relays the subscription signal to the downstream observer. */
        override fun onSubscribe() {
            actual.onSubscribe()
        }

        /**
         * Called with an upstream value [t]; the value is run through
         * [mapper] and the result is delivered downstream.
         */
        override fun onNext(t: T) {
            val u = mapper.apply(t)
            actual.onNext(u)
        }

        /** Relays the error signal to the downstream observer. */
        override fun onError() {
            actual.onError()
        }

        /** Relays the completion signal to the downstream observer. */
        override fun onComplete() {
            actual.onComplete()
        }
    }

    线程切换原理

    推荐博客:https://jsonchao.github.io/2019/01/01/Android%E4%B8%BB%E6%B5%81%E4%B8%89%E6%96%B9%E5%BA%93%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%EF%BC%88%E4%BA%94%E3%80%81%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3RxJava%E6%BA%90%E7%A0%81%EF%BC%89/

     

     

  • 相关阅读:
    LCM与GCD算法
    LCS,LIS,LICS算法
    高精度算法(C/C++)
    SystemTap
    VMware15下解决Ubuntu18.04没有网络连接问题
    Git ssh-key 配置问题
    Ubuntu18.04更换国内源
    sql 错误日志存储路径设置
    资源
    System.Data.DataTable 基本方法
  • 原文地址:https://www.cnblogs.com/zsben991126/p/14353821.html
Copyright © 2011-2022 走看看