zoukankan      html  css  js  c++  java
  • rxjs-流式编程

    前言

    第一次接触rxjs也是因为angular2应用,内置了rxjs的依赖,了解之后发现它的强大,是一个可以代替promise的框架,但是只处理promise的东西有点拿尚方宝剑砍蚊子的意思。

    如果我们的应用是完全rxjs的应用,会显得代码比较清晰,代码写的爽。

    angular团队和微软合作,采用的typescript和rxjs,互相宣传。。

    rxjs

    rxjs是一个比较简单的库,它只有Observable,Observer,subscription,subject,Operators,Scheduler6个对象概念。比较类似于观察者模式,如果再了解一些函数式编程和node的stream就更好了。

    中文文档

    observable API

    observable 可观察对象

    observable是一个可观察对象,也类似观察者模式中的可观察对象,后面的Subscription就相当于观察者模式中的订阅者。

    给一个例子:

    var observable = Rx.Observable.create(function (observer) {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        setTimeout(() => {
        observer.next(4);
            observer.complete();
        }, 1000);
    });
    

    创建了一个Obervable对象,这里用到了create操作符。

    create操作符:创建一个新的 Observable ,当观察者( Observer )订阅该 Observable 时,它会执行指定的函数。

    observer 观察者

    如上例子中的observer,给一个典型的observer例子:

    var observer={
        next:x=>console.log('Observer got a next value: ' + x),
        error: err => console.error('Observer got an error: ' + err),
        complete: () => console.log('Observer got a complete notification')
    }
    

    有点类似promise的返回,每来一个“流”就会执行一个next,出错会执行一个observer的error,完成后或者调用complete便不再监听observable,执行complete函数。这些函数的集合也就是observer。

    要使用观察者,需要订阅可观察对象:

    observable.subscribe(observer)
    

    Subscription订阅

    订阅是一个表示一次性资源的对象,通常是一个可观察对象的执行。

    它有一个重要的方法:unsubscribe,顾名思义。。。

    比如observable的例子:

    var observable = Rx.Observable.create(function (observer) {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        setTimeout(() => {
            observer.next(4);
            observer.complete();
        }, 1000);
    });
    var observer={
        next:x=>console.log('Observer got a next value: ' + x),
        error: err => console.error('Observer got an error: ' + err),
        complete: () => console.log('Observer got a complete notification')
    };
    observable.subscribe(observer);
    //返回
    Observer got a next value: 1
    Observer got a next value: 2
    Observer got a next value: 3
    Observer got a next value: 4 //after 1s return
    Observer got a complete notification
    

    如果在最后调用subscription.unsubscribe();那么4就不会执行,complete也不会执行,就会取消掉这个观察。

    Subject

    Subject是允许值被多播到多个观察者的一种特殊的Observable。然而纯粹的可观察对象是单播的(每一个订阅的观察者拥有单独的可观察对象的执行)。

    subject是Observable对象,并且自带next,error,complete函数,所以我们不用在定义observer:

    var subject = new Rx.Subject();
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    //返回
    observerA: 1
    observerB: 1
    observerA: 2
    observerB: 2
    

    由于subject自带next等等的函数,所以它也是个observer,也可以这样用:

    var subject = new Rx.Subject();
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    var observable = Rx.Observable.from([1, 2, 3]);
    
    observable.subscribe(subject); // You can subscribe providing a Subject
    

    Operators操作符

    rx因为operators强大,我们可以流式的处理主要因为有operators在。

    操作符是可观察对象上定义的方法,例如.map(...),.filter(...),.merge(...),等等。他们类似fp,返回新的observable而subscription对象也会继承。

    比如

    Rx.Observable.interval(500).filter(x => x%2==1).subscribe( res => console.log(res) );
    // 一秒输出一个数,返回单数。
    

    这里的filter就是操作符,我们通过操作符来完成一系列的神奇操作。

    Scheduler调度者

    什么是调度者?调度者控制着何时启动一个订阅和何时通知被发送。

    名称 类型 属性 描述
    queue Scheduler 在当前事件帧中调度队列(trampoline 调度器)。迭代操作符使用此调度器。
    asap Scheduler 微任务队列上的调度, 使用尽可能快的转化机制, 或者是 Node.js 的 process.nextTick(),或者是 Web Worker 的消息通道,或者 setTimeout , 或者其他。异步转化使用此调度器.
    async Scheduler 使用 setInterval 调度工作。基于时间的操作符使用此调度器。
    animationFrame Scheduler 使用 requestAnimationFrame 调度工作。与平台的重绘同步使用此调度器。
    var observable = Rx.Observable.create(function (observer) {
      observer.next(1);
      observer.next(2);
      observer.next(3);
      observer.complete();
    })
    .observeOn(Rx.Scheduler.async);
    
    console.log('just before subscribe');
    observable.subscribe({
      next: x => console.log('got value ' + x),
      error: err => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done'),
    });
    console.log('just after subscribe');
    
    //返回
    just before subscribe
    just after subscribe
    got value 1
    got value 2
    got value 3
    done
    

    这是因为observeOn(Rx.Scheduler.async)在Observable.create和最终的Observer之间引入了一个代理Observer。

    var proxyObserver = {
      next: (val) => {
        Rx.Scheduler.async.schedule(
          (x) => finalObserver.next(x),
          0 /* delay */,
          val /* will be the x for the function above */
        );
      },
    
      // ...
    }
    

    使用rxjs

    搜索功能

    <input id="text"></input>
    <script>
        var text = document.querySelector('#text');
        text.addEventListener('keyup', (e) =>{
            var searchText = e.target.value;
            // 发送输入内容到后台
            $.ajax({
                url: `xx.com/${searchText}`,
                success: data => {
                  // 拿到后台返回数据,并展示搜索结果
                  render(data);
                }
            });
        });
    </script>
    

    之前实现一个搜索效果,其实需要这样的代码,应用到函数节流还需要写为

      clearTimeout(timer);
      // 定时器,在 250 毫秒后触发
       timer = setTimeout(() => {
            console.log('发起请求..');
        },250)
    

    还要考虑一种情况,如果我们搜索了a,然后马上改为了b,会返回a的结果,这样我们就需要判断一下:

        clearTimeout(timer)
        timer = setTimeout(() => {
            // 声明一个当前所搜的状态变量
            currentSearch = '书'; 
    
            var searchText = e.target.value;
            $.ajax({
                url: `xx.com/${searchText}`,
                success: data => {
                    // 判断后台返回的标志与我们存的当前搜索变量是否一致
                    if (data.search === currentSearch) {
                        // 渲染展示
                        render(data);
                    } else {
                        // ..
                    }
                }           
        });
    

    这种代码其实就很杂乱了。

    如果用rxjs,我们的代码能简单并且清楚很多:

    var text = document.querySelector('#text');
    var inputStream = Rx.Observable.fromEvent(text, 'keyup')
                        .debounceTime(250)
                        .pluck('target', 'value')
                        .switchMap(url => Http.get(url))
                        .subscribe(data => render(data));
    

    rxjs几个操作符

    forkJoin

    rxjs版的promise.all

    const getPostOne$ = Rx.Observable.timer(1000).mapTo({id: 1});
    const getPostTwo$ = Rx.Observable.timer(2000).mapTo({id: 2});
    
    Rx.Observable.forkJoin(getPostOne$, getPostTwo$).subscribe(res => console.log(res)) 
    //返回
    [ { id: 1 }, { id: 2 } ]
    

    pairwise

    可以保存上一个值

    Rx.Observable
      .fromEvent(document, 'scroll')
      .map(e => window.pageYOffset)
      .pairwise()
      .subscribe(pair => console.log(pair)); // pair[1] - pair[0]
    

    switchMap

    合并两个流的值,并只发出最新的值

    const clicks$ = Rx.Observable.fromEvent(document, 'click');
    const innerObservable$ = Rx.Observable.interval(1000);
    
    clicks$.switchMap(event => innerObservable$)
                        .subscribe(val => console.log(val));
    

    每次点击触发才发送interval值,并且点击之后interval重新发送,取消掉之前的值。如果是mergeMap,则不取消之前的值。

    toPromise

    返回promise

    let source = Rx.Observable
      .of(42)
      .toPromise();
    
    source.then((value) => console.log('Value: %s', value));
    // => Value: 42
    

    fromPromise

    将 Promise 转化为 Observable。

    var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
    result.subscribe(x => console.log(x), e => console.error(e));
    

    有了和promise相互转化的api,就很方便的用rx,ng2中内置rx,用着不爽就任意改成promise来写。

    takeUntil

    public takeUntil(notifier: Observable): Observable
    发出源 Observable 发出的值,直到notifier:Observable 发出值。

    rx.Observable.interval(1000).takeUntil(rx.Observable.fromEvent(document,'click'))
    

    触发interval,然后每次点击停止触发。

    所以它还有一个用法就是建立一个stop流,来避免手动调用unsubscribe。

       const data$ = this.getData();
       const cancelBtn = this.element.querySelector('.cancel-button');
       const rangeSelector = this.element.querySelector('.rangeSelector');
    
       const cancel$ = Observable.fromEvent(cancelBtn, 'click');
       const range$ = Observable.fromEvent(rangeSelector, 'change').map(e => e.target.value);
       
       const stop$ = Observable.merge(cancel$, range$.filter(x => x > 500))
       this.subscription = data$.takeUntil(stop$).subscribe(data => this.updateData(data));
    

    rxjs在ng2

    先提BehaviorSubject

    BehaviorSubject继承自Observable类,它储存着要发射给消费者的最新的值。
    无论何时一个新的观察者订阅它,都会立即接受到这个来自BehaviorSubject的"当前值"。

    比如

    var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(3);
    
    //返回
    observerA: 0
    observerA: 1
    observerA: 2
    observerB: 2
    observerA: 3
    observerB: 3
    

    每次next就传一个值,在observer里面写函数处理。

    例子

    我们有一个material table的例子来看。

    代码看文最后

    我们做的是一个table中的filter功能,类似find item by name。

    一般的思路就是获取这个input的值,函数节流,在我们的table数据中filter这个name,然后给原来绑定的data赋值。

    对于rx的写法就很清楚了。

     Observable.fromEvent(this.filter.nativeElement, 'keyup')
            .debounceTime(150)
            .distinctUntilChanged()
            .subscribe(() => {
              if (!this.dataSource) { return; }
              this.dataSource.filter = this.filter.nativeElement.value;
            });
    

    我们获取输入的值,节流,去重,赋值给this.dataSource,this.dataSource其实是ExampleDataSource的实例。

    ExampleDatabase类是生成数据的类,可以忽略,ExampleDataSource是我们做处理的一个类,material暴露了一个connect方法,返回的observable直接绑定table的data。

    主要的处理在ExampleDataSource里:

    export class ExampleDataSource extends DataSource<any> {
      _filterChange = new BehaviorSubject('');
      get filter(): string { return this._filterChange.value; }
      set filter(filter: string) { this._filterChange.next(filter); }
    
      constructor(private _exampleDatabase: ExampleDatabase) {
        super();
      }
    
      /** Connect function called by the table to retrieve one stream containing the data to render. */
      connect(): Observable<UserData[]> {
        const displayDataChanges = [
          this._exampleDatabase.dataChange,
          this._filterChange,
        ];
    
        return Observable.merge(...displayDataChanges).map(() => {
          return this._exampleDatabase.data.slice().filter((item: UserData) => {
            let searchStr = (item.name + item.color).toLowerCase();
            return searchStr.indexOf(this.filter.toLowerCase()) != -1;
          });
        });
      }
    

    我们设置了filter这个属性的get和set,每次我们按下按键,给this.dataSource.filter赋值的时候,实际上,我们调用了BehaviorSubject的next方法,

    发了一个事件。我们还需要merge一下_exampleDatabase.dataChange事件,为了当table数据改变的时候,我们能做出相应的处理。

    然后就用map操作符,filter一下我们的data数据。给table数据绑定material已经帮我们做了。

    附文:

    import {Component, ElementRef, ViewChild} from '@angular/core';
    import {DataSource} from '@angular/cdk';
    import {BehaviorSubject} from 'rxjs/BehaviorSubject';
    import {Observable} from 'rxjs/Observable';
    import 'rxjs/add/operator/startWith';
    import 'rxjs/add/observable/merge';
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/debounceTime';
    import 'rxjs/add/operator/distinctUntilChanged';
    import 'rxjs/add/observable/fromEvent';
    
    @Component({
      selector: 'table-filtering-example',
      styleUrls: ['table-filtering-example.css'],
      templateUrl: 'table-filtering-example.html',
    })
    export class TableFilteringExample {
      displayedColumns = ['userId', 'userName', 'progress', 'color'];
      exampleDatabase = new ExampleDatabase();
      dataSource: ExampleDataSource | null;
    
      @ViewChild('filter') filter: ElementRef;
    
      ngOnInit() {
        this.dataSource = new ExampleDataSource(this.exampleDatabase);
        Observable.fromEvent(this.filter.nativeElement, 'keyup')
            .debounceTime(150)
            .distinctUntilChanged()
            .subscribe(() => {
              if (!this.dataSource) { return; }
              this.dataSource.filter = this.filter.nativeElement.value;
            });
      }
    }
    
    /** Constants used to fill up our data base. */
    const COLORS = ['maroon', 'red', 'orange', 'yellow', 'olive', 'green', 'purple',
      'fuchsia', 'lime', 'teal', 'aqua', 'blue', 'navy', 'black', 'gray'];
    const NAMES = ['Maia', 'Asher', 'Olivia', 'Atticus', 'Amelia', 'Jack',
      'Charlotte', 'Theodore', 'Isla', 'Oliver', 'Isabella', 'Jasper',
      'Cora', 'Levi', 'Violet', 'Arthur', 'Mia', 'Thomas', 'Elizabeth'];
    
    export interface UserData {
      id: string;
      name: string;
      progress: string;
      color: string;
    }
    
    /** An example database that the data source uses to retrieve data for the table. */
    export class ExampleDatabase {
      /** Stream that emits whenever the data has been modified. */
      dataChange: BehaviorSubject<UserData[]> = new BehaviorSubject<UserData[]>([]);
      get data(): UserData[] { return this.dataChange.value; }
    
      constructor() {
        // Fill up the database with 100 users.
        for (let i = 0; i < 100; i++) { this.addUser(); }
      }
    
      /** Adds a new user to the database. */
      addUser() {
        const copiedData = this.data.slice();
        copiedData.push(this.createNewUser());
        this.dataChange.next(copiedData);
      }
    
      /** Builds and returns a new User. */
      private createNewUser() {
        const name =
            NAMES[Math.round(Math.random() * (NAMES.length - 1))] + ' ' +
            NAMES[Math.round(Math.random() * (NAMES.length - 1))].charAt(0) + '.';
    
        return {
          id: (this.data.length + 1).toString(),
          name: name,
          progress: Math.round(Math.random() * 100).toString(),
          color: COLORS[Math.round(Math.random() * (COLORS.length - 1))]
        };
      }
    }
    
    /**
     * Data source to provide what data should be rendered in the table. Note that the data source
     * can retrieve its data in any way. In this case, the data source is provided a reference
     * to a common data base, ExampleDatabase. It is not the data source's responsibility to manage
     * the underlying data. Instead, it only needs to take the data and send the table exactly what
     * should be rendered.
     */
    export class ExampleDataSource extends DataSource<any> {
      _filterChange = new BehaviorSubject('');
      get filter(): string { return this._filterChange.value; }
      set filter(filter: string) { this._filterChange.next(filter); }
    
      constructor(private _exampleDatabase: ExampleDatabase) {
        super();
      }
    
      /** Connect function called by the table to retrieve one stream containing the data to render. */
      connect(): Observable<UserData[]> {
        const displayDataChanges = [
          this._exampleDatabase.dataChange,
          this._filterChange,
        ];
    
        return Observable.merge(...displayDataChanges).map(() => {
          return this._exampleDatabase.data.slice().filter((item: UserData) => {
            let searchStr = (item.name + item.color).toLowerCase();
            return searchStr.indexOf(this.filter.toLowerCase()) != -1;
          });
        });
      }
    
      disconnect() {}
    }
    
  • 相关阅读:
    Entity Framework 出现 "此 ObjectContext 实例已释放,不可再用于需要连接的操作" 的错误
    JS 页面加载触发事件 document.ready和window.onload的区别
    C# 控制台程序实现 Ctrl + V 粘贴功能
    网站推广必备的16个营销工具
    C# 如何捕获键盘按钮和组合键以及KeyPress/KeyDown事件之间的区别 (附KeyChar/KeyCode值)
    jQuery问题集锦
    zend studio打开文件提示unsupported character encoding
    简单的Jquery焦点图切换效果
    HTML实体符号代码速查表
    心得感悟
  • 原文地址:https://www.cnblogs.com/dh-dh/p/7211245.html
Copyright © 2011-2022 走看看