zoukankan      html  css  js  c++  java
  • RXJava2响应式编程框架设计<三>---Rxjava2背压、生命周期

    在上一次https://www.cnblogs.com/webor2006/p/12348890.html中已经完成了对RxJava2的整个线程切换原理的详细剖析了,这次继续来学习它其它比较重要的知识点,手写RxJava2核心原理放到下一次。

    RxJava2背压【Backpressure】:

    背压这个概念在RxJava1.x中是木有的,它是在RxJava2.x才提出来的,而这个概念又是很重要的,因为不了解它很容易在实际工作中使用带定时发送消息的场景时出现背压造成的程序崩溃,所以接下来详细来解读一下它。

    出现原因:

    当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。关于背压的学习推荐该博文:https://www.jianshu.com/p/ceb48ed8719d,如博主所描述的背景情况,这里跟着梳理一下:

    先来看一下观察者 & 被观察者 之间存在2种订阅关系:同步 & 异步:

    • 同步订阅

      它的订阅流程如下: 

    • 异步订阅

      其中标红的描述是产生背压的主要原因,下面再来看一下它的整个订阅流程,被观察者发送的事件先存到缓存区中:

      然后观察者则从缓存区中来取事件进行一一处理,如下:

    由这订阅关系所阐述出来的问题:

    对于同步关系而言貌似木有啥问题,因为上游事件的发送都得要下游处理完了才能进行,而对于异步的订阅关系由于缓存区的一个原因会有如下问题产生:

    那有啥影响呢?处理不过来就慢慢处理呗,但是极端情况下会出现这样,如博主的说明:

    这就是为啥对于背压情况是一定得要去处理的,因为如果不了解它在实际工作中很容易就趟雷了。

    背压现象演示:

    接下来通过实验来演示一下背压现象,只有清楚了背压的一个问题之后,你才能够针对性的解决它,好下面的demo直接用博主举的,不过为了能重现OOM的异常我将其数据量加大了:

    • 被观察者的发送事件速度 = 10ms / 个,每个数据大小为1m,可能有点极端,但是能比较好的重现背压的问题。
    • 观察者的接收事件速度 = 4s / 个。

    代码如下:

    public class MainActivity extends AppCompatActivity {
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            Observable.create(new ObservableOnSubscribe<String>() {
                // 1. 创建被观察者 & 生产事件
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    byte[] data = new byte[1024];
                    for (int i = 0; ; i++) {
                        Log.d("cexo", "发送了事件" + i);
                        Thread.sleep(10);
                        // 发送事件速度:10ms / 个
                        emitter.onNext(new String(data));
    
                    }
    
                }
            }).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
                    .observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
                    .subscribe(new Observer<String>() {
                        // 2. 通过通过订阅(subscribe)连接观察者和被观察者
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d("cexo", "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(String value) {
    
                            try {
                                // 接收事件速度:4s / 个
                                Thread.sleep(4000);
                                Log.d("cexo", "接收到了事件" + value);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d("cexo", "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d("cexo", "对Complete事件作出响应");
                        }
    
                    });
    
    
        }
    }

    这里在我现在在用的Redmi Note 7没能等到OOM的情况,可能是手机内存比较高,机身内存是6G的,做实验内存中间有回落的情况,但是总的内存是持续的往上增,能增到三四兆,所以OOM肯定是会有的,因为实际商用APP本身加载正常页面就占一定的内存;另外我换了个配置稍低一占的Redmi 3s,机身内存是3G的,用profile运行一段时间的结果为:

    到达198MB左右,应用崩掉了,看一下日志:

    02-24 09:21:35.715 17990-17998/? I/art: Starting a blocking GC Alloc
    02-24 09:21:35.715 17990-18026/? W/art: Throwing OutOfMemoryError "Failed to allocate a 28 byte allocation with 0 free bytes and -2832B until OOM" (recursive case)
    02-24 09:21:35.715 17990-18024/? I/art: Waiting for a blocking GC Alloc
    02-24 09:21:35.741 17990-18026/? W/art: "RxCachedThreadScheduler-1" daemon prio=5 tid=14 Runnable
    02-24 09:21:35.741 17990-18026/? W/art:   | group="main" sCount=0 dsCount=0 obj=0x2ac0b520 self=0x559125c5b0
    02-24 09:21:35.741 17990-18026/? W/art:   | sysTid=18026 nice=0 cgrp=default sched=0/0 handle=0x7f980cc450
    02-24 09:21:35.741 17990-18026/? W/art:   | state=R schedstat=( 7951401546 970377205 16160 ) utm=607 stm=188 core=6 HZ=100
    02-24 09:21:35.741 17990-18026/? W/art:   | stack=0x7f97fca000-0x7f97fcc000 stackSize=1037KB
    02-24 09:21:35.741 17990-18026/? W/art:   | held mutexes= "mutator lock"(shared held)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #00 pc 00000000004903bc  /system/lib64/libart.so (_ZN3art15DumpNativeStackERNSt3__113basic_ostreamIcNS0_11char_traitsIcEEEEiP12BacktraceMapPKcPNS_9ArtMethodEPv+200)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #01 pc 000000000045a100  /system/lib64/libart.so (_ZNK3art6Thread9DumpStackERNSt3__113basic_ostreamIcNS1_11char_traitsIcEEEEP12BacktraceMap+220)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #02 pc 0000000000463b98  /system/lib64/libart.so (_ZN3art6Thread21ThrowOutOfMemoryErrorEPKc+244)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #03 pc 0000000000244e88  /system/lib64/libart.so (_ZN3art2gc4Heap21ThrowOutOfMemoryErrorEPNS_6ThreadEmNS0_13AllocatorTypeE+544)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #04 pc 000000000024b1ac  /system/lib64/libart.so (_ZN3art2gc4Heap22AllocateInternalWithGcEPNS_6ThreadENS0_13AllocatorTypeEmPmS5_S5_PPNS_6mirror5ClassE+1244)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #05 pc 000000000017afd4  /system/lib64/libart.so (_ZN3art6mirror5Class11AllocObjectEPNS_6ThreadE+268)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #06 pc 000000000046314c  /system/lib64/libart.so (_ZN3art6Thread24ThrowNewWrappedExceptionEPKcS2_+668)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #07 pc 0000000000463c40  /system/lib64/libart.so (_ZN3art6Thread21ThrowOutOfMemoryErrorEPKc+412)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #08 pc 0000000000244e88  /system/lib64/libart.so (_ZN3art2gc4Heap21ThrowOutOfMemoryErrorEPNS_6ThreadEmNS0_13AllocatorTypeE+544)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #09 pc 000000000024b1ac  /system/lib64/libart.so (_ZN3art2gc4Heap22AllocateInternalWithGcEPNS_6ThreadENS0_13AllocatorTypeEmPmS5_S5_PPNS_6mirror5ClassE+1244)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #10 pc 00000000003b8144  /system/lib64/libart.so (_ZN3artL32StringFactory_newStringFromCharsEP7_JNIEnvP7_jclassiiP11_jcharArray+396)
    02-24 09:21:35.741 17990-18026/? W/art:   native: #11 pc 0000000000362424  /data/dalvik-cache/arm64/system@framework@boot.oat (Java_java_lang_StringFactory_newStringFromChars__II_3C+168)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.lang.StringFactory.newStringFromChars!(Native method)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.lang.StringFactory.newStringFromChars(StringFactory.java:218)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.lang.StringFactory.newStringFromBytes(StringFactory.java:203)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.lang.StringFactory.newStringFromBytes(StringFactory.java:53)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.lang.StringFactory.newStringFromBytes(StringFactory.java:45)
    02-24 09:21:35.741 17990-18026/? W/art:     at com.android.rxjavastudy.MainActivity$2.subscribe(MainActivity.java:32)
    02-24 09:21:35.741 17990-18026/? W/art:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    02-24 09:21:35.741 17990-18026/? W/art:     at io.reactivex.Observable.subscribe(Observable.java:12051)
    02-24 09:21:35.741 17990-18026/? W/art:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    02-24 09:21:35.741 17990-18026/? W/art:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:579)
    02-24 09:21:35.741 17990-18026/? W/art:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    02-24 09:21:35.741 17990-18026/? W/art:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
    02-24 09:21:35.741 17990-18026/? W/art:     at java.lang.Thread.run(Thread.java:818)
    02-24 09:21:35.742 17990-18026/? I/art: Waiting for a blocking GC Alloc
    02-24 09:21:35.750 17990-17998/? I/art: Alloc sticky concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 0% free, 192MB/192MB, paused 1.279ms total 34.765ms
    02-24 09:21:35.750 17990-18024/? I/art: WaitForGcToComplete blocked for 194.640ms for cause Alloc

    确实是OOM了。。

    解决之道:

    来看博主对于这块的描述:在 异步订阅关系 中,控制事件发送 & 接收的速度,注:背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中。

    背压原理:

    那在RxJava中实现背压策略的原理思想是这样的:

     

    对于上面的策略用下图来说明一下:

    背压策略的具体实现:Flowable

    在RxJava2.0中,是采用Flowable来实现背压策略的,也叫是 “非阻塞式背压” 策略。下面用图来说明下它的特点:

    具体使用:

    采用带背压策略的代码:

    Flowable.create(new FlowableOnSubscribe<String>() {
                @Override
                public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                    byte[] data = new byte[1024];
                    for (int i = 0; ; i++) {
                        Log.d("cexo", "发送了事件" + i);
                        Thread.sleep(10);
                        // 发送事件速度:10ms / 个
                        emitter.onNext(new String(data));
    
                    }
                }
            }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
                    .observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
                    .subscribe(new FlowableSubscriber<String>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            Log.d("cexo", "开始采用subscribe连接");
                            s.request(Long.MAX_VALUE);
                        }
    
                        @Override
                        public void onNext(String value) {
    
                            try {
                                // 接收事件速度:5s / 个
                                Thread.sleep(4000);
                                Log.d("cexo", "接收到了事件" + value);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d("cexo", "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d("cexo", "对Complete事件作出响应");
                        }  
    
                    });

    其中可以看到在创建Flowable这个被观察者时强制要求必须要指定一个背压策略BackpressureStrategy,它可以取如下值:

    咱们来简单看一下这块的源码:

    然后我们在订阅时则会根据我们传进去的背压策略进行相应的处理:

    关于它里面的具体流程这里暂且就不分析了,整体流程跟上一次分析的其实差不多,主要是对其原理有个了解,具体以后如果遇到了再进行研究,明白了大体原理其实再现学也比较轻松了,关于背压还有一个响应式拉取策略,也就是:

    响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据,如果不显示调用request()则默认下游的需求量为零,所以运行上面的代码后,上游Flowable发射的数据不会交给下游Subscriber处理。下面来演示一下这个参数的含义:

     

    运行一下:

    其中关于背压策略提到了缓存池,那它的大小是多少呢,可以定位源码瞅一下:

     

    好了关于背压这块的东东先了解到这,各个策略的具体效果可以参考大神的这篇博文https://www.jianshu.com/p/ceb48ed8719d,对我来说重点是知道背压出现的原因以及如果出现了能有解决之道就可以了。

    RxJava2生命周期:

    问题演示:

    先不多说,看一下例子:

    运行看下效果:

    看到木有,这是木有加上生命周期管理的问题,也就是在Activity退出之后,该RxJava的定时器还在跑,对于RxJava不是有一个Disposable可以用来取消么?确实是的,咱们来修改一下:

    运行看一下:

    这个很easy嘛,有啥有提的,这里是打算采用一个框架来对它的生命周期进行管理,其原理肯定也是用到了这个Disposable的,地址为:https://github.com/trello/RxLifecycle,用起来也比较方便,顺便来拓展一下见识。

    加入生命周期管理:

    先来看一下官网的介绍:

    其使用方式官网也介绍了,比较简单:

    好,下面咱们来使用一下,先添加依赖:

    这里有个注意点,就是这个库对于support包需要是androidx的,如果不是则编译会报错的,比如:

    好,接下来咱们加入生命周期的代码:

    此时再运行:

    接下来简单瞅一下它的代码,这里用到了一个compose()操作符:

     

    这个compose能够实现我们自己的操作符,关于如何实现可以参阅https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators,这里就不细研究了,然后它调用了bindUntilEvent()方法,它来自于:

    返回了一个LifecycleTransformer对像,瞅一眼:

    实际使用时应该没有可能自已来手动定义一个操作符的,所以这里了解一下既可,再来看一下在这个Activity中生命周期中处里了生命周期的管理 :

    其中lifecycleSubject是Rxjava框架提供的类:

  • 相关阅读:
    ngnix+uwsgi+django 部署mezzanine
    shell三剑客之find
    Flask常见面试问题
    redis宕机如何解决?如果是项目上线的宕机呢?
    UiPath,容智Ibot在线接单,有需求的欢迎过来
    CORS和CSRF
    JWT黑名单和白名单
    Django项目常见面试问题
    降低Redis内存占用
    Redis-缓存有效期与淘汰策略
  • 原文地址:https://www.cnblogs.com/webor2006/p/12354782.html
Copyright © 2011-2022 走看看