zoukankan      html  css  js  c++  java
  • RXJava2响应式编程框架设计<四>---高阶泛型、手写RxJava2核心实现

    在之前三篇中已经对于RxJava2的各种使用及原理有了一定的了解了,接下来则通过手写实现的方式加强对于它的原理的掌握。

    <? extend T>和<? super T>阐述:

    从是否可读可写角度说明:

    在正式手写RxJava代码之前,需要先对其泛型知识做一个巩固,因为RxJava的源码中充斥着大量的泛型,先来贴一个源码中的一处代码:

    其实核心就是要了解<? extend T>和<? super T>的使用场景,关于这块在当时学习Kolin时已经阐述得非常之清楚了,可以参考https://www.cnblogs.com/webor2006/p/11291744.html,总结一下就是<? extend T>是协变,只能读,而<? super T>是逆变,只能写。

    说到这俩个泛型的区别,我记得在面试时问到候选人这个问题,基本上9成的回答是:“<? extend T>代表只能传T的子类型;<? super T>代表只能传T的父类型。。”,这个回答等于把人当傻子,是个人看到都能说出这点,extend和super从编程语义上也能知道嘛,但是好的回答应该是从使用场景来道出这俩的不同以及怎么使用。

    由于之前已经对于这块详细说明过,这里再简单的过一下,温故而知新~~

    而如果修改一下:

    再来修改:

     

    当然可以强制转换倒是可以:

    接着咱们添加试一下:

    从函数定义角度说明:

    • <? extend T>,只能读不能写,很显然适合作为方法的返回值,比如:


    • <? super T>,只能写不能值,那适合当参数传递,如下:

    总结:

    • 如果你想从一个数据类型里获取数据,使用 ? extends 通配符(能取不能存)
    • 如果你想把对象写入一个数据结构里,使用 ? super 通配符(能存不能取)
    • 如果你既想存,又想取,那就别用通配符。

    手写RxJava2核心实现:

    接下来开启又兴奋又刺激的手撸环节:

    框架搭建:

    先来建一个文件夹,里面用来存放我们写手的RxJava框架代码:

    然后先来定义一个被观察者类:

    而看一下RxJava的源码,它其实是实现了一个接口的,如下:

    所以校仿一下:

    这里为啥是要<? super T>,很明显是要将它做为参数传递的,接下来咱们来定义观察者,也是个接口:

    package com.android.rxjavastudy;
    
    import com.android.rxjavastudy.rxjava.Disposable;
    
    public interface Observer<T> {
        void onSubscribe(Disposable d);
    
        void onNext(T t);
    
        void onError(Throwable e);
    
        void onComplete();
    }

    其中需要一个Disposable,它也是一个接口,定义如下:

    package com.android.rxjavastudy.rxjava;
    
    public interface Disposable {
    
        void dispose(boolean bool);
    
        boolean isDisposed();
    }

    Observable.create():

    接下来则来实现一下创建被观察者的逻辑,如下:

    先来看一下它的定义,然后咱们再校仿一下:

    咱们校仿着定义一下,其中勾子函数的代码一并忽略:

    先来看一下源代码的定义:

     

    校仿一下:

    这里又来参考下源代码的定义:

     

    所以接下来定义两个跟发射器相关的接口:

     

    好,接下来则再来定义这个类:

     

    此时咱们就可以调用咱们自己的编写的RxJava代码了,如下:

    这里要来明白一种思想就是:

    其实它里面是做了转换了的:

    也就是对于源来的功能通过装饰则对原功能进行了加强。

    订阅逻辑实现(subscribe):

    接下来则要来实现最核心的订阅方法啦,也就是:

     

    那它里面该怎么实现呢?依然先看源码:

    所以照抄一下:

    所以在ObservableCreate()中来具体实现之:

    注意在子类中就木有必要重写subscribe()方法了,它统一由父类Observable来进行管理,那具体在子类中怎么来实现呢?

    首先可以回调一下订阅成功的回调:

    这里则需要再定义一下,在里面定义成一个内部类,如下:

    public class ObservableCreate<T> extends Observable<T> {
    
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<>(observer);
            observer.onSubscribe(parent);
        }
    
        static final class CreateEmitter<T> implements ObservableEmitter<T>, Disposable {
            final Observer<? super T> observer;
            private boolean bool;
    
            public CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void dispose(boolean bool) {
                this.bool = bool;
            }
    
            @Override
            public boolean isDisposed() {
                return bool;
            }
    
            @Override
            public void onNext(T value) {
                if (!bool) {
                    observer.onNext(value); //observer === MapObserver
                }
            }
    
            @Override
            public void onError(Throwable throwable) {
                if (!bool) {
                    observer.onError(throwable);
                }
            }
    
            @Override
            public void onComplete() {
                if (!bool) {
                    observer.onComplete();
                }
            }
        }
    }

    此时则会回调咱们的这个应用回调接口了:

    好,接下来则应该来回调待观察者的回调方法,来对事件进行发送:

    怎么调用呢?比较简单:

    此时我们调用发射器的方法:

    最终就是直接通过了我们在这定义的发射器的方法:

    再转向到了我们的观察者的回调了:

    原来是这么个流程,手写了一遍确实顺间另深了对RxJava调用流程的理解了。

    运行看一下调用效果:

    目前就可以来看一下调用效果了:

    加入map操作符复杂实现:

    在上面的实现中是一个最为简单的版本,这次准备实现一个map操作符的效果,最终效果如下:

    别看只是简单的增加一个操作符,便是实际它内部实现也不是很简单的,下面开始:

    新的一个操作符里面肯定又得重新封装一下,那具体怎么封装呢?先看一下源码的实现:

     

    校仿一下,先来定义一个抽象类:

    然后再来定义ObservableMap:

     

    package com.android.rxjavastudy.rxjava;
    
    public class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObserverbleSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    
        @Override
        protected void subscribeActual(Observer<? super U> observer) {
            //TODO
        }
    
    }

    接下来则来生成这个包装类:

    Function定义如下:

    接下来则来实现ObservableMap这个被观察者的订阅方法,首先肯定也得先回调一下观察者的订阅成功的方法啦:

    但是这里改变写法了,先来看一下源码这块的实现,可以参考一下:

    其实回调观察者的订阅方法是放到了内部类的父类了,如下:

     

    所以,校仿一下:

    package com.android.rxjavastudy.rxjava;
    
    //观察者
    public abstract class BasicFuseableObserver<T, R> implements Observer<T>, Disposable<R> {
        //观察者
        protected final Observer<? super R> actual;
    
        protected Disposable disposeble;
    
        public BasicFuseableObserver(Observer<? super R> actual) {
            this.actual = actual;
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            this.disposeble = d;
            actual.onSubscribe(d);
        }
    
        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }
    
        @Override
        public void onComplete() {
            actual.onComplete();
        }
    
        @Override
        public void dispose(boolean bool) {
            disposeble.dispose(bool);
        }
    
        @Override
        public boolean isDisposed() {
            return disposeble.isDisposed();
        }
    }

    注意此时的source是调用onCreate()操作符生成的被观察者,也就是ObservableCreate, 所以当调用:

    则会转到它:

    最后再又执行到了被观察者的事件回调方法:

    好了,整个map操作符的代码就写完了,还是挺绕的,接下来咱们应用一下看下效果:

    运行结果:

    线程切换:

    接下来则来完成一下线程的切换功能,我们之前已经清楚的知道了:

    那对于map转换操作符是在哪个线程呢?其实它跟被观察者的回调是在同一个线程的,下面来打印一下验证一下:

     

    运行:

     

    确实map操作符跟被观察者的事件回调都是由subscribeOn来决定的线程,下面则来实现一下线程切换。

    subscribeOn():

    首先先来在观察者中定义这个方法:

    此时咱们就给这个方法定义参数了,默认就让它在io线程里,先来调用一下它:

    如之前的套路一样,这是一个新的操作符,又得做个被观察者的装饰,所以新建一个:

     

    所以此时就可以实例化它了:

    接下来则来处理包装被观察者的实现逻辑了,首先还是得先告诉观察者已经订阅成功了,又得有个观察者的包装,老套路如下:

    package com.android.rxjavastudy.rxjava.observable;
    
    import com.android.rxjavastudy.rxjava.Disposable;
    import com.android.rxjavastudy.rxjava.Observer;
    import com.android.rxjavastudy.rxjava.ObserverbleSource;
    
    
    //指定被观察者在那个线程运行的被观察者
    public class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    
        public ObservableSubscribeOn(ObserverbleSource<T> source) {
            super(source);
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //重新包装observer
            final SubscribeOnObserver parent = new SubscribeOnObserver(observer);
            //告诉下游观察者订阅成功
            observer.onSubscribe(parent);
        }
    
        static final class SubscribeOnObserver<T> implements Observer<T>, Disposable {
            final Observer<? super T> actual;
            Disposable<T> disposable;
    
            public SubscribeOnObserver(Observer<? super T> actual) {
                this.actual = actual;
            }
    
            @Override
            public void dispose(boolean bool) {
                this.disposable.dispose(bool);
            }
    
            @Override
            public boolean isDisposed() {
                return this.disposable.isDisposed();
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                this.disposable = d;
            }
    
            @Override
            public void onNext(T t) {
                this.actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable e) {
                this.actual.onError(e);
            }
    
            @Override
            public void onComplete() {
                this.actual.onComplete();
            }
        }
    }

    好,接下来就应该是回调被观察者的事件方法了,注意!!此时就需要进行线程的切换了,如下:

    于此关于这个方法的逻辑就实现完了,是不是也不是很难?此时我们的map肯定也是在子线程啦,为啥?

    好,接下来咱们来验证一下是否已经实现了此功能了:

    运行:

    但是!!此时next()回调的线程不对呀:

    这是因为我们还没有实现observeOn(),所以接下来再来实现它。

    observeOn():

    同样的套路,就不多说了,直接贴出实现:

     

    package com.android.rxjavastudy.rxjava.observable;
    
    import android.os.Handler;
    import android.os.Looper;
    
    import com.android.rxjavastudy.rxjava.Disposable;
    import com.android.rxjavastudy.rxjava.Observer;
    import com.android.rxjavastudy.rxjava.ObserverbleSource;
    
    //指定观察者在那个线程运行的被观察者
    public class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    
        public ObservableObserveOn(ObserverbleSource<T> source) {
            super(source);
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            ObserverOnObserver<T> parent = new ObserverOnObserver<>(observer);
            source.subscribe(parent);
        }
    
    
        //包装下游观察者,并关联onNext,.....  放入主线程中执行
        static final class ObserverOnObserver<T> implements Observer<T>, Disposable {
            final Observer<? super T> actual;
            Disposable disposeble;
            private Handler handler;
    
            ObserverOnObserver(Observer<? super T> actual) {
                this.actual = actual;
                handler = new Handler(Looper.getMainLooper());
            }
    
            @Override
            public void dispose(boolean bool) {
                this.disposeble.dispose(bool);
            }
    
            @Override
            public boolean isDisposed() {
                return this.disposeble.isDisposed();
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                this.disposeble = d;
                actual.onSubscribe(d);
            }
    
            @Override
            public void onNext(final T t) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        actual.onNext(t);
                    }
                });
            }
    
            @Override
            public void onError(final Throwable e) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        actual.onError(e);
                    }
                });
            }
    
            @Override
            public void onComplete() {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        actual.onComplete();
                    }
                });
            }
        }
    }

    嗯,比较简单,再来应用一下:

    运行:

    至此,手写RxJava的核心功能就到这,对它的原理了解得也更加透彻了。

  • 相关阅读:
    区别TPS QPS HPS RPS PV UV
    C/C++常用库及工具
    CentOS常用命令备忘
    PHP的学习--Traits新特性
    CentOS7创建本地YUM源的三种方法
    CentOS下iptables详解
    Linux备份压缩命令
    Nginx HTTPS功能部署实践
    Fuel 30 分钟快速安装OpenStack
    hadoop学习通过虚拟机安装hadoop完全分布式集群
  • 原文地址:https://www.cnblogs.com/webor2006/p/12356910.html
Copyright © 2011-2022 走看看