zoukankan      html  css  js  c++  java
  • [RxJS] Subject: an Observable and Observer hybrid

    This lesson teaches you how a Subject is simply a hybrid of Observable and Observer which can act as a bridge between the source Observable and multiple observers, effectively making it possible for multiple observers to share the same Observable execution.

    var observable = Rx.Observable.interval(1000).take(5);
    
    var observerA = {
      next: function (x) { console.log('A next ' + x); },
      error: function (err) { console.log('A error ' + err); },
      complete: function () { console.log('A done'); },
    };
    
    
    var observerB = {
      next: function (x) { console.log('B next ' + x); },
      error: function (err) { console.log('B error ' + err); },
      complete: function () { console.log('B done'); },
    };
    
    observable.subscribe(observerA);
    
    setTimeout(
      () => {
        observable.subscribe(observerB);
      },2000
    )

    In the code above, we have two 'observers', because we call subscribe twice:

    observable.scbscribe(ObserverA);
    observable.scbscribe(ObserverB);

    If we want to have one observer, so we need to call subscribe only once.

    For that we can build a bridgeObservers, which will loop though the observers:

    const observable = Rx.Observable.interval(1000).take(5);
    
    const ObserverA = {
      next: function(x){
        console.log("A next " + x)
      },
      error: function(x){
        console.error("A error " + x)
      },
      complete: function(){
        console.log("A Done")
      },
    };
    
    
    const ObserverB = {
      next: function(x){
        console.log("B next " + x)
      },
      error: function(x){
        console.error("B error " + x)
      },
      complete: function(){
        console.log("B Done")
      },
    };
    
    
    const BridgeObservers = {
      next: function(x){
        this.observers.forEach(
          o => o.next(x)
        )
      },
      error: function(x){
        this.observers.forEach(
          o => o.error(x)
        )
      },
      complete: function(){
        this.observers.forEach(
          o => o.complete()
        )
      },
      observers: [],
      addObserver: function(observer){
        this.observers.push(observer)
      }
    };
    
    observable.subscribe(BridgeObservers);
    BridgeObservers.addObserver(ObserverA);
    
    setTimeout(function(){
      BridgeObservers.addObserver(ObserverB);
    }, 2000)

    And this partten:

    observable.subscribe(BridgeObservers);
    BridgeObservers.addObserver(ObserverA); // BirdegeObservers.subscribe(ObserverA)

    is actually 'subject' partten, works both as Observer and Observable.

    Subject:

    const observable = Rx.Observable.interval(1000).take(5);
    
    const ObserverA = {
      next: function(x){
        console.log("A next " + x)
      },
      error: function(x){
        console.error("A error " + x)
      },
      complete: function(){
        console.log("A Done")
      },
    };
    
    
    const ObserverB = {
      next: function(x){
        console.log("B next " + x)
      },
      error: function(x){
        console.error("B error " + x)
      },
      complete: function(){
        console.log("B Done")
      },
    };
    
    const subject = new Rx.Subject();
    /*const BridgeObservers = {
      next: function(x){
        this.observers.forEach(
          o => o.next(x)
        )
      },
      error: function(x){
        this.observers.forEach(
          o => o.error(x)
        )
      },
      complete: function(){
        this.observers.forEach(
          o => o.complete()
        )
      },
      observers: [],
      subscribe: function(observer){
        this.observers.push(observer)
      }
    };*/
    
    observable.subscribe(subject);
    subject.subscribe(ObserverA);
    //BridgeObservers.subscribe(ObserverA);
    
    setTimeout(function(){
      subject.subscribe(ObserverB);
     // BridgeObservers.subscribe(ObserverB);
    }, 2000)

     In the end, ObserverA and ObserverB share one single observer. 

  • 相关阅读:
    linux 解压tgz 文件指令
    shell 脚本没有执行权限 报错 bash: ./myshell.sh: Permission denied
    linux 启动solr 报错 Your Max Processes Limit is currently 31202. It should be set to 65000 to avoid operational disruption.
    远程查询批量导入数据
    修改 MZTreeView 赋权节点父节点选中子节点自动选中的问题
    关于乱码的问题解决记录
    我的网站优化之路
    对设计及重构的一点反思
    我的五年岁月
    奔三的路上
  • 原文地址:https://www.cnblogs.com/Answer1215/p/5932638.html
Copyright © 2011-2022 走看看