zoukankan      html  css  js  c++  java
  • RxJava2|Flowable以及背压

    RxJava2 Flowable以及背压

    前述

    java-1.8

    maven-3

    rxjava-2.2.3

    背压

    背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

    ----https://www.jianshu.com/p/0cd258eecf60

    Flowable的官方介绍:

    io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure

    0...N flows, 支持响应式流和背压(backpressure)

    只有在需要处理背压问题时,才需要使用Flowable。

    由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题;
    所以,如果能够确定:

    1. 上下游运行在同一个线程中,

    2. 上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,

    3. 上下游工作在不同的线程中,但是数据流中只有一条数据
      则不会产生背压问题,就没有必要使用Flowable,以免影响性能。

      由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable慢得多。

    此段出处: https://www.jianshu.com/p/ff8167c1d191

    示例(Flowable简单使用)

    Flowable逻辑类 - HelloFlowable.java

    package yag;
    
    
    import io.reactivex.*;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    
    public class HelloFlowable {
    
        public void helloFlowable(){
            // 基本上和Observable一样.
            Flowable
                    .create((FlowableOnSubscribe<Integer>) flowableEmitter -> {
                        Integer i = 0;
                        while ( i < 7) {
                            i++;
                            flowableEmitter.onNext(i);
                        }
                    }, BackpressureStrategy.ERROR/* 背压 */)
    
                    .subscribe(new Subscriber<Integer>() {
    
                        private Subscription subscription;
                        @Override
                        public void onSubscribe(Subscription subscription) {
                            subscription.request(Long.MAX_VALUE);
                            this.subscription = subscription;
                        }
    
                        @Override
                        public void onNext(Integer i) {
                            if (i == 5){
                                // 退出接收
                                subscription.cancel();
                            }else {
                                System.out.println("现在接收到的信号是: 第" + i + "信号");
                            }
                        }
    
                        @Override
                        public void onError(Throwable throwable) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    
        }
    }
    

    执行者 - Runner.java

    package yag;
    
    public class Runner {
    
        public static void main(String[] args){
            HelloFlowable helloFlowable = new HelloFlowable();
            helloFlowable.helloFlowable();
        }
    }
    

    执行结果

    现在接收到的信号是: 第1信号
    现在接收到的信号是: 第2信号
    现在接收到的信号是: 第3信号
    现在接收到的信号是: 第4信号
    
    Process finished with exit code 0
    

    小结

    request()

    subscription.request(Long.MAX_VALUE);
    

    这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。

    当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。

    BackpressureStrategy.ERROR

    参考: https://www.jianshu.com/p/1f4867ce3c01

    这是一个背压操作策略. (BackpressureStrategy - 背压策略)

    ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。

    其他

    • BUFFER - 所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。

      比较消耗内存, 除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。(OutOfMemoryError)

    • DROP - 当消费者处理不了事件,就丢弃。

    • LATEST - 消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
      唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件

    个人补充:

    • MISSING - 写入过程中没有任何缓冲或丢弃, 即不操作.
  • 相关阅读:
    java 无符号byte转换
    MySQL分区总结
    eclipse @override注解出错
    git 利用hook 实现服务器自动更新代码
    Centos 安装mysql
    Centos Python3安装共存
    chromedriver 代理设置(账号密码)
    PyQt5整体介绍
    python图片云
    PyPt5 浏览器实例
  • 原文地址:https://www.cnblogs.com/shwo/p/9874680.html
Copyright © 2011-2022 走看看