zoukankan      html  css  js  c++  java
  • [RxJS] Reusable multicasting with Subject factories

    The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.

    // Shares a 3-value interval through ONE pre-built Subject instance.
    // The same Subject is reused for every connection, so once it has
    // completed, later subscribers receive nothing new.
    var shared = Rx.Observable
      .interval(1000)
      .take(3)
      .do(function logSource(x) {
        console.log('source ' + x);
      })
      .multicast(new Rx.Subject())
      .refCount();

    In the code above, the subject emits three values (0, 1, 2) and then completes. This means that if you subscribe to `shared` again afterwards, it won't emit anything, because the subject has already completed.

    If you want to reuse `shared` even after the subject has completed, you need to use a subject factory, which is simply a function that returns a new Subject:

    /**
     * Subject factory handed to multicast().
     * @returns {Rx.Subject} a brand-new Subject on every call.
     */
    function subjectFactory() {
      var freshSubject = new Rx.Subject();
      return freshSubject;
    }

    // Passing the factory itself (not a Subject instance) lets each shared
    // execution run on its own Subject.
    var shared = Rx.Observable
      .interval(1000)
      .take(3)
      .do(function logSource(x) {
        console.log('source ' + x);
      })
      .multicast(subjectFactory)
      .refCount();

    So now, even if you resubscribe after the subject has completed, it will emit new values to you.

    /**
     * Subject factory handed to multicast().
     * @returns {Rx.Subject} a brand-new Subject on every call.
     */
    function subjectFactory() {
      var freshSubject = new Rx.Subject();
      return freshSubject;
    }

    // Source: one tick per second, limited to three values (0, 1, 2).
    var shared = Rx.Observable
      .interval(1000)
      .take(3)
      .do(function logSource(x) {
        console.log('source ' + x);
      })
      .multicast(subjectFactory)
      .refCount();

    // Timeline (one dash ~ a fraction of a second):
    // 1st execution: --0--1--2|
    //                A subscribes at 0s, B joins at 2s
    // 2nd execution:                    --0--1--2|
    //                A resubscribes at 6s and gets values again

    var observerA = {
      next: (x) => { console.log('A next ' + x); },
      error: (err) => { console.log('A error ' + err); },
      complete: () => { console.log('A done'); },
    };

    // First subscriber: subscriber count goes 0 => 1, which starts the source.
    var subA = shared.subscribe(observerA);
    console.log('subscribed A');

    var observerB = {
      next: (x) => { console.log('B next ' + x); },
      error: (err) => { console.log('B error ' + err); },
      complete: () => { console.log('B done'); },
    };

    // B joins the already-running execution after 2 seconds.
    var subB;
    setTimeout(() => {
      subB = shared.subscribe(observerB);
      console.log('subscribed B');
    }, 2000);

    // At 3s A unsubscribes.
    setTimeout(() => {
      subA.unsubscribe();
      console.log('unsubscribed A');
    }, 3000);

    // At 5s B unsubscribes.
    setTimeout(() => {
      subB.unsubscribe();
      console.log('unsubscribed B');
    }, 5000);

    // At 6s A subscribes again: count goes 0 => 1 once more (connect),
    // and values are delivered again thanks to the factory.
    setTimeout(() => {
      subA = shared.subscribe(observerA);
      console.log('subscribed A');
    }, 6000);
    /**
    "subscribed A"
    "source 0"
    "A next 0"
    "source 1"
    "A next 1"
    "subscribed B"
    "source 2"
    "A next 2"
    "B next 2"
    "A done"
    "B done"
    "unsubscribed A"
    "unsubscribed B"
    "subscribed A"
    "source 0"
    "A next 0"
    "source 1"
    "A next 1"
    "source 2"
    "A next 2"
    "A done"
    
    */
  • 相关阅读:
    shell脚本学习
    docker容器的安装与使用
    admin源码分析
    ajax提交文件,django测试脚本环境书写,froms组件,钩子函数
    javascript语法 1.运算符 2. 流程控制 3. 函数 4. 四种变量 5. 数据类型的运用 6. js页面交互
    from提交数据,高级选择器,伪类选择器,前端样式等
    前端HTML介绍,标签介绍,基础选择器,CSS引入方法
    数据库知识总结
    day46
    day45
  • 原文地址:https://www.cnblogs.com/Answer1215/p/6001403.html
Copyright © 2011-2022 走看看