zoukankan      html  css  js  c++  java
  • [RxJS] Subject: an Observable and Observer hybrid

    This lesson teaches you how a Subject is simply a hybrid of Observable and Observer which can act as a bridge between the source Observable and multiple observers, effectively making it possible for multiple observers to share the same Observable execution.

    // Source observable: interval(1000) emits an increasing counter every
    // 1000 ms, and take(5) completes the stream after the first five values.
    const observable = Rx.Observable.interval(1000).take(5);
    
    /**
     * Observer "A": logs every notification it receives with an "A" prefix.
     */
    const observerA = {
      /** Logs a value emitted by the source. */
      next(x) { console.log(`A next ${x}`); },
      /** Logs an error notification from the source. */
      error(err) { console.log(`A error ${err}`); },
      /** Logs that the source has completed. */
      complete() { console.log('A done'); },
    };
    
    
    /**
     * Observer "B": logs every notification it receives with a "B" prefix.
     */
    const observerB = {
      /** Logs a value emitted by the source. */
      next(x) { console.log(`B next ${x}`); },
      /** Logs an error notification from the source. */
      error(err) { console.log(`B error ${err}`); },
      /** Logs that the source has completed. */
      complete() { console.log('B done'); },
    };
    
    // First subscription: observerA starts listening immediately.
    observable.subscribe(observerA);

    // Second subscription, 2000 ms later. This is a separate subscribe call on
    // the same observable, so observerB is attached independently of observerA.
    setTimeout(function () {
      observable.subscribe(observerB);
    }, 2000);

    In the code above, the observable has two separate 'observers' (and therefore runs once for each of them), because we call subscribe twice:

    // Two separate subscribe calls on the same observable:
    observable.subscribe(observerA);
    observable.subscribe(observerB);

    If we want the source to have only one observer, we need to call subscribe only once.

    For that we can build a BridgeObservers object, which is the single observer of the source and loops through the real observers, forwarding every notification to them:

    // Source observable: a counter emitted every 1000 ms, limited to five values.
    const observable = Rx.Observable
      .interval(1000)
      .take(5);
    
    /**
     * Observer "A": reports values and completion via console.log and
     * errors via console.error, each prefixed with "A".
     */
    const ObserverA = {
      /** Reports a value emitted by the source. */
      next(value) {
        console.log("A next " + value);
      },
      /** Reports an error notification. */
      error(err) {
        console.error("A error " + err);
      },
      /** Reports that the source has completed. */
      complete() {
        console.log("A Done");
      },
    };
    
    
    /**
     * Observer "B": reports values and completion via console.log and
     * errors via console.error, each prefixed with "B".
     */
    const ObserverB = {
      /** Reports a value emitted by the source. */
      next(value) {
        console.log("B next " + value);
      },
      /** Reports an error notification. */
      error(err) {
        console.error("B error " + err);
      },
      /** Reports that the source has completed. */
      complete() {
        console.log("B Done");
      },
    };
    
    
    /**
     * Hand-rolled bridge between one source and many observers.
     *
     * It exposes next/error/complete, so it can itself be passed to
     * `subscribe` as an observer, and it forwards every notification to each
     * observer registered through `addObserver`, in registration order.
     */
    const BridgeObservers = {
      /** Forwards an emitted value to every registered observer. */
      next(x) {
        this.observers.forEach((registered) => {
          registered.next(x);
        });
      },
      /** Forwards an error notification to every registered observer. */
      error(x) {
        this.observers.forEach((registered) => {
          registered.error(x);
        });
      },
      /** Forwards the completion notification to every registered observer. */
      complete() {
        this.observers.forEach((registered) => {
          registered.complete();
        });
      },
      // Observers registered so far.
      observers: [],
      /**
       * Registers an observer. It only receives notifications forwarded
       * after this call; nothing is replayed.
       */
      addObserver(observer) {
        this.observers.push(observer);
      },
    };
    
    // The bridge is the only object passed to the source's subscribe.
    observable.subscribe(BridgeObservers);

    // ObserverA is registered with the bridge right away...
    BridgeObservers.addObserver(ObserverA);

    // ...while ObserverB is registered 2000 ms later and only receives the
    // notifications the bridge forwards from then on.
    setTimeout(() => {
      BridgeObservers.addObserver(ObserverB);
    }, 2000);

    And this pattern:

    observable.subscribe(BridgeObservers);
    BridgeObservers.addObserver(ObserverA); // conceptually: BridgeObservers.subscribe(ObserverA)

    is actually the 'Subject' pattern: a Subject works both as an Observer (it can be passed to subscribe) and as an Observable (other observers can subscribe to it).

    Subject:

    // Source observable: a counter emitted every 1000 ms, limited to five values.
    const observable = Rx.Observable
      .interval(1000)
      .take(5);
    
    /**
     * Observer "A": reports values and completion via console.log and
     * errors via console.error, each prefixed with "A".
     */
    const ObserverA = {
      /** Reports a value emitted by the source. */
      next(value) {
        console.log("A next " + value);
      },
      /** Reports an error notification. */
      error(err) {
        console.error("A error " + err);
      },
      /** Reports that the source has completed. */
      complete() {
        console.log("A Done");
      },
    };
    
    
    /**
     * Observer "B": reports values and completion via console.log and
     * errors via console.error, each prefixed with "B".
     */
    const ObserverB = {
      /** Reports a value emitted by the source. */
      next(value) {
        console.log("B next " + value);
      },
      /** Reports an error notification. */
      error(err) {
        console.error("B error " + err);
      },
      /** Reports that the source has completed. */
      complete() {
        console.log("B Done");
      },
    };
    
    // Rx.Subject takes over the role of the hand-rolled BridgeObservers object,
    // which is kept below (commented out) for comparison. In that version the
    // registration method is named `subscribe` to mirror the Subject API.
    const subject = new Rx.Subject();
    /*const BridgeObservers = {
      next: function(x){
        this.observers.forEach(
          o => o.next(x)
        )
      },
      error: function(x){
        this.observers.forEach(
          o => o.error(x)
        )
      },
      complete: function(){
        this.observers.forEach(
          o => o.complete()
        )
      },
      observers: [],
      subscribe: function(observer){
        this.observers.push(observer)
      }
    };*/
    
    // The subject is the only object passed to the source's subscribe.
    observable.subscribe(subject);

    // ObserverA listens through the subject
    // (bridge equivalent: BridgeObservers.subscribe(ObserverA)).
    subject.subscribe(ObserverA);

    // ObserverB joins 2000 ms later, also through the subject
    // (bridge equivalent: BridgeObservers.subscribe(ObserverB)).
    setTimeout(() => {
      subject.subscribe(ObserverB);
    }, 2000);

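     In the end, ObserverA and ObserverB share one single execution of the source observable: the subject is its only observer, and it forwards every notification to both of them.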

  • 相关阅读:
    TOMCAT热部署 catalina.home catalina.base
    spring boot test MockBean
    源码分析ConcurrentHashMap
    源码分析Thread
    源码分析String
    jvm 占用高的问题定位
    docker 资源限制
    数据库设计方案与优化
    linux搜索查找类命令|--grep指令
    linux搜索查找类命令|--locate命令
  • 原文地址:https://www.cnblogs.com/Answer1215/p/5932638.html
Copyright © 2011-2022 走看看