zoukankan      html  css  js  c++  java
  • Rxjava2源码解析

    1:用法:

     Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //Log.d(TAG, "ObservableEmitter");
                    //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                    emitter.onNext(12);
                    emitter.onNext(13);
                    emitter.onNext(14);
                    emitter.onNext(15);
                    emitter.onComplete();
                }
            });
            Observer<Integer> observer = new Observer<Integer>() {
                private int i;
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                    mDisposable = d;
                }
    
                @Override
                public void onNext(Integer integer) {
                    /*i++;
                    if(i == 3){
                        mDisposable.dispose();
                    }*/
                    Log.d(TAG, "onNext" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };
    
            //observable.subscribe(observer);
            observable.doOnSubscribe(disposable -> {
                        Log.d(TAG, "doOnSubscribe");
                    }
            ).doOnComplete(() -> {
                Log.d(TAG, "doOnComplete");
            }).doOnNext((C) -> {
                Log.d(TAG, "doNext" + C);
            }).subscribe(observer);

    2:Observable

    首先看Observable:

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //Log.d(TAG, "ObservableEmitter");
                    //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                    emitter.onNext(12);
                    emitter.onNext(13);
                    emitter.onNext(14);
                    emitter.onNext(15);
                    emitter.onComplete();
                }
            });

    看一下create:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }

    参数ObservableOnSubscribe是一个接口,里面只有一个函数subscribe:

    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(ObservableEmitter<T> e) throws Exception;
    }

    create需要返回的是一个Observable:

    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));

    通过在这里可知反回的是一个Observable的继承类ObservableCreate:

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    。。。。。。。。。。

    总结以上的几个类,可以归纳一下:

    通过Observable.create返回一个Observable,具体是返回ObservableCreate,该类继承Observable,同时该类持有ObservableOnSubscribe,而ObservableOnSubscribe是一个接口,具体的实现是在:

     Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //Log.d(TAG, "ObservableEmitter");
                    //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                    emitter.onNext(12);
                    emitter.onNext(13);
                    emitter.onNext(14);
                    emitter.onNext(15);
                    emitter.onComplete();
                }
            });

    Observable的解析暂时先到这里,我们先看如何和observer关联起来:

    observable.subscribe(observer);
            observable.doOnSubscribe(disposable -> {
                        Log.d(TAG, "doOnSubscribe");
                    }
            ).doOnComplete(() -> {
                Log.d(TAG, "doOnComplete");
            }).doOnNext((C) -> {
                Log.d(TAG, "doNext" + C);
            }).subscribe(observer);

    进入subscribe:

     @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }

    可以看到这是Observable里面的一个方法,前面我们说到,create生成的是ObservableCreate,而该类继承Observable,所以我们现在就是在ObservableCreate的subscribe方法里面,看参数,传进来的是一个observer,observer也是一个interface,具体实现就是应用层的:

    Observer<Integer> observer = new Observer<Integer>() {
                private int i;
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                    mDisposable = d;
                }
    
                @Override
                public void onNext(Integer integer) {
                    /*i++;
                    if(i == 3){
                        mDisposable.dispose();
                    }*/
                    Log.d(TAG, "onNext" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };

    到这里再来总结一下,通过subscribe,就是ObservableCreate这个类(该类继承Observable,同时该类持有ObservableOnSubscribe,而ObservableOnSubscribe是一个接口)执行subscribe,传参为observer,是一个实现为用户层定义的接口。接下来就是具体看一下subscribe这个函数是如何走的:

    @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }

    首先看:

    subscribeActual(observer);

    点进这个函数看到是一个Observable里面的一个abstract函数,那实现是在哪里呢,前面一直提到的,我们现在其实是处于ObservableCreate这个类,进入该类找到subscribeActual这个函数:

     @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }

    先看这句:

    source.subscribe(parent);

    先看看source是怎么来的:

     final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //Log.d(TAG, "ObservableEmitter");
                    //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                    emitter.onNext(12);
                    emitter.onNext(13);
                    emitter.onNext(14);
                    emitter.onNext(15);
                    emitter.onComplete();
                }
            });

    一层层往上追溯可以看到sorce其实就是用户层实现的那个接口:

    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(ObservableEmitter<T> e) throws Exception;
    }

    会到:

     source.subscribe(parent);

    这里的parent是CreateEmitter,先看subscribe:

     public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //Log.d(TAG, "ObservableEmitter");
                    //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                    emitter.onNext(12);
                    emitter.onNext(13);
                    emitter.onNext(14);
                    emitter.onNext(15);
                    emitter.onComplete();
                }

    前面看到subscribe的参数是parent:

      CreateEmitter<T> parent = new CreateEmitter<T>(observer);

    这里定义了一个CreateEmitter,持有observer:

     implements ObservableEmitter<T>, Disposable {
    
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }

    我们来看一下onNext是如何执行的,当执行到:

    emitter.onNext(12);

    的时候,就到了:

     @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }

    看这句:

    observer.onNext(t);

    我们前面讲到了,这里的observer,就是用户层自定义的:

     Observer<Integer> observer = new Observer<Integer>() {
                private int i;
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                    mDisposable = d;
                }
    
                @Override
                public void onNext(Integer integer) {
                    /*i++;
                    if(i == 3){
                        mDisposable.dispose();
                    }*/
                    Log.d(TAG, "onNext" + integer);
                }

    到这里,Observable里面的onNext和Observer的onNext就联系起来了,Observavble发送一个,Observer执行一个。onError和onComplete同理。

  • 相关阅读:
    程序员面试金典--最大和子矩阵
    hihocoder-[Offer收割]编程练习赛25
    牛客网--今日头条2017后端工程师实习生笔试题
    牛客网-网易2017内推笔试编程题合集(二)
    hihocoder-1565-大富翁
    引用传递和值传递
    SSH详解
    Oracle中的游标
    Oracle触发器-变异表触发器不能访问本表
    Oracle中 in、exists、not in,not exists的比较
  • 原文地址:https://www.cnblogs.com/wnpp/p/11875545.html
Copyright © 2011-2022 走看看