zoukankan      html  css  js  c++  java
  • RxJava_基本介绍

     1.1  Rxjava的基本及其使用介绍:

    ===========

     抽空把之前收藏的 扔物线 大神写的RxJava入门文章看了。

    http://gank.io/post/560e15be2dca930e00da1083

    花了一晚上看完,顺便做做笔记,然后试着把上传和压缩图片的逻辑改用RxJava实现。
    发现真的用得很爽。屌爆了。整个流程一口气写完,不需要在Handler中跳来跳去,还自带线程调度,真棒。

    截图

    下面是乱写的笔记,有些是直接Ctrl+C,Ctrl+V的,记得乱七八糟,估计也就我自己清楚这笔记写的是什么鬼了(其实就是就为了水篇博客而已,手动滑稽)。

    下载

    https://github.com/ReactiveX/RxJava
    https://github.com/ReactiveX/RxAndroid

    写笔记时的最新版:
    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'

    理解

    RxJava主要用到观察者模式,观察者被观察者通过订阅完成绑定。
    在程序中,观察者无法或者不好去主动(即定时查询,这样比较浪费性能,而且不及时)去获取被观察者的状态。
    一般是在被观察者发生事件时,再去通知观察者。虽然这和现实的情况有点区别。

    用法

    流程

    ->实现被观察者,绑定事件
    —>实现观察者,定义事件的响应
    —->观察者订阅事件(被观察者.subscribe(观察者))
    ——>被订阅者开始发送事件

    API

    Observable:被观察者

    通过create()just()from()来绑定(输入)事件来构造被观察者对象
    通过subscribe()观察者并联,并开始向观察者发送事件

    通过map(),将输入的事件进行转换(加工)成另一个事件,返回结果给观察者响应

    flatMap(),可以将输入的事件进行转换(加工)成其他事件(1转1,或者1转多),将新事件绑定到新的被观察者,构成新的被观察者对象,返回此新被观察者对象
    (之后会由新被观察者发事件给观察者,所以subscribe()的时候,需要注意观察者响应的是事件类型,要跟新观察者一致)

    lift()目标订阅者进行层层包装,通过Observable.Operator,让包装后的订阅者可以直接处理事件
    compose()是对多个lift()的封装

    throttleFirst() 在每次事件触发后的一定时间间隔内丢弃新的事件,用作防抖什么的,按一次后,多少时间内再按了也没用

    Observer:观察者

    主要响应事件
    onNext()onCompleted()onError()
    onCompleted()onError()互斥

    Subscriber:实现了Observer的抽象类

    • onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,
    • 例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。
    • 需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
    • unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。
    • 在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。
    • unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

    Action0Action1ActionXObserver的不完整版

    Action0只响应一个事件,对应onCompleted()
    Action1只响应一个事件,对应onNext()onCompleted(),还有其他Action2Action3

    Observable.OnSubscribe:订阅

    其实就是事件的抽象,事件会激发观察者,即调用观察者的响应回调

    Scheduler:线程控制

    1
    2
    3
    4
    5
    6
    7
    8
    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread)
    .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
    • subscribeOn()指定 订阅事件发生的线程(只生效一次,最先调用那次)
    • observeOn()指定 观察者响应的线程,可调用(生效)多次
    • doOnSubscribe() doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

    doOnSubscribe()Subscriber.onStart()同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。
    默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
    @Override
    public void call() {
    progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
    }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

    1.1 Rxjava的基本及其使用介绍:
    RxJava的异步实现正是基于观察者模式来实现的,而且是一种扩展的观察者模式。
    1.观察者模式概念?
    观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫做主题或者被观察者。当它改变时那些由它保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。
    观察者模式很适合下面这些场景中的任何一个:
    当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
    当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
    当一个变化的对象通知那些无需推断具体类型的对象时。

    在RxJava中主要有4个角色:
    Observable(被观察者)
    Subject
    Observer(观察者)
    Subscriber(观察者:其实是一个实现了Observer的抽象类)
    Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而Observer和Subscriber对应于观察者模式中的观察者。
    后面我们分析源码的时候也会介绍到。Subject比较复杂,以后再分析。

    上一篇文章中我们说到RxJava中有个关键概念:事件。观察者Observer和被观察者Observable通过subscribe()方法实现订阅关系。从而Observable可以在需要的时候发出事件来通知Observer。

    1.2.RxJava如何使用?
    第一步:创建观察者Observer
    Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onCompleted() {

    }
    };
    在普通的观察者模式中观察者一般只会提供一个update()方法用于被观察者的状态发生变化时,用于提供给被观察者调用。开篇我们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。
    ps:onNext就相当于普通观察者模式中的update
    RxJava中添加了普通观察者模式缺失的三个功能:
    第二步:创建被观察者Observable

    Observable.create()方法可以创建一个Observable,使用crate()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。

    Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
    });
    除了create(),just()和from()同样可以创建Observable。
    看看下面两个例子:just(T...)将传入的参数依次发送

    Observable observable = Observable.just("One", "Two", "Three");
    //上面这行代码会依次调用
    //onNext("One");
    //onNext("Two");
    //onNext("Three");
    //onCompleted();
    from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送

    String[] parameters = {"One", "Two", "Three"};
    Observable observable = Observable.from(parameters);
    //上面这行代码会依次调用
    //onNext("One");
    //onNext("Two");
    //onNext("Three");
    //onCompleted();

    第三步:被观察者Observable订阅观察者Observer(ps:你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者)

    有了观察者和被观察者,我们就可以通过subscribe()l,来实现二者的订阅关系了。
    observable.subscribe(observer);
    连在一起写就是这样:
    Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    for (int i = 0; i < 5; i++) {
    subscriber.onNext(i);
    }
    subscriber.onCompleted();
    }

    }).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("onCompleted");
    }

    });
    至此一个完整的RxJava调用就完成了。

    兄台,你叨逼叨叨逼叨的说了一大堆,可是我没搞定你特么到底在干啥啊?!!不急,我现在就来告诉你们到底发生了什么。
    首先我们使用Observable.create()创建了一个新的Observable<Integer>,并为create()方法传入了一个OnSubscribe,OnSubscribe中包含一个call()方法,一旦我们调用subscribe()订阅后就会自动触发call()方法。
    call()方法中的参数Subscriber其实就是subscribe()方法中的观察者Observer。

    我们在call()方法中调用了5次onNext()和1次onCompleted()方法。一套流程周下来以后输出结果就是下面这样的:
    Item is 0
    Item is 1
    Item is 2
    Item is 3
    Item is 4
    onCompleted

    看到这里可能你又要说了,大兄弟你别唬我啊!OnSubscribe的call()方法中的参数Subscriber怎么就变成了subscribe()方法中的观察者Observer?!!!这俩儿货明明看起来就是两个不同的类啊。
    我们先看看Subscriber这个类:
    public abstract class Subscriber<T> implements Observer<T>, Subscription {

    ...
    }
    从源码中我们可以看到,Subscriber是Observer的一个抽象实现类,所以我首先可以肯定的是Subscriber和Observer类型是一致的。接着往下我们看看subscribe()这个方法:

    public final Subscription subscribe(final Observer<? super T> observer) {

    //这里的if判断对于我们要分享的问题没有关联,可以先无视
    if (observer instanceof Subscriber) {
    return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {

    @Override
    public void onCompleted() {
    observer.onCompleted();
    }

    @Override
    public void onNext(T t) {
    observer.onNext(t);
    }

    });
    }
    我们看到subscribe()方法内部首先将传进来的Observer做了一层代理,将它转换成了Subscriber。
    我们再看看这个方法内部的subscribe()方法:

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
    }
    进一步往下追踪看看return后面这段代码到底做了什么。精简掉其他无关代码后的subscribe(subscriber, this)方法是这样的:

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    subscriber.onStart();
    try {
    hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);
    return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
    return Subscriptions.unsubscribed();
    }
    }
    我们重点看看hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber),
    前面这个hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括号内的第二个参数observable.onSubscribe,然后调用了它的call方法。而这个observable.onSubscribe正是create()方法中的Subscriber,这样整个流程就理顺了。看到这里是不是对RxJava的执行流程清晰了一点呢?这里也建议大家在学习新技术的时候多去翻一翻源码,知其然还要能知其所以然不是吗。

    subscribe()的参数除了可以是Observer和Subscriber以外还可以是Action1、Action0;这是一种更简单的回调,只有一个call(T)方法;由于太简单这里就不做详细介绍了!


    RxJava的异步:RxJava就是来处理异步任务的。
    但是默认情况下我们在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生产事件就在哪个线程消费事件。那怎么做到异步呢?RxJava为我们提供Scheduler用来做线程调度,我们来看看RxJava提供了哪些Scheduler。
    同时RxJava还为我们提供了subscribeOn()和observeOn()两个方法来指定Observable和Observer运行的线程。
    Observable.from(getCommunitiesFromServer())
    .flatMap(community -> Observable.from(community.houses))
    .filter(house -> house.price>=5000000)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::addHouseInformationToScreen);
    上面这段代码大家应该有印象吧,没错正是我们上一篇文章中的例子。
    subscribeOn(http://Schedulers.io())指定了获取小区列表、处理房源信息等一系列事件都是在IO线程中运行,
    observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示房源的操作在UI线程执行。
    这就做到了在子线程获取房源,主线程展示房源。

    好了,RxJava系列的入门内容我们就聊到这。下一篇我们再继续介绍更多的API以及它们内部的原理。

  • 相关阅读:
    【新闻发布系统】登录和注销的实现
    【新闻发布系统】项目文档
    JSP九大内置对象
    JDBC数据库连接技术
    使用SQLyog连接MySQL数据库
    MySql--学习成长过程
    MYSQL--学习记录
    MYSQL
    GIT的使用方法
    java 表单验证
  • 原文地址:https://www.cnblogs.com/awkflf11/p/5481087.html
Copyright © 2011-2022 走看看