  • [RxJS] Build your own RxJS

    JavaScript has several APIs that take callback functions and do nearly the same thing with slight variations. Event listeners, array methods such as .forEach, promises, and Node.js streams are all written in very similar ways. RxJS unifies all of these APIs under one abstraction, the Observable.

    The standard RxJS API looks like this:

    import { from } from "rxjs";
    import { map, filter } from "rxjs/operators";
    
    from([1, 2, 3, 4])
      .pipe(map(x => x * 2))
      .pipe(filter(x => x < 5))
      .subscribe(val => console.log(val)); 
    // 2 
    // 4

    We can build our own simplified version of RxJS, including its operators.

    First, the Observable. It has the following API:

    {
      subscribe() {}
      pipe() {}  
    }

    We can create a function called 'createObservable(subscribe)' that takes a subscribe function and returns an object with 'subscribe' and 'pipe' methods:

    function createObservable(subscribe) {
      return {
        subscribe,
        pipe(operator) {
          return operator(this);
        }
      };
    }
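
    The 'pipe' above applies one operator per call, which is why the later examples chain several '.pipe(...)' calls. As a minimal sketch, and only an assumption about how one might extend it, 'pipe' could accept any number of operators and apply them left to right, which is closer to how the real RxJS 'pipe' is used. The 'addOne' operator and the 'source' observable are hypothetical names made up for this illustration:

```javascript
function createObservable(subscribe) {
  return {
    subscribe,
    // Assumption: take any number of operators and apply them in order.
    // With no operators, reduce returns `this` unchanged.
    pipe(...operators) {
      return operators.reduce((source, operator) => operator(source), this);
    }
  };
}

// Hypothetical operator: forwards every value plus one.
const addOne = inputObservable =>
  createObservable(function(outputObserver) {
    inputObservable.subscribe({
      next: x => outputObserver.next(x + 1),
      error: err => outputObserver.error(err),
      complete: () => outputObserver.complete()
    });
  });

// Hypothetical source that emits 1 and 2, then completes.
const source = createObservable(function(observer) {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const out = [];
source.pipe(addOne, addOne).subscribe({
  next: x => out.push(x),
  error: err => out.push("ERROR: " + err),
  complete: () => out.push("DONE")
});

console.log(out); // [3, 4, "DONE"]
```

    Chained single-operator calls such as '.pipe(addOne).pipe(addOne)' still work with this variant, so the rest of the code does not need to change.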

    We can use createObservable to create Observables:

    const numberObservable = createObservable(function(observer) {
      [10, 20, 30, 40].forEach(x => {
        observer.next(x);
      });
    
      observer.complete();
    });
    
    const clickObservable = createObservable(function(observer) {
      document.addEventListener("click", function(ev) {
        observer.next(ev);
      });
    });

    Second, the Observer.

    An Observer is simple: it is an object that contains 'next', 'error', and 'complete' functions:

    const observer = {
      next(x) {
        console.log(x);
      },
      error(err) {
        console.error(err);
      },
      complete() {
        console.log("DONE");
      }
    };
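
    To see the two pieces working together, here is a small sketch that subscribes an observer to the number observable. It also shows that, in this implementation, nothing runs until 'subscribe' is called. The 'producerRuns' counter and the 'log' array are additions for illustration only; the observer pushes into 'log' instead of calling console.log directly so that the result can be inspected:

```javascript
function createObservable(subscribe) {
  return {
    subscribe,
    pipe(operator) {
      return operator(this);
    }
  };
}

// Counts how many times the producer function has been executed.
let producerRuns = 0;

const numberObservable = createObservable(function(observer) {
  producerRuns += 1;
  [10, 20, 30, 40].forEach(x => {
    observer.next(x);
  });
  observer.complete();
});

const log = [];
const observer = {
  next(x) {
    log.push(x);
  },
  error(err) {
    log.push("ERROR: " + err);
  },
  complete() {
    log.push("DONE");
  }
};

// Creating the observable did not run the producer yet.
const runsBeforeSubscribe = producerRuns;

// Subscribing runs the producer synchronously.
numberObservable.subscribe(observer);

console.log(runsBeforeSubscribe); // 0
console.log(log); // [10, 20, 30, 40, "DONE"]
```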

    Third, Operators such as map and filter:

    map(fn)(observable)

    filter(predFn)(observable)  

    It is important to know that operators such as map and filter take an inputObservable and return an outputObservable.

    Inside the outputObservable, we subscribe to the inputObservable with an inner observer. That inner observer calls the outputObserver, which is passed in by the consumer.

    const map = fn => inputObservable => {
      const outputObservable = createObservable(function(outputObserver) {
        const observer = {
          next(x) {
            const res = fn(x);
            outputObserver.next(res);
          },
          error(err) {
            outputObserver.error(err);
          },
          complete() {
            outputObserver.complete();
          }
        };
        inputObservable.subscribe(observer);
      });
    
      return outputObservable;
    };
    
    const filter = fn => inputObservable => {
      const outputObservable = createObservable(function(outputObserver) {
        const observer = {
          next(x) {
            if (fn(x)) {
              outputObserver.next(x);
            }
          },
          error(err) {
            outputObserver.error(err);
          },
          complete() {
            outputObserver.complete();
          }
        };
        inputObservable.subscribe(observer);
      });
    
      return outputObservable;
    };
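
    The map and filter operators above differ only in their 'next' handler; the 'error' and 'complete' handlers are identical. One possible way to remove that repetition, shown here as a sketch rather than as part of the original design, is a small helper that builds an operator from a 'next' handler. The name 'createOperator' is hypothetical:

```javascript
function createObservable(subscribe) {
  return {
    subscribe,
    pipe(operator) {
      return operator(this);
    }
  };
}

// Hypothetical helper: builds an operator from a function that decides
// what to do with each value. Errors and completion are forwarded as-is.
const createOperator = onNext => inputObservable =>
  createObservable(function(outputObserver) {
    inputObservable.subscribe({
      next(x) {
        onNext(x, outputObserver);
      },
      error(err) {
        outputObserver.error(err);
      },
      complete() {
        outputObserver.complete();
      }
    });
  });

// map forwards the transformed value.
const map = fn => createOperator((x, out) => out.next(fn(x)));

// filter forwards the value only when the predicate passes.
const filter = predFn =>
  createOperator((x, out) => {
    if (predFn(x)) {
      out.next(x);
    }
  });

const results = [];
createObservable(function(observer) {
  [1, 2, 3, 4].forEach(x => observer.next(x));
  observer.complete();
})
  .pipe(map(x => x * 2))
  .pipe(filter(x => x < 5))
  .subscribe({
    next: x => results.push(x),
    error: err => results.push("ERROR: " + err),
    complete: () => results.push("DONE")
  });

console.log(results); // [2, 4, "DONE"]
```

    The values 2 and 4 match the output of the RxJS example at the top of the post, which applies the same map and filter to the same input.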

    --

    Full Code:

    function createObservable(subscribe) {
      return {
        subscribe,
        pipe(operator) {
          return operator(this);
        }
      };
    }
    
    const numberObservable = createObservable(function(observer) {
      [10, 20, 30, 40].forEach(x => {
        observer.next(x);
      });
    
      observer.complete();
    });
    
    const clickObservable = createObservable(function(observer) {
      document.addEventListener("click", function(ev) {
        observer.next(ev);
      });
    });
    
    const map = fn => inputObservable => {
      const outputObservable = createObservable(function(outputObserver) {
        const observer = {
          next(x) {
            const res = fn(x);
            outputObserver.next(res);
          },
          error(err) {
            outputObserver.error(err);
          },
          complete() {
            outputObserver.complete();
          }
        };
        inputObservable.subscribe(observer);
      });
    
      return outputObservable;
    };
    
    const filter = fn => inputObservable => {
      const outputObservable = createObservable(function(outputObserver) {
        const observer = {
          next(x) {
            if (fn(x)) {
              outputObserver.next(x);
            }
          },
          error(err) {
            outputObserver.error(err);
          },
          complete() {
            outputObserver.complete();
          }
        };
        inputObservable.subscribe(observer);
      });
    
      return outputObservable;
    };
    
    const observer = {
      next(x) {
        console.log(x);
      },
      error(err) {
        console.error(err);
      },
      complete() {
        console.log("DONE");
      }
    };
    
    numberObservable
      .pipe(map(x => x * 3))
      .pipe(map(x => x - 9))
      .subscribe(observer);
    
    clickObservable
      .pipe(map(ev => [ev.clientX, ev.clientY]))
      .pipe(filter(([x, y]) => x < 200 && y < 200))
      .subscribe(observer);
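
    One thing the code above does not handle is unsubscribing: the click listener is never removed. A possible extension, sketched here under stated assumptions, is to let the producer function return a teardown function and have each operator pass it along. This sketch returns a bare function, whereas the real RxJS 'subscribe' returns a Subscription object with an 'unsubscribe' method. The 'fakeTarget' object is a hypothetical stand-in for 'document' so that the example runs outside a browser:

```javascript
function createObservable(subscribe) {
  return {
    subscribe,
    pipe(operator) {
      return operator(this);
    }
  };
}

// Hypothetical stand-in for `document`: a minimal event target.
const listeners = new Set();
const fakeTarget = {
  addEventListener(type, fn) {
    listeners.add(fn);
  },
  removeEventListener(type, fn) {
    listeners.delete(fn);
  },
  dispatch(ev) {
    listeners.forEach(fn => fn(ev));
  }
};

// Assumption: the producer returns a function that undoes its setup.
const clickObservable = createObservable(function(observer) {
  const handler = ev => observer.next(ev);
  fakeTarget.addEventListener("click", handler);
  return () => fakeTarget.removeEventListener("click", handler);
});

// map returns whatever the inner subscribe returns, so the teardown
// function travels back out to the consumer.
const map = fn => inputObservable =>
  createObservable(function(outputObserver) {
    return inputObservable.subscribe({
      next(x) {
        outputObserver.next(fn(x));
      },
      error(err) {
        outputObserver.error(err);
      },
      complete() {
        outputObserver.complete();
      }
    });
  });

const seen = [];
const unsubscribe = clickObservable
  .pipe(map(ev => [ev.clientX, ev.clientY]))
  .subscribe({
    next: point => seen.push(point),
    error() {},
    complete() {}
  });

fakeTarget.dispatch({ clientX: 10, clientY: 20 });
unsubscribe(); // removes the listener
fakeTarget.dispatch({ clientX: 30, clientY: 40 }); // no longer delivered

console.log(seen); // [[10, 20]]
```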
  • Original source: https://www.cnblogs.com/Answer1215/p/10662844.html