zoukankan      html  css  js  c++  java
  • 响应式编程库RxJava初探

    引子

    在读 Hystrix 源码时,发现一些奇特的写法。稍作搜索,知道使用了最新流行的响应式编程库RxJava。那么响应式编程究竟是怎样的呢? 本文对响应式编程及 RxJava 库作一个初步的探索。

    在学习新的编程模型时,我喜欢将其与原来的编程模型联系起来。因为新的编程模型往往是对原来编程模型的承袭和组合。响应式编程的两个基本要素是:

    • 基于观察者模式的事件驱动机制。
    • 函数式编程:通过装饰与组合,让响应式编程的处理更流畅灵活;

    函数式编程,在之前的文章 “完全”函数式编程”“Java8函数式编程探秘”“精练代码:一次Java函数式编程的重构之旅” 等有较多探索,观察者模式在 “设计模式之观察者模式:实现配置更新实时推送” 有讲述过。我们将在这两者的基础上探索响应式编程。

    基础

    初次接触 RxJava ,很容易被一连串的 Observer, Observable, Disposable, subscribeOn, onSubscribe, onNext, onError, onComplete 等绕晕。不过软件里面无新鲜事。大多无非是用一种新的方式来组织逻辑罢了。基于观察者模式的事件驱动也不例外。我们只要梳理清楚脉络,就可以容易地理解。观察者模式有三个基本参与者:

    • 被观察者:Observable ;
    • 发射装置:Emitter;
    • 观察者: Observer。

    基本流程是:被观察者 Observable 装备发射装置 Emitter,发射消息,创建事件;观察者 Observer 监听到事件,接收到被观察者发射的消息,调用对应的函数 onNext, onError 和 onComplete 进行处理。onError 和 OnComplete 只能有一个被触发。

    不妨写个基本 Demo 来模拟下基本流程。为了更好滴理解,我把三者都区分开了。

    Demo

    首先定义观察者 MyObserver,继承抽象类 DefaultObserver ,这样实现成本最小。

    
    package zzz.study.reactor;
    
    import com.alibaba.fastjson.JSON;
    import io.reactivex.observers.DefaultObserver;
    
    /**
     * @Description 观察者定义
     * @Date 2021/1/23 4:13 下午
     * @Created by qinshu
     */
    public class MyObserver extends DefaultObserver {
    
        @Override
        public void onStart() {
            System.out.println("MyObserver: Start");
        }
    
        @Override
        public void onNext(Object o) {
            System.out.println("Observed: " + JSON.toJSONString(o));
        }
    
        @Override
        public void onError(Throwable e) {
            System.out.println("Observed: " + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("MyObserver: Complete");
        }
    }
    

    接着,定义发射装置(发射消息) MyEmitter:

    package zzz.study.reactor;
    
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description 发射装置
     * @Date 2021/1/24 7:04 上午
     * @Created by qinshu
     */
    public class MyEmitter implements ObservableOnSubscribe {
    
        Random random = new Random(System.currentTimeMillis());
    
        @Override
        public void subscribe(ObservableEmitter emitter) throws Exception {
            TimeUnit.SECONDS.sleep(1);
            emitter.onNext("next");
            if (random.nextInt(3) == 0) {
                emitter.onError(new RuntimeException("A RuntimeException"));
            }
            else {
                emitter.onComplete();
            }
        }
    }
    

    最后,创建被观察者,并串起流程:

    package zzz.study.reactor;
    
    import io.reactivex.Observable;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    
    /**
     * @Description RxJava基本Demo
     * @Date 2021/1/23 12:28 下午
     * @Created by qinshu
     */
    public class RxJavaBasic {
    
        public static void main(String[] args) {
            for (int i=0; i<5; i++) {
                ObservableOnSubscribe observableOnSubscribe = new MyEmitter();
                Observable observable = Observable.create(observableOnSubscribe);
                Observer observer = new MyObserver();
                observable.subscribe(observer);
            }
        }
    }
    

    运行,可得结果:

    MyObserver: Start
    Observed: "next"
    MyObserver: Complete
    MyObserver: Start
    Observed: "next"
    MyObserver: Complete
    MyObserver: Start
    Observed: "next"
    Observed: A RuntimeException
    MyObserver: Start
    Observed: "next"
    MyObserver: Complete
    MyObserver: Start
    Observed: "next"
    MyObserver: Complete
    

    讲解

    如何理解上述流程及结果呢?最好的办法就是单步调试。经过单步调试,可以知道整个过程如下:

    步骤1: 整个过程由这一行触发 observable.subscribe(observer); ,会去调用 Observable.subscribeActual 方法,分派给具体实现类 ObservableCreate.subscribeActual ;单步调试的好处就是能确定具体实现者;

    步骤2: ObservableCreate.subscribeActual 所做的事情,调用 observer.onSubscribe ( MyObserver.onStart 方法 ),然后转发给 MyEmitter.subscribe 来发射消息。

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    步骤3:MyEmitter 执行 onNext ,分派给具体实现类 CreateEmitter.onNext ,进而调用 observer.onNext 方法;
    步骤4:MyEmitter 执行 onError ,分派给具体实现类 CreateEmitter.onError ,进而 调用 observer.onError 方法;如果 MyEmitter 发射 onComplete ,那么就会分派给具体实现类 CreateEmitter.onComplete ,进而调用 observer.onComplete 方法。注意,onError 和 onComplete 两者只可能执行一个。

    基本流程就是这样。

    引申

    Disposable

    除了订阅自定义 Emitter 来发射消息,类 Observable 还提供了各种工具方法,更便捷滴做订阅和推送。比如:

    public static void testDirectSubscribe() {
        Observable.fromArray("I", "Have", "a", "dream").subscribe(new MyObserver());
    }
    

    会输出:

    MyObserver: Start
    Observed: "I"
    Observed: "Have"
    Observed: "a"
    Observed: "dream"
    MyObserver: Complete
    

    具体实现是: fromArray 方法会创建一个 Observable 的具体类 ObservableFromArray,而这个类的 subscribeActual 方法会创建一个 FromArrayDisposable 来处理。FromArrayDisposable 的 run 方法被调用,依次遍历所指定列表,调用 observer.onNext ,最后调用 observer.onComplete。具体源码如下:

    public final class ObservableFromArray<T> extends Observable<T> {
        final T[] array;
        public ObservableFromArray(T[] array) {
            this.array = array;
        }
    
        @Override
        public void subscribeActual(Observer<? super T> observer) {
            FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
            observer.onSubscribe(d);
            if (d.fusionMode) {
                return;
            }
            d.run();
        }
    
        static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
            final Observer<? super T> downstream;
            final T[] array;
            int index;
            boolean fusionMode;
            volatile boolean disposed;
    
            FromArrayDisposable(Observer<? super T> actual, T[] array) {
                this.downstream = actual;
                this.array = array;
            }
    
           // other methods
    
            @Override
            public void dispose() {
                disposed = true;
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
    
            void run() {
                T[] a = array;
                int n = a.length;
    
                for (int i = 0; i < n && !isDisposed(); i++) {
                    T value = a[i];
                    if (value == null) {
                        downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                        return;
                    }
                    downstream.onNext(value);
                }
                if (!isDisposed()) {
                    downstream.onComplete();
                }
            }
        }
    }
    

    那么 Disposable 的意义何在呢 ? 我的理解是:它作为订阅完成的一个流程闭环。比如重复订阅同一个观察者,如下代码:

        public static void testDirectSubscribe() {
            Observer observer = new MyObserver();
            Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
            Observable.fromArray("changed").subscribe(observer);
        }
    

    会抛出异常:

    io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) zzz.study.reactor.MyObserver multiple times. Please create a fresh instance of zzz.study.reactor.MyObserver and subscribe that to the target source instead.
        at io.reactivex.internal.util.EndConsumerHelper.reportDoubleSubscription(EndConsumerHelper.java:148)
        at io.reactivex.internal.util.EndConsumerHelper.validate(EndConsumerHelper.java:57)
        at io.reactivex.observers.DefaultObserver.onSubscribe(DefaultObserver.java:70)
        at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:34)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at zzz.study.reactor.RxJavaBasic.testDirectSubscribe(RxJavaBasic.java:34)
        at zzz.study.reactor.RxJavaBasic.main(RxJavaBasic.java:17)
    

    这个异常是在调用 DefaultObserver.onSubscribe 抛出的:

        @Override
        public final void onSubscribe(@NonNull Disposable d) {
            if (EndConsumerHelper.validate(this.upstream, d, getClass())) {
                this.upstream = d;
                onStart();
            }
        }
    
       public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) {
            ObjectHelper.requireNonNull(next, "next is null");
            if (upstream != null) {
                next.dispose();
                if (upstream != DisposableHelper.DISPOSED) {
                    reportDoubleSubscription(observer);
                }
                return false;
            }
            return true;
        }
    

    这就是说,如果同一个观察者,它的上一个 Disposable 订阅没有结束,那么再次订阅 Disposable 就会出错。怎么解决呢?可以在 MyObserver 的 onError 和 onComplete 添加 super.cancel 调用,可以结束上一次的订阅,再次订阅就不抛出异常了:

        @Override
        public void onError(Throwable e) {
            System.out.println("Observed: " + e.getMessage());
            super.cancel();
        }
    
        @Override
        public void onComplete() {
            System.out.println("MyObserver: Complete");
            super.cancel();
        }
    
       /**
         * Cancels the upstream's disposable.
         */
        protected final void cancel() {
            Disposable upstream = this.upstream;
            this.upstream = DisposableHelper.DISPOSED;
            upstream.dispose();
        }
    

    但是,即便这样,也无法发射我们新的订阅消息。这是因为上一次的 upstream 不为 null,本次的订阅就无法发射。

    我们没法覆写 DefaultObserver.onSubscribe 方法,因为该方法声明为 final 的,且 upstream 声明为 private ,也没有公共方法可以设置 upstream。这明确表明了设计者的意图:这是 Observer 订阅 Disposable 的前置检测约定,不可被破坏,否则后果自负。

    我们可以绕过 DefaultObserver , 不继承它,而是直接实现 Observer 接口:

    
    public static void testDirectSubscribe() {
        Observer observer = new RepeatedSubscribeMyObserver();
        Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
        Observable.fromArray("changed").subscribe(observer);
    }
    
    /**
     * @Description 可重复订阅的观察者
     * @Date 2021/1/24 10:11 上午
     * @Created by qinshu
     */
    public class RepeatedSubscribeMyObserver<T> implements Observer<T> {
    
        public Disposable upstream;
    
        @Override
        public void onSubscribe(@NonNull Disposable d){
            System.out.println(getName() + ": Start");
            this.upstream = d;
        }
    
        @Override
        public void onNext(T o) {
            System.out.println(getName() + ": " + JSON.toJSONString(o));
        }
    
        @Override
        public void onError(Throwable e) {
            System.out.println(getName() + ": " + e.getMessage());
            cancel();
        }
    
        @Override
        public void onComplete() {
            System.out.println(getName() + ": Complete");
            cancel();
        }
    
        public String getName() {
            return this.getClass().getSimpleName();
        }
    
        /**
         * Cancels the upstream's disposable.
         */
        protected final void cancel() {
            Disposable upstream = this.upstream;
            this.upstream = DisposableHelper.DISPOSED;
            upstream.dispose();
        }
    }
    

    这样就可以实现多次订阅同一个 Observer 了。运行结果:

    RepeatedSubscribeMyObserver: Start
    RepeatedSubscribeMyObserver: "I"
    RepeatedSubscribeMyObserver: "Have"
    RepeatedSubscribeMyObserver: "a"
    RepeatedSubscribeMyObserver: "dream"
    RepeatedSubscribeMyObserver: Complete
    RepeatedSubscribeMyObserver: Start
    RepeatedSubscribeMyObserver: "changed"
    RepeatedSubscribeMyObserver: Complete
    

    弄懂了 Observable.fromArray 的实现原理,就弄清楚了 Observable 中很多基本方法的基本套路。比如 just 方法有两个及以上参数时,其实是 fromArray 的包装,而 range 方法则是创建一个 RangeDisposable 来处理。

    Observable.just(1,2,3).subscribe(observer);
    Observable.range(1,4).subscribe(observer);
    

    装饰器

    RxJava 大量使用了装饰器模式。在 observable 包下有一个继承自 Observable 的抽象类 AbstractObservableWithUpstream,所有继承它的子类,都遵循一个相同的套路:一个 Observable 子类,以及一个相应的 Observer 装饰者。 比如 ObservableDelay 里面就相应有一个 DelayObserver ,实现观察者的延迟或周期性接收。学会举一反三、触类旁通。相应实现如下:

    public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
        final long delay;
        final TimeUnit unit;
        final Scheduler scheduler;
        final boolean delayError;
    
        public ObservableDelay(ObservableSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
            super(source);
            this.delay = delay;
            this.unit = unit;
            this.scheduler = scheduler;
            this.delayError = delayError;
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public void subscribeActual(Observer<? super T> t) {
            Observer<T> observer;
            if (delayError) {
                observer = (Observer<T>)t;
            } else {
                observer = new SerializedObserver<T>(t);
            }
    
            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new DelayObserver<T>(observer, delay, unit, w, delayError));
        }
    
        static final class DelayObserver<T> implements Observer<T>, Disposable {
            final Observer<? super T> downstream;
            final long delay;
            final TimeUnit unit;
            final Scheduler.Worker w;
            final boolean delayError;
    
            Disposable upstream;
    
            DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
                super();
                this.downstream = actual;
                this.delay = delay;
                this.unit = unit;
                this.w = w;
                this.delayError = delayError;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                if (DisposableHelper.validate(this.upstream, d)) {
                    this.upstream = d;
                    downstream.onSubscribe(this);
                }
            }
    
            @Override
            public void onNext(final T t) {
                w.schedule(new OnNext(t), delay, unit);
            }
    
            final class OnNext implements Runnable {
                private final T t;
    
                OnNext(T t) {
                    this.t = t;
                }
    
                @Override
                public void run() {
                    downstream.onNext(t);
                }
            }
        }
    }
    
    

    组合

    上文谈到了响应式编程的一大基本元素是函数式编程。函数式的优势是可以无限叠加组合,构建出灵活多变的函数和行为。这使得观察者的行为也可以定制得更加灵活。可以组合多个 Observable 的发射行为。

    合并

    简单的组合使用 merge 方法,构造一个 Observable 的列表,依次遍历合并后的每个 Observable 的发射信息:

    Iterable<? extends ObservableSource<? extends Integer>> observableSourceSet = Sets.newHashSet(
                    Observable.fromArray(3,4,5),
                    Observable.range(10,3)
            );
            Observable.merge(observableSourceSet).subscribe(observer);
    

    流式

    Observable 可以通过 Stream 进行组合,这里就是函数式编程的用武之地了。如下代码所示:

    Observable.range(1,10).filter(x -> x%2 ==0).subscribe(observer);
    

    注意到,这里使用到了装饰器模式。filter 方法会创建一个 ObservableFilter 对象,而在这个对象里,subscribeActual 方法会创建一个 FilterObserver 将传入的 observer 装饰起来。downstream 即是传入的 observer。

    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }
    
    public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
        final Predicate<? super T> predicate;
        public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
            super(source);
            this.predicate = predicate;
        }
    
        @Override
        public void subscribeActual(Observer<? super T> observer) {
            source.subscribe(new FilterObserver<T>(observer, predicate));  // FilterObserver 装饰了传入的自定义的 observer 
        }
    
        static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
            final Predicate<? super T> filter;
    
            FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
                super(actual);
                this.filter = filter;
            }
    
            @Override
            public void onNext(T t) {     // 这里对传入的 Observer.onNext 做了个装饰,仅当条件成立时才调用
                if (sourceMode == NONE) {
                    boolean b;
                    try {
                        b = filter.test(t);
                    } catch (Throwable e) {
                        fail(e);
                        return;
                    }
                    if (b) {
                        downstream.onNext(t);  // downstream 即是我们传入的自定义的 Observer
                    }
                } else {
                    downstream.onNext(null);
                }
            }
    }
    

    正如 filter 对发射数据流进行过滤,map 或 flatMap 则对发射数据流进行映射变换,与 stream.map 或 stream.flatMap 的功能类似:

    Observable.range(1,10).map(x -> x*x).subscribe(observer);
    Observable.range(1,10).flatMap(x -> Observable.just(x*x)).subscribe(observer);
    

    map 方法将创建一个 ObservableMap 对象,在 subscribeActual 中用 MapObserver 将所传入的 observer 装饰起来;flatMap 将创建一个 ObservableFlatMap 对象,在 subscribeActual 中 MergeObserver 将传入的 observer 装饰起来。

    还可以使用 scan:对于生成的每个值,使用累加器 (x,y) -> x*y 生成新的值并发射。

    Observable.range(1, 10).scan(1, (x,y) -> x*y).subscribe(observer);
    

    最后再给个分组的示例:

    Observable.just(28,520,25,999).groupBy( i -> ( i > 100 ? "old": "new")).subscribe(new GroupedRepeatedSubscribeMyObserver());
    
    /**
     * @Description 可重复订阅的分组观察者
     * @Date 2021/1/24 10:11 上午
     * @Created by qinshu
     */
    public class GroupedRepeatedSubscribeMyObserver extends RepeatedSubscribeMyObserver<GroupedObservable> {
        @Override
        public void onNext(GroupedObservable o) {
            o.subscribe(new RepeatedSubscribeMyObserver() {
                @Override
                public void onNext(Object v) {
                    String info = String.format("GroupedRepeatedSubscribeMyObserver: [group=%s][value=%s]", o.getKey(), JSON.toJSONString(v));
                    System.out.println(info);
                }
            });
    
        }
    
    }
    

    groupBy 方法生成的是一个 GroupedObservable ,因此要订阅一个 Observer 的观察者实现。

    本文先写到这里。

    项目代码见工程: “ALLIN” 的包 zzz.study.reactor 下。需要引入 Maven 依赖:

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.20</version>
    </dependency>
    

    小结

    本文讲解了响应式编程及 RxJava 库的最基本概念:Observable , Observer 及 Emitter, Disposable ,也讲到了如何组合 Observable 来构建更灵活的消息发射机制。这些基本构成了响应式编程的基本骨架流程。

    响应式编程的强大能力构建在事件驱动机制和函数式编程上,里面大量应用了装饰器模式。因此,熟悉这些基本编程思想,对掌握响应式编程模型亦大有裨益。

  • 相关阅读:
    CF763C Timofey and Remoduling
    CF762E Radio Stations
    CF762D Maximum Path
    CF763B Timofey and Rectangles
    URAL1696 Salary for Robots
    uva10884 Persephone
    LA4273 Post Offices
    SCU3037 Painting the Balls
    poj3375 Network Connection
    Golang zip压缩文件读写操作
  • 原文地址:https://www.cnblogs.com/lovesqcc/p/14320843.html
Copyright © 2011-2022 走看看