  • A brief analysis of RxJava

    1. RxJava execution flow:

    Basic RxJava usage

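    import android.util.Log;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;

    private final String tag = getClass().getSimpleName();

    // Data source: the Observable being observed
    Observable<String> obser = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(tag, "emit 1");
            emitter.onNext("t1");
            Log.d(tag, "emit 2");
            emitter.onNext("t2");
            Log.d(tag, "emit 3");
            emitter.onNext("t3");
            Log.d(tag, "emit complete");
            emitter.onComplete();
        }
    });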
    
    
    private void observer_test() {
        // Observer: handles each event in the corresponding callback
        Observer<String> dnObser = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(tag, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(tag, "onNext " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(tag, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(tag, "onComplete");
            }
        };
        // Nothing is emitted until the Observer subscribes
        obser.subscribe(dnObser);
    }
    

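    The example shows the main building blocks of RxJava:

    Observable: the observed object, i.e. the data source
    ObservableOnSubscribe: defines which events are emitted to the observer
    Observer: the observer, which handles each event by implementing the corresponding callback

    How subscription is handled: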

      @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
                // Dispatch to the concrete subclass; for create() this ends up calling ObservableOnSubscribe.subscribe()
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    protected abstract void subscribeActual(Observer<? super T> observer);

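    subscribeActual() is an abstract method. For an Observable built with create(), the concrete implementation is ObservableCreate, as the create() method of Observable shows: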

    @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

     The source of ObservableCreate:

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try { // Calls ObservableOnSubscribe.subscribe(), which starts the event flow
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    ......
    }
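
    The chain traced above (create() stores the source, subscribe() hands the observer to it, and only then do events flow) can be sketched in plain Java without the library. This is a simplified illustration, not RxJava's code: MiniObservable, MiniObserver and MiniOnSubscribe are hypothetical stand-ins, and disposal, plugins and null checks are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the RxJava types; not the library's real classes.
final class MiniCreateDemo {

    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Plays the role of ObservableOnSubscribe: the code that emits the events.
    interface MiniOnSubscribe<T> {
        void subscribe(MiniObserver<T> emitter) throws Exception;
    }

    // Plays the role of Observable plus ObservableCreate.
    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> source;

        private MiniObservable(MiniOnSubscribe<T> source) {
            this.source = source;
        }

        // create() only stores the source; nothing runs yet.
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
            return new MiniObservable<>(source);
        }

        // subscribe() is what triggers the source, mirroring subscribeActual().
        void subscribe(MiniObserver<T> observer) {
            try {
                source.subscribe(observer);
            } catch (Throwable ex) {
                observer.onError(ex);
            }
        }
    }

    // Records the order of events so the flow can be inspected.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> obs = MiniObservable.create(emitter -> {
            log.add("emit t1");
            emitter.onNext("t1");
            log.add("emit t2");
            emitter.onNext("t2");
            emitter.onComplete();
        });
        log.add("created");
        obs.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext " + value); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    In this sketch "created" is logged before any "emit" line, which reflects the point of the walkthrough: the source lambda does not run at creation time, only when subscribe() is called.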
    

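    2. Data transformation

    Before the observer receives items from the Observable, the stream is often processed by operators such as map(), flatMap() and filter().
    Take map() as an example: it receives each item from the upstream Observable, transforms it, and emits the new value downstream, acting as an intermediate conversion step.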

     public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    

     RxJava 2 uses the Function interface to perform the conversion:

    public interface Function<T, R> {
        // Converts a value of type T into a value of type R
        @NonNull
        R apply(@NonNull T t) throws Exception;
    }
    

    The actual work is delegated to MapObserver, a nested class of ObservableMap:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    
    
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
               .......
    
                U v;
                try {
                    // Call Function.apply() to convert the value
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                // Pass the converted value on to the downstream Observer
                actual.onNext(v);
            }
    ......
        }
    }
    

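    MapObserver implements Observer and holds the downstream Observer passed to it. In onNext() it converts each value with mapper.apply(t) and then passes the result to that Observer's onNext().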
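
    The wrapping idea behind MapObserver can be shown in a few lines of plain Java. This is a sketch under simplifying assumptions, not the library's implementation: MiniSource and the static map() helper are hypothetical names, only the onNext path is modelled, and java.util.function.Function stands in for RxJava's own Function interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the map() wrapping pattern; not RxJava's real classes.
final class MiniMapDemo {

    // A source pushes values into whatever downstream consumer subscribes.
    interface MiniSource<T> {
        void subscribe(Consumer<? super T> downstream);
    }

    // Returns a new source that subscribes to the upstream with a wrapping
    // consumer, the counterpart of MapObserver: convert first, then forward.
    static <T, R> MiniSource<R> map(MiniSource<T> upstream,
                                    Function<? super T, ? extends R> mapper) {
        return downstream ->
                upstream.subscribe(value -> downstream.accept(mapper.apply(value)));
    }

    static List<Integer> run() {
        MiniSource<String> words = downstream -> {
            downstream.accept("t1");
            downstream.accept("rxjava");
        };
        MiniSource<Integer> lengths = map(words, String::length);
        List<Integer> out = new ArrayList<>();
        lengths.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 6]
    }
}
```

    The downstream consumer never sees the original strings; it only receives what the mapper returns, which is the same relationship MapObserver has with the Observer it wraps.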


    3. Task scheduling (Scheduler)

    subscribeOn() and observeOn() each take a Scheduler, which specifies the thread that the corresponding part of the chain runs on.

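    Observable.create(...)
        ...
        .subscribeOn(Schedulers.io()) // the source's subscribe() runs on an IO thread
        ...
        .subscribeOn(Schedulers.newThread()) // does not move the source: the subscribeOn() closest to the source decides its thread
        ...
        .subscribeOn(Schedulers.computation()) // likewise does not move the source
        ...
        .observeOn(AndroidSchedulers.mainThread()) // the Observer's callbacks run on the main thread
        .subscribe(...)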

     @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    subscribeOn() creates a new Observable, an ObservableSubscribeOn, which stores the original (upstream) Observable together with the scheduler:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
    
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
            // Call Scheduler.scheduleDirect() with a Runnable that subscribes to the original Observable on the scheduler's thread
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    ...
    }
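
    The core of ObservableSubscribeOn is that the upstream subscribe() call itself is moved into a Runnable and handed to the scheduler. A plain-Java sketch of that idea follows; MiniSource and the subscribeOn() helper are hypothetical, an ExecutorService stands in for the Scheduler, and disposal is ignored. It also stacks two subscribeOn() calls to illustrate why the one closest to the source decides the emitting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the subscribeOn() pattern; not RxJava's real classes.
final class MiniSubscribeOnDemo {

    interface MiniSource<T> {
        void subscribe(Consumer<? super T> downstream);
    }

    // Wraps the upstream so that subscribing to it happens on the executor.
    static <T> MiniSource<T> subscribeOn(MiniSource<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    // Returns the name of the thread on which the source emitted.
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-io"));
        ExecutorService other = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-other"));
        try {
            MiniSource<String> source =
                    downstream -> downstream.accept(Thread.currentThread().getName());
            // The inner call (closest to the source) is applied last at subscribe time.
            MiniSource<String> chained = subscribeOn(subscribeOn(source, io), other);

            String[] seen = new String[1];
            CountDownLatch done = new CountDownLatch(1);
            chained.subscribe(name -> {
                seen[0] = name;
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the emission");
            }
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints mini-io
    }
}
```

    The outer wrapper first hops to "mini-other", but the inner wrapper then hops again to "mini-io" before the source runs, so the source emits on "mini-io".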
    

    Take IoScheduler as an example.

    Scheduler.scheduleDirect(), followed by the relevant parts of IoScheduler:

    @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            // Schedule the task on the Worker, which switches the executing thread
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    
        @NonNull
        @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    
        public int size() {
            return pool.get().allWorkers.size();
        }
    
        static final class EventLoopWorker extends Scheduler.Worker {
            private final CompositeDisposable tasks;
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
    
            final AtomicBoolean once = new AtomicBoolean();
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                this.threadWorker = pool.get();
            }
            ...
    
            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    
        static final class ThreadWorker extends NewThreadWorker {
            private long expirationTime;
    
            ThreadWorker(ThreadFactory threadFactory) {
                super(threadFactory);
                this.expirationTime = 0L;
            }
    
            ...
        }
    

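     The worker is taken from the cached pool and wrapped in an EventLoopWorker, which finally calls scheduleActual() on NewThreadWorker (the superclass of ThreadWorker).

    NewThreadWorker implementation (the excerpt shows scheduleDirect(); scheduleActual() hands the task to the same executor in a similar way):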

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor; // executor that runs the scheduled tasks
    ...
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
    ......
    
        // Run the Runnable passed in above on the executor, so that the work (and the Observer calls it makes) happens on this worker's thread
        public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            try {
                Future<?> f;
                if (delayTime <= 0) {
                    f = executor.submit(decoratedRun); // submit to the executor for immediate execution
                } else {
                    f = executor.schedule(decoratedRun, delayTime, unit);
                }
                return Disposables.fromFuture(f);
            } catch (RejectedExecutionException ex) {
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }
        }
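
    The two layers above (scheduleDirect() creating a worker and disposing it when the task ends, and the worker delegating to a ScheduledExecutorService) can be imitated with the JDK alone. The sketch below is an assumption-laden simplification: MiniWorker is a hypothetical class, and disposing it shuts the executor down, whereas the real IoScheduler returns workers to a cached pool for reuse.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the worker/executor layering; not RxJava's real classes.
final class MiniWorkerDemo {

    // Rough counterpart of NewThreadWorker: one worker owns one single-threaded executor.
    static final class MiniWorker {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "mini-worker"));

        Future<?> schedule(Runnable task, long delay, TimeUnit unit) {
            if (delay <= 0) {
                return executor.submit(task);          // run as soon as possible
            }
            return executor.schedule(task, delay, unit); // run after the delay
        }

        // Simplification: the sketch discards the thread instead of pooling it.
        void dispose() {
            executor.shutdown();
        }
    }

    // Rough counterpart of Scheduler.scheduleDirect(): one-shot worker per task.
    static Future<?> scheduleDirect(Runnable task, long delay, TimeUnit unit) {
        MiniWorker worker = new MiniWorker();
        return worker.schedule(() -> {
            try {
                task.run();
            } finally {
                worker.dispose();
            }
        }, delay, unit);
    }

    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Future<?> f = scheduleDirect(
                () -> log.add("ran on " + Thread.currentThread().getName()),
                10, TimeUnit.MILLISECONDS);
        try {
            f.get(5, TimeUnit.SECONDS); // wait for the task to finish
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ran on mini-worker]
    }
}
```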
    

    Now look at observeOn():

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
    
            private static final long serialVersionUID = 6576896619930983584L;
            final Observer<? super T> actual;
            final Scheduler.Worker worker;
            final boolean delayError;
            final int bufferSize;
    
            SimpleQueue<T> queue;
    
            Disposable s;
    
            Throwable error;
            volatile boolean done;
    
            volatile boolean cancelled;
    
            int sourceMode;
    
            boolean outputFused;
    
            ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.actual = actual;
                this.worker = worker;
                this.delayError = delayError;
                this.bufferSize = bufferSize;
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                ...
    
                    actual.onSubscribe(this);
                }
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
    
            @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                schedule();
            }
    
            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
        ...
            void schedule() { // Schedule the drain: hand this Runnable to the worker
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }

            @Override
            public void run() { // Runs on the worker's thread and forwards events to the original Observer
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }

            void drainNormal() {
                int missed = 1;

                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
                ...

                        a.onNext(v);
                    ...
                }
            }
            ...
        }
    }

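    Here ObservableObserveOn wraps the downstream Observer in an ObserveOnObserver proxy. The proxy queues incoming events and replays them on the Scheduler's Worker, which is how observeOn() switches the thread of the Observer's callbacks.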
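
    The queue-and-drain pattern used by ObserveOnObserver can be sketched with JDK classes. This is a minimal illustration under stated assumptions, not the library's code: MiniObserveOn is a hypothetical class, an ExecutorService stands in for Scheduler.Worker, and only onNext is modelled (no errors, completion, disposal or fusion). The counter plays the role of getAndIncrement() in schedule(): only the caller that moves it from 0 submits a drain, so at most one drain runs at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the observeOn() queue-drain pattern; not RxJava's real classes.
final class MiniObserveOnDemo {

    static final class MiniObserveOn<T> implements Runnable {
        private final Consumer<? super T> downstream;
        private final ExecutorService worker;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter

        MiniObserveOn(Consumer<? super T> downstream, ExecutorService worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        // Called on the producer's thread: enqueue, then request a drain.
        void onNext(T value) {
            queue.offer(value);
            schedule();
        }

        private void schedule() {
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        // Runs on the worker's thread: deliver everything queued so far.
        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v);
                }
                // Subtract the requests handled; loop again if more arrived meanwhile.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<String> run() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-main"));
        try {
            List<String> log = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch done = new CountDownLatch(3);
            MiniObserveOn<String> observer = new MiniObserveOn<String>(value -> {
                log.add(value + "@" + Thread.currentThread().getName());
                done.countDown();
            }, main);
            observer.onNext("t1");
            observer.onNext("t2");
            observer.onNext("t3");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    The values are produced on the calling thread but delivered, in order, on the "mini-main" thread, which is the effect observeOn() has on the Observer's callbacks.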

    To be continued...

  • Original article: https://www.cnblogs.com/happyxiaoyu02/p/6150735.html