zoukankan      html  css  js  c++  java
  • RxJava2源码解析(二)

    title: RxJava2源码解析(二) categories:

    • 源码解析 tags:
    • 源码解析
    • rxJava2

    前言

    本篇主要解析RxJava的线程切换的原理实现

    subscribeOn

    首先, 我们先看下subscribeOn()方法, 老样子, 先上Demo

    Observable<Integer> observable =
                    Observable
                    .create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            emitter.onNext(123);
                            emitter.onComplete();
                        }
                    });
    
    observable
            .subscribeOn(Schedulers.io())
            .subscribe(getObserver());
    复制代码

    subscribeOn操作符源码里其实是返回了一个ObservableSubscribeOn对象, 而从上篇我们已经知道, 订阅的动作其实在每个ObservablesubscribeActual(observer)中执行, 所以我们直接去看ObservableSubscribeOn中的对应重载方法就行了.

    @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    复制代码
    final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    复制代码

    SubscribeTask是一个Runnable的实现类, 执行内容就是修饰后的Observer订阅上游的动作, 我们先看scheduler.scheduleDirect(runable)方法

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    复制代码

    这里createWorker是个抽象方法, 我们需要找到对应的修饰类, 我们返回去看Schedulers.io(), IOIoScheduler的实例, 它的重载方法代码如下

    final AtomicReference<CachedWorkerPool> pool;
    public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    复制代码

    可以看到IO线程实际使用的是一个有线程缓存的线程调度器.它内部通过ScheduledExecutorService实例来尝试重用之前worker开始使用的实例, 由于本篇着重在流程实现原理, 所以略过细节处. 在EventLoopWorker中, 我们看下对应的重载方法

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    复制代码

    继续往下, 其实这个时候已经是在线程池目标线程执行相关的工作了. 再深入就是线程池的操作了, 所以这里我们不再赘述

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    复制代码

    由此我们可以看出来, 每次每个subscribeOn操作符执行的时候, 其实在source.subscribe(parent);订阅动作就做了线程切换, 所以在多次调subscribeOn的时候, 就会一直切换线程, 直到离ObservableSource最近的subscribeOn线程切换生效.

    observeOn

    废话不说, 我们直接看ObservableObserveOn.subscribeActual(observer)

    protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    复制代码

    熟悉的配方, 当相同想成的时候, 直接订阅, 而当不同线程的时候, 可以看到我们获取目标切换线程对应的worker实例以及装饰对应的ObserverObserveOnOberver,后面的流程我们心知肚明, 就是Observer层层订阅上去, 然后我们看当碰到最上流的ObservableSource往下执行的时候, 做什么操作.具体我们看ObserveOnOberver代码, 我们这里着重看下onSubscribeonNext方法

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            // 发送的数据是集合队列形式的时候
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                //是同步模式的时候
                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    done = true;
                    actual.onSubscribe(this);
                    // 线程调度
                    schedule();
                    return;
                }
                // 异步模式
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    return;
                }
            }
    
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
            actual.onSubscribe(this);
        }
    }
    
    @Override
    public void onNext(T t) {
        // 是否已经调用到onComplete 或者 onError, 如果是, 则不再执行后面的onNext
        if (done) {
            return;
        }
        // 如果是非异步操作, 将数据添加到队列中
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        // 线程调度
        schedule();
    }
    复制代码

    大概的注释都加在代码上了, 我们再补充看下看onSubscribe方法, 首先判断发送的数据是否属于QueueDisposable, 如果不是, 直接执行下游的onSubscribe,这里我卡了一下, 看不到他的线程切换是在哪里做, 后来往回看, 发现在我们执行ObservableSubscribeOn.subscribeActual(observer)的时候, onSubscribe()方法本身的确不是在切换后的线程内执行的. 但是, 当我们发送的是集合数据, 那么我们需要判断是哪种线程模式进行线程调度.

    我们来看具体的schedule()方法代码

    void schedule() {
                // 判断当前自增值是否为0, 原子性保证worker.schedule(this);不会在调用结束前被重复调用
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    复制代码

    这个时候就是在指定线程内run了, Disposable schedule(@NonNull Runnable run)传入的是个Runnable的实现类, 我们来找重载的run方法

    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    
    void drainNormal() {
        int missed = 1;
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
        // 无限循环
        for (;;) {
            // 判断是否被取消, 或者调用onError 或者调用onComplete则退出循环
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
            // 无限循环
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    // 队列数据分发
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
                // 判断是否应该被终止
                if (checkTerminated(d, empty, a)) {
                    return;
                }
    
                if (empty) {
                    break;
                }
                a.onNext(v);
            }
            // 原子性保证worker.schedule(this)的调用
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    
    // 判断循环是否终止
    boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
        // 如果订阅已经被取消, 则清除队列, 终止
        if (cancelled) {
            queue.clear();
            return true;
        }
        // 如果调用过onError 或者 onComplete
        if (d) {
            Throwable e = error;
            // 默认false
            if (delayError) {
                // 等到队列为空的时候再调用onError或者onComplete
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                // 如果有抛出异常, 走下游的onError
                // 线程任务停止
                if (e != null) {
                    queue.clear();
                    a.onError(e);
                    worker.dispose();
                    return true;
                }
                // 没有, 走下游的onComplete
                // 线程任务停止
                else if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        // 否则不结束
        return false;
    }
    复制代码

    由此我们可以得出结论, observeOn的操作符可以保证我们下流操作线程切换生效

    总结

    到这里, 我们线程切换的原理大体流程就基本分析完毕了, 可以看出subscribeOn操作符只对上游生效, 而且因为他是在订阅的时候进行线程切换, 而我们每个操作符中间都有订阅动作, 所以越接近我们的ObservableSource的订阅的subscribeOn越是最后生效的. 而observeOn生效在我们的onNext,onComplete, onError方法内, 所以每次的observeOn针对它的下游都可以生效.

  • 相关阅读:
    【Language】 TIOBE Programming Community Index for February 2013
    【diary】good health, good code
    【web】a little bug of cnblog
    【Git】git bush 常用命令
    【web】Baidu zone ,let the world know you
    【diary】help others ,help yourself ,coding is happiness
    【Git】Chinese messy code in widows git log
    【windows】add some font into computer
    SqlServer启动参数配置
    关于sqlserver中xml数据的操作
  • 原文地址:https://www.cnblogs.com/twodog/p/12137520.html
Copyright © 2011-2022 走看看