  • Learning RxJS Subject

    An Observable example

    import { interval } from "rxjs";
    import { take } from "rxjs/operators";
    
    const interval$ = interval(1000).pipe(take(3));
    
    interval$.subscribe(value => console.log("Observer A get value: " + value));
    
    setTimeout(() => {
      interval$.subscribe(value => console.log("Observer B get value: " + value));
    }, 1000);
    

    Output

    Observer A get value: 0
    Observer A get value: 1
    Observer B get value: 0
    Observer A get value: 2
    Observer B get value: 1
    Observer B get value: 2
    

    As we can see:

    • An Observable can be subscribed to more than once.
    • Each subscription starts a new, independent execution of the Observable.
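    The "re-executes on every subscription" behavior can be modeled without RxJS at all. The sketch below is a simplified illustration, not how RxJS is implemented: createCounter is a hypothetical helper that returns a subscribe function, and every call to it runs a fresh loop with its own counter.

```javascript
// Hypothetical model of a cold observable: "subscribing" means calling a
// function, and every call starts its own independent execution.
function createCounter(count) {
  return function subscribe(next) {
    // Each subscriber gets its own loop and its own counter variable.
    for (let i = 0; i < count; i++) {
      next(i);
    }
  };
}

const counter = createCounter(3);
const seenByA = [];
const seenByB = [];

counter(value => seenByA.push(value)); // first subscription
counter(value => seenByB.push(value)); // second subscription starts over at 0

console.log(seenByA, seenByB);
```

    Both arrays end up with the same values because nothing is shared between the two executions, which is why Observer B in the example above starts again from 0.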

    A Subject example

    import { interval, Subject } from "rxjs";
    import { take } from "rxjs/operators";
    
    const interval$ = interval(1000).pipe(take(3));
    const subject = new Subject();
    
    const observerA = {
      next: value => console.log("Observer A get value: " + value),
      error: error => console.log("Observer A error: " + error),
      complete: () => console.log("Observer A complete!")
    };
    
    const observerB = {
      next: value => console.log("Observer B get value: " + value),
      error: error => console.log("Observer B error: " + error),
      complete: () => console.log("Observer B complete!")
    };
    
    subject.subscribe(observerA); // add observer A
    interval$.subscribe(subject); // the subject itself subscribes to interval$
    setTimeout(() => {
      subject.subscribe(observerB); // add observer B
    }, 1000);
    

    Output

    Observer A get value: 0
    Observer A get value: 1
    Observer B get value: 1
    Observer A get value: 2
    Observer B get value: 2
    Observer A complete!
    Observer B complete!
    

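    As we can see:

    • A Subject is an Observable: observers can subscribe to it.
    • A Subject is an Observable that keeps internal state (the list of observers it forwards values to), so all observers share a single execution.
    • A Subject is also an Observer: it can be passed to subscribe() on another Observable.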
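    These three points can be sketched in a few lines. MiniSubject below is a hypothetical, heavily simplified model (no error handling, no unsubscription) and not RxJS's actual implementation. It keeps a list of observers and forwards each value to all of them, so a late observer only sees values sent after it joined.

```javascript
// Hypothetical simplified Subject: it is observable (has subscribe) and is
// itself an observer (has next/complete), with the observer list as its state.
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    // Forward the same value to every registered observer.
    this.observers.forEach(observer => observer.next(value));
  }
  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

const log = [];
const makeObserver = name => ({
  next: value => log.push(name + ":" + value),
  complete: () => log.push(name + " complete")
});

const miniSubject = new MiniSubject();
miniSubject.subscribe(makeObserver("A"));
miniSubject.next(0); // only A is listening
miniSubject.subscribe(makeObserver("B"));
miniSubject.next(1); // both A and B receive 1
miniSubject.complete();

console.log(log);
```

    Observer B never sees 0 here, which mirrors the output above where Observer B starts at 1.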

    Besides Subject, RxJS also provides BehaviorSubject, ReplaySubject, and AsyncSubject.

    BehaviorSubject

    First, look at an example with a plain Subject.

    import { Subject } from "rxjs";
    
    const subject = new Subject();
    
    const observerA = {
      next: value => console.log("Observer A get value: " + value),
      error: error => console.log("Observer A error: " + error),
      complete: () => console.log("Observer A complete!")
    };
    
    const observerB = {
      next: value => console.log("Observer B get value: " + value),
      error: error => console.log("Observer B error: " + error),
      complete: () => console.log("Observer B complete!")
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
      subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);
    

    Output

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    

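    Here observerB does subscribe, but it receives nothing, because the Subject never calls next() again after observerB joins.
    A plain Subject does not store its latest value.
    If you want the Subject to keep its current state and automatically send the latest value to every new subscriber, use BehaviorSubject.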

    import { BehaviorSubject } from "rxjs";
    const subject = new BehaviorSubject(0);
    
    const observerA = {
      next: value => console.log("Observer A get value: " + value),
      error: error => console.log("Observer A error: " + error),
      complete: () => console.log("Observer A complete!")
    };
    
    const observerB = {
      next: value => console.log("Observer B get value: " + value),
      error: error => console.log("Observer B error: " + error),
      complete: () => console.log("Observer B complete!")
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
      subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);
    

    Output

    Observer A get value: 0
    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 3
    

    Note that const subject = new BehaviorSubject(0); takes an initial value of 0, which represents the Subject's current state.
    As soon as subject.subscribe(observerA); runs, it prints Observer A get value: 0.
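    The "current state" idea can be sketched as a Subject with one extra field. MiniBehaviorSubject is a hypothetical simplified model, not RxJS's implementation: it remembers the latest value and hands it to each observer at subscription time.

```javascript
// Hypothetical simplified BehaviorSubject: stores the latest value and
// emits it immediately to every new subscriber.
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.value); // new subscriber gets the current value at once
  }
  next(value) {
    this.value = value; // remember the latest value
    this.observers.forEach(observer => observer.next(value));
  }
}

const received = [];
const state = new MiniBehaviorSubject(0);

state.subscribe({ next: value => received.push("A:" + value) });
state.next(1);
state.next(2);
state.next(3);
state.subscribe({ next: value => received.push("B:" + value) }); // late subscriber

console.log(received);
```

    The late subscriber receives only the stored value 3, matching the Observer B line in the output above.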

    ReplaySubject

    If we want new subscribers to receive the last few values emitted by the source,
    we can use ReplaySubject.

    import { ReplaySubject } from "rxjs";
    const subject = new ReplaySubject(2);
    
    const observerA = {
      next: value => console.log("Observer A get value: " + value),
      error: error => console.log("Observer A error: " + error),
      complete: () => console.log("Observer A complete!")
    };
    
    const observerB = {
      next: value => console.log("Observer B get value: " + value),
      error: error => console.log("Observer B error: " + error),
      complete: () => console.log("Observer B complete!")
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
      subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);
    

    Output

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 2
    Observer B get value: 3
    

    If you change const subject = new ReplaySubject(2); to const subject = new ReplaySubject(1);
    the output becomes:

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 3
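    The effect of the buffer size can be modeled with a bounded array. MiniReplaySubject and lateSubscriberSees are hypothetical names for a simplified sketch (it ignores the time-window option and completion), not RxJS's implementation.

```javascript
// Hypothetical simplified ReplaySubject: keeps the last `bufferSize` values
// and replays them to each new subscriber before forwarding live values.
class MiniReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }
  subscribe(observer) {
    this.buffer.forEach(value => observer.next(value)); // replay history first
    this.observers.push(observer);
  }
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value once the buffer is full
    }
    this.observers.forEach(observer => observer.next(value));
  }
}

// Emits 1, 2, 3 and then reports what a late subscriber receives.
function lateSubscriberSees(bufferSize) {
  const replay = new MiniReplaySubject(bufferSize);
  replay.next(1);
  replay.next(2);
  replay.next(3);
  const seen = [];
  replay.subscribe({ next: value => seen.push(value) });
  return seen;
}

console.log(lateSubscriberSees(2));
console.log(lateSubscriberSees(1));
```

    With a buffer of 2 the late subscriber sees 2 and 3; with a buffer of 1 it sees only 3, the same difference shown in the two outputs above.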
    

    AsyncSubject

    AsyncSubject is similar to the last operator: it emits only the final value, and only once the Subject has completed.

    import { AsyncSubject } from "rxjs";
    const subject = new AsyncSubject();
    
    const observerA = {
      next: value => console.log("Observer A get value: " + value),
      error: error => console.log("Observer A error: " + error),
      complete: () => console.log("Observer A complete!")
    };
    
    const observerB = {
      next: value => console.log("Observer B get value: " + value),
      error: error => console.log("Observer B error: " + error),
      complete: () => console.log("Observer B complete!")
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    subject.complete();
    
    setTimeout(() => {
      subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);
    

    Output

    Observer A get value: 3
    Observer A complete!
    Observer B get value: 3
    Observer B complete!
    

    If you comment out subject.complete();, nothing is printed,
    because the subject never completes.
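    A rough model of this "hold the last value until completion" behavior is sketched below. MiniAsyncSubject is a hypothetical simplified class (no error handling), not RxJS's implementation. It records values silently and only delivers the last one when complete() is called, or immediately to anyone who subscribes after completion.

```javascript
// Hypothetical simplified AsyncSubject: nothing is emitted until complete(),
// then every observer gets the last value followed by the complete signal.
class MiniAsyncSubject {
  constructor() {
    this.hasValue = false;
    this.lastValue = undefined;
    this.completed = false;
    this.observers = [];
  }
  subscribe(observer) {
    if (this.completed) {
      // Late subscribers still get the final value and completion.
      if (this.hasValue) observer.next(this.lastValue);
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }
  next(value) {
    if (this.completed) return;
    this.hasValue = true;
    this.lastValue = value; // remember, but do not emit yet
  }
  complete() {
    if (this.completed) return;
    this.completed = true;
    this.observers.forEach(observer => {
      if (this.hasValue) observer.next(this.lastValue);
      observer.complete();
    });
  }
}

const events = [];
const makeObserver = name => ({
  next: value => events.push(name + ":" + value),
  complete: () => events.push(name + " complete")
});

const asyncSubject = new MiniAsyncSubject();
asyncSubject.subscribe(makeObserver("A"));
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
const eventsBeforeComplete = events.length; // still 0: nothing emitted yet

asyncSubject.complete();
asyncSubject.subscribe(makeObserver("B")); // subscribes after completion

console.log(eventsBeforeComplete, events);
```

    Removing the complete() call in this sketch leaves events empty, which corresponds to the "nothing is printed" case described above.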

    Reference:
    RxJS Subject

  • Original article: https://www.cnblogs.com/samwu/p/12781800.html