zoukankan      html  css  js  c++  java
  • RxJava 2.x 理解-3

    背压:Flowable / Subscriber 

      在RxJava 1.x 理解 中,没有讲到背压这个概念,是因为学习太落后了,RxJava都出2了,所以直接在2上学。  

      背压是下游控制上游流速的一种手段。在rxjava1.x的时代,上游会给下游set一个producer,下游通过producer向上游请求n个数据,这样上游就有记录下游请求了多少个数据,然后下游请求多少个上游就给多少个,这个就是背压。一般来讲,每个节点都有缓存,比如说缓存的大小是64,这个时候下游可以一次性向上游request 64个数据。rxjava1.x的有些操作符不支持背压,也就是说这些操作符不会给下游set一个producer,也就是上游根本不理会下游的请求,一直向下游丢数据,如果下游的缓存爆了,那么下游就会抛出MissingBackpressureException,也就是背压失效了。在rxjava2.x时代,上述的背压逻辑全部挪到Flowable里了,所以说Flowable支持背压。而2.x时代的Observable是没有背压的概念的,Observable如果来不及消费会死命的缓存直到OOM,所以rxjava2.x的官方文档里面有讲,大数据流用Flowable,小数据流用Observable。

    经典教程:

    同步线程_背压:

      /**
         * 首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常,
         * 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了,
         * 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们.
         * 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了,
         * 或者根据上游发送事件的数量来request就行了, 比如这里request(3)就可以了.
         */
        private void Backpressure_1() {
            /**
             * 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法,
             * 这里我们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,
             * 这个异常就是著名的MissingBackpressureException. 其余的策略后面再来讲解.
             */
            Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emit 1");
                    emitter.onNext(1);
                    Log.d(TAG, "emit 2");
                    emitter.onNext(2);
                    Log.d(TAG, "emit 3");
                    emitter.onNext(3);
                    Log.d(TAG, "emit complete");
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR);
    
            Subscriber<Integer> downstream = new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    //s.request(Long.MAX_VALUE);  // 注意这句代码
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
    
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };
    
            upstream.subscribe(downstream);
        }

    结果:

    异步线程_背压:

    /**
         * 然后我们再来看看第二段代码, 为什么上下游没有工作在同一个线程时,
         * 上游却正确的发送了所有的事件呢?
         * 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中,
         * 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.
         */
        private void Backpressure_2() {
            Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emit 1");
                    emitter.onNext(1);
                    Log.d(TAG, "emit 2");
                    emitter.onNext(2);
                    Log.d(TAG, "emit 3");
                    emitter.onNext(3);
                    Log.d(TAG, "emit complete");
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR);
    
            Subscriber<Integer> downstream = new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    //s.request(Long.MAX_VALUE);  // 注意这句代码
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
    
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };
    
            upstream.
                    observeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(downstream);
        }

    背压策略:

    • BackpressureStrategy.ERROR : 默认缓存大小:128
    • BackpressureStrategy.BUFFER :缓存大小:无限
    • BackpressureStrategy.DROP :Drop就是直接把存不下的事件丢弃
    • BackpressureStrategy.LATEST :,Latest就是只保留最新的事件

    RxJava给我们提供了其他的方法,加入背压:

    • onBackpressureBuffer()
    • onBackpressureDrop()
    • onBackpressureLatest()
    
    
    private void Backpressure_5() {
    Flowable
    .create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    Log.d(TAG, "First requested = " + emitter.requested()); // 得知请求的量
    boolean flag;
    for (int i = 0; ; i++) {
    flag = false;
    while (emitter.requested() == 0) { // 判断是否发送数据
    if (!flag) {
    Log.d(TAG, "Oh no! I can't emit value!");
    flag = true;
    }
    }
    emitter.onNext(i);
    Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
    }
    }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    mSubscription = s;
    //mSubscription.request(10);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext:" + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });

    }
    Subscription mSubscription;
    Backpressure_5();
            findViewById(R.id.button).setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View view) {
                    mSubscription.request(96);
                }
            });

    emitter.requested() == 0 就不创建新的数据。

    参考资料:

    这可能是最好的RxJava 2.x 教程(完结版)

  • 相关阅读:
    [BZOJ3105][CQOI2013]新Nim游戏
    [BZOJ4592][SHOI2015]脑洞治疗仪
    [BZOJ3551][ONTAK2010]Peaks加强版
    [BZOJ2229][ZJOI2011]最小割
    [BZOJ4519][CQOI2016]不同的最小割
    [BZOJ3532][SDOI2014]LIS
    [BZOJ2668][CQOI2012]交换棋子
    [BZOJ3504][CQOI2014]危桥
    Java抽象类
    Java方法覆盖重写
  • 原文地址:https://www.cnblogs.com/H-BolinBlog/p/8442537.html
Copyright © 2011-2022 走看看