  • [RxJS] Implement RxJS `mergeMap` through inner Observables to Subscribe and Pass Values Through

    Understanding sources and subscribers makes it much easier to see what mergeMap does under the hood. Where a typical operator calls destination.next directly, mergeMap wraps destination.next in a new source/subscriber pair, so there is an "outer" next (values from the source) and an "inner" next (values from the Observable returned by the mapping function).

    import { fromEvent, of, Subscriber } from "rxjs"
    import {
      scan,
      delay,
      mergeMap
    } from "rxjs/operators"
    
    class MyMergeMapSubscriber extends Subscriber {
      constructor(sub, fn) {
        super(sub)
    
        this.fn = fn
      }
    
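      _next(value) {
        // "outer" next: called for each value emitted by the source
        console.log(`outer`, value)
        // Map the outer value to an inner Observable
        const o$ = this.fn(value)
    
        o$.subscribe({
          // "inner" next: pass inner values through to the destination
          next: innerValue => {
            console.log(`  inner`, innerValue)
            this.destination.next(innerValue)
          }
        })
      }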
    }
    
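    const myMergeMap = fn => source =>
      source.lift({
        call(sub, source) {
          // Return the subscription (as RxJS's own operators do) so that
          // lift can register it as teardown logic
          return source.subscribe(
            new MyMergeMapSubscriber(sub, fn)
          )
        }
      })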
    
    const observable$ = fromEvent(
      document,
      "click"
    ).pipe(
      scan(i => i + 1, 0),
      myMergeMap(value => of(value).pipe(delay(500)))
    )
    
    const subscriber = {
      next: value => {
        console.log(value)
      },
      complete: () => {
        console.log("done")
      },
      error: value => {
        console.log(value)
      }
    }
    
    observable$.subscribe(subscriber)
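
    The outer/inner relationship described above can also be sketched without RxJS. This is only a rough illustration, assuming a toy Observable that is nothing more than an object with a subscribe method; createObservable, ofValues and sketchMergeMap are hypothetical names made up for this example, not RxJS APIs.

```javascript
// A toy "Observable": wraps a function that pushes values into a subscriber.
// (Hypothetical helper for illustration, not part of RxJS.)
const createObservable = subscribeFn => ({
  subscribe: subscriber => subscribeFn(subscriber),
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this)
  }
})

// Emits each argument synchronously, then completes if the subscriber cares.
const ofValues = (...values) =>
  createObservable(subscriber => {
    values.forEach(value => subscriber.next(value))
    if (subscriber.complete) subscriber.complete()
  })

// Simplified mergeMap: only the "next" path is handled.
const sketchMergeMap = fn => source =>
  createObservable(destination => {
    source.subscribe({
      // "outer" next: receives values from the source
      next: outerValue => {
        const inner$ = fn(outerValue)
        inner$.subscribe({
          // "inner" next: passes inner values through to the destination
          next: innerValue => destination.next(innerValue)
        })
      }
    })
  })

const received = []
ofValues(1, 2, 3)
  .pipe(sketchMergeMap(value => ofValues(value, value * 10)))
  .subscribe({ next: value => received.push(value) })

console.log(received) // [1, 10, 2, 20, 3, 30]
```

    The sketch deliberately leaves out completion, error handling and unsubscription, all of which the real mergeMap takes care of; it is only meant to make the two levels of next visible.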
  • Original article: https://www.cnblogs.com/Answer1215/p/9715051.html