zoukankan      html  css  js  c++  java
  • RxJS速成 (下)

    上一部分: http://www.cnblogs.com/cgzl/p/8641738.html

    Subject

    Subject比较特殊, 它即是Observable又是Observer.

    作为Observable, Subject是比较特殊的, 它可以对多个Observer进行广播, 而普通的Observable只能单播, 它有点像EventEmitters(事件发射器), 维护着多个注册的Listeners.

    作为Observable, 你可以去订阅它, 提供一个Observer就会正常的收到推送的值. 从Observer的角度是无法分辨出这个Observable是单播的还是一个Subject.

    从Subject内部来讲, subscribe动作并没有调用一个新的执行来传递值, 它只是把Observer注册到一个列表里, 就像其他库的AddListener一样.

    作为Observer, 它是一个拥有next(), error(), complete()方法的对象, 调用next(value)就会为Subject提供一个新的值, 然后就会多播到注册到这个Subject的Observers.

    例子 subject.ts:

    import { Subject } from "rxjs/Subject";
    
    const subject = new Subject();
    
    const subscriber1 = subject.subscribe({
        next: (v) => console.log(`observer1: ${v}`)
    });
    const subscriber2 = subject.subscribe({
        next: (v) => console.log(`observer2: ${v}`)
    });
    
    subject.next(1);
    subscriber2.unsubscribe();
    subject.next(2);
    
    const subscriber3 = subject.subscribe({
        next: (v) => console.log(`observer3: ${v}`)
    });
    
    subject.next(3);

    订阅者1,2从开始就订阅了subject. 然后subject推送值1的时候, 它们都收到了. 

    然后订阅者2, 取消了订阅, 随后subject推送值2, 只有订阅者1收到了.

    后来订阅者3也订阅了subject, 然后subject推送了3, 订阅者1,3都收到了这个值.

    下面是一个angular 5的例子:

    app.component.html:

    <h3>从Subject共享Observable到多个Subscribers</h3>
    <input type="text" placeholder="start typing" (input)="mySubject.next($event)" (keyup)="mySubject.next($event)">
    
    <br> Subscriber to input events got {{inputValue}}
    <br>
    <br> Subscriber to keyup events got {{keyValue}}

    app.component.ts:

    import { Component } from '@angular/core';
    import { Subject } from 'rxjs/Subject';
    import 'rxjs/add/operator/filter';
    import 'rxjs/add/operator/map';
    
    @Component({
      selector: 'app-root',
      templateUrl: './app.component.html',
      styleUrls: ['./app.component.css']
    })
    export class AppComponent {
      title = 'app';
    
      keyValue: string;
      inputValue: string;
    
      mySubject: Subject<Event> = new Subject();
    
      constructor() {
        // subscriber 1
        this.mySubject.filter(({ type }) => type === 'keyup')
          .map(e => (<KeyboardEvent>e).key)
          .subscribe(value => this.keyValue = value);
    
        // subscriber 2
        this.mySubject.filter(({ type }) => type === 'input')
          .map(e => (<HTMLInputElement>e.target).value)
          .subscribe(value => this.inputValue = value);
      }
    }

    input和keyup动作都把event推送到mySubject, 然后mySubject把值推送给订阅者, 订阅者1通过过滤和映射它只处理keyup类型的事件, 而订阅者2只处理input事件.

    效果:

    BehaviorSubject

    BehaviorSubject 是Subject的一个变种, 它有一个当前值的概念, 它会把它上一次发送给订阅者值保存起来, 一旦有新的Observer进行了订阅, 那这个Observer马上就会从BehaviorSubject收到这个当前值.

    也可以这样理解BehaviorSubject的特点:

    • 它代表一个随时间变化的值, 例如, 生日的流就是Subject, 而一个人的年龄流就是BehaviorSubject.
    • 每个订阅者都会从BehaviorSubject那里得到它推送出来的初始值和最新的值.
    • 用例: 共享app状态.

    例子 behavior-subject.ts:

    import { BehaviorSubject } from "rxjs/BehaviorSubject";
    
    const subject = new BehaviorSubject(0);
    
    subject.subscribe({
        next: v => console.log(`Observer1: ${v}`)
    });
    
    subject.next(1);
    subject.next(2);
    
    subject.subscribe({
        next: v => console.log(`Observer2: ${v}`)
    });
    
    subject.next(3);

    效果:

    常用Operators:

    concat 

    concat: 按顺序合并observables. 只会在前一个observable结束之后才会订阅下一个observable.

    它适合用于顺序处理, 例如http请求.

    例子: 

    import { Observable } from "rxjs/Observable";
    import 'rxjs/add/observable/timer';
    import 'rxjs/add/operator/mapTo';
    import 'rxjs/add/observable/concat';
    
    let firstReq = Observable.timer(3000).mapTo('First Response');
    let secondReq = Observable.timer(1000).mapTo('Second Response');
    
    Observable.concat(firstReq, secondReq)
        .subscribe(res => console.log(res));

    效果:

    merge

    把多个输入的observable交错的混合成一个observable, 不按顺序.

    merge实际上是订阅了每个输入的observable, 它只是把输入的observable的值不带任何转换的发送给输出的Observable. 只有当所有输入的observable都结束了, 输出的observable才会结束. 任何在输入observable传递来的错误都会立即发射到输出的observable, 也就是把整个流都杀死了 .

    例子:

    import { Observable } from "rxjs/Observable";
    import 'rxjs/add/observable/timer';
    import 'rxjs/add/operator/mapTo';
    import 'rxjs/add/observable/merge';
    
    let firstReq = Observable.timer(3000).mapTo('First Response');
    let secondReq = Observable.timer(1000).mapTo('Second Response');
    
    Observable.merge(firstReq, secondReq)
        .subscribe(res => console.log(res));

    效果:

    mergeMap (原来叫flatMap)

    mergeMap把每个输入的Observable的值映射成Observable, 然后把它们混合成一个Observable.

    mergeMap可以把嵌套的observables拼合成非嵌套的observable.

    它有这些好处:

    • 不必编写嵌套的subscribe()
    • 把每个observable发出来的值转换成另一个observable
    • 自动订阅内部的observable并且把它们(可能)交错的合成一排.

    这个还是通过例子来理解比较好:

    import { Observable } from "rxjs/Observable";
    import 'rxjs/add/observable/from';
    import 'rxjs/add/operator/mergeMap';
    
    function getData() {
        const students = Observable.from([
            { name: 'Dave', age: 17 },
            { name: 'Nick', age: 18 },
            { name: 'Lee', age: 15 }
        ]);
    
        const teachers = Observable.from([
            { name: 'Miss Wan', age: 28 },
            { name: 'Mrs Wang', age: 31 },
        ]);
    
        return Observable.create(
            observer => {
                observer.next(students);
                observer.next(teachers);
            }
        );
    }
    
    getData()
        .mergeMap(persons => persons)
        .subscribe(
            p => console.log(`Subscriber got ${p.name} - ${p.age}`)
        );

    效果:

    switchMap

    switchMap把每个值都映射成Observable, 然后使用switch把这些内部的Observables合并成一个.

    switchMap有一部分很想mergeMap, 但也仅仅是一部分像而已.

    因为它还具有取消的效果, 每次发射的时候, 前一个内部的observable会被取消, 下一个observable会被订阅. 可以把这个理解为切换到一个新的observable上了.

    这个还是看marble图比较好理解:

    例子: 

    // 立即发出值, 然后每5秒发出值
    const source = Rx.Observable.timer(0, 5000);
    // 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值
    const example = source.switchMap(() => Rx.Observable.interval(500));
    // 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
    const subscribe = example.subscribe(val => console.log(val));

    更好的例子是: 网速比较慢的时候, 客户端发送了多次重复的请求, 如果前一次请求在2秒内没有返回的话, 那么就取消前一次请求, 不再需要前一次请求的结果了, 这里就应该使用debounceTime配合switchMap.

    mergeMap vs switchMap的例子

    mergeMap:

    import { Observable } from "rxjs/Observable";
    import 'rxjs/add/observable/interval';
    import 'rxjs/add/operator/take';
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/mergeMap';
    import 'rxjs/add/operator/switchMap';
    
    const outer = Observable.interval(1000).take(2);
    
    const combined = outer.mergeMap(x => {
        return Observable.interval(400)
            .take(3)
            .map(y => `outer ${x}: inner ${y}`);
    });
    
    combined.subscribe(res => console.log(`result ${res}`));

    效果:

    switchMap:

    import { Observable } from "rxjs/Observable";
    import 'rxjs/add/observable/interval';
    import 'rxjs/add/operator/take';
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/mergeMap';
    import 'rxjs/add/operator/switchMap';
    
    const outer = Observable.interval(1000).take(2);
    
    const combined = outer.switchMap(x => {
        return Observable.interval(400)
            .take(3)
            .map(y => `outer ${x}: inner ${y}`);
    });
    
    combined.subscribe(res => console.log(`result ${res}`));

    zip

    zip操作符也会合并多个输入的observables成为一个observable. 多个输入的observable的值, 按顺序, 按索引进行合并, 如果某一个observable在该索引上的值还没有发射值, 那么会等它, 直到所有的输入observables在该索引位置上的值都发射出来, 输出的observable才会发射该索引的值.

    例子:

    import { Observable } from "rxjs/Observable";
    import 'rxjs/add/observable/of';
    import 'rxjs/add/observable/zip';
    
    let age$ = Observable.of<number>(27, 25, 29);
    let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
    let isDev$ = Observable.of<boolean>(true, true, false);
    
    Observable
        .zip(age$,
            name$,
            isDev$,
            (age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
        .subscribe(x => console.log(x));

    效果:

    就不往下写了, 其实看文档就行, 最重要的还是上一部分.

  • 相关阅读:
    eclipse如何与git 配合工作。
    git托管代码(二)
    PPC2003 安装 CFNET 3.5成功
    我的Window Mobile WCF 項目 第三篇 WM窗体设计
    我的Window Mobile WCF 項目 第一篇Mobile开发和WinForm开发的区别
    我的Window Mobile WCF 項目 第七天
    我的Window Mobile WCF 項目 第二篇 WindowsMobile访问WCF
    WCF 用vs2010 和 vs2008的简单对比测试
    vs2010beta1 和 搜狗输入法 冲突,按下 Ctrl 键就报错,重装搜狗解决
    我的Window Mobile WCF 項目 第六天 (二)
  • 原文地址:https://www.cnblogs.com/cgzl/p/8649477.html
Copyright © 2011-2022 走看看