zoukankan      html  css  js  c++  java
  • rxjs 学习实践笔记

    rxjs笔记部分随笔,权威地址:https://rxjs-cn.github.io/learn-rxjs-operators/operators/

    Import {subject}fromrxjs

    Subject 数据的订阅与分发,结合报刊的发布与订阅进行功能的模拟,subject即是observeable对象也是observer对象,subject对于后期没有数据更新时所添加的订阅者是不怎么友好的,因为不跟新数据时订阅者就不在收到返回的数值

        const interval$ = interval(1000).pipe(take(10));

        const subject = new Subject();

     

        const observerA = {

          next: value => console.log('Observer A get value: ' + value),

          error: error => console.log('Observer A error: ' + error),

          complete: () => console.log('Observer A complete!'),

        };

     

        const observerB = {

          next: value => console.log('Observer B get value: ' + value),

          error: error => console.log('Observer B error: ' + error),

          complete: () => console.log('Observer B complete!'),

        };

     

        subject.subscribe(observerA); // 添加观察者A

        interval$.subscribe(subject); // 订阅interval$对象

        setTimeout(() => {

          subject.subscribe(observerB); // 添加观察者B

        }, 1000);

     

    Import{BehaviorSubject}fromrxjs;

    behaviorSubject subject的变种,最大的区别就是 behaviorSubject是用于保存最新的数值,而不是单纯的发送事件,会将最后一次发送的值作为当前值保存在内部属性中。

        const subject = new BehaviorSubject(0);  //BehaviorSubject小括号0代表的是状态

        const observerA = {

          next: value => console.log('Observer A get value: ' + value),

          error: error => console.log('Observer A error: ' + error),

          complete: () => console.log('Observer A complete!'),

        };

     

        const observerB = {

          next: value => console.log('Observer B get value: ' + value),

          error: error => console.log('Observer B error: ' + error),

          complete: () => console.log('Observer B complete!'),

        };

     

        subject.subscribe(observerA); // 添加观察者A

        // interval$.subscribe(subject); // 订阅interval$对象

        subject.next(1);

        subject.next(2);

        subject.next(3);

        setTimeout(() => {

          subject.subscribe(observerB); // 添加观察者B

        }, 1000);

     

    Import {ReplaySubject}fromrxjs;

    ReplaySubject 用于重复发送最近几次的值给订阅者

        const subject = new ReplaySubject(2); //ReplaySubject后的2为最后两次发送的数值

        const observerA = {

          next: value => console.log('Observer A get value: ' + value),

          error: error => console.log('Observer A error: ' + error),

          complete: () => console.log('Observer A complete!'),

        };

     

        const observerB = {

          next: value => console.log('Observer B get value: ' + value),

          error: error => console.log('Observer B error: ' + error),

          complete: () => console.log('Observer B complete!'),

        };

     

        subject.subscribe(observerA); // 添加观察者A

        // interval$.subscribe(subject); // 订阅interval$对象

        subject.next(1);

        subject.next(2);

        subject.next(3);

        setTimeout(() => {

          subject.subscribe(observerB); // 添加观察者B

        }, 1000);

     

    Import{AsyncSubject}fromrxjs;

    AsyncSubject他会在subject完成后才返回一个值

        const subject = new AsyncSubject();

        const observerA = {

          next: value => console.log('Observer A get value: ' + value),

          error: error => console.log('Observer A error: ' + error),

          complete: () => console.log('Observer A complete!'),

        };

     

        const observerB = {

          next: value => console.log('Observer B get value: ' + value),

          error: error => console.log('Observer B error: ' + error),

          complete: () => console.log('Observer B complete!'),

        };

     

        subject.subscribe(observerA); // 添加观察者A

        // interval$.subscribe(subject); // 订阅interval$对象

        subject.next(1);

        subject.next(2);

        subject.next(3);

        subject.complete();

        setTimeout(() => {

          subject.subscribe(observerB); // 添加观察者B

        }, 1000);

     

     

     

     

     

    Import {Observable}from rxjs

    Create 创建在订阅函数中发出 'wocao' 'woqu' observable

    const obs = Observable.create(observer => {

          observer.next('wocao');

          observer.next('woqu');

        });

        const data = obs.subscribe(e => {

          console.log(e);

        });

    创建定时发送数据 并在12秒后取消订阅数据

    const obs = Observable.create(observer => {

          let num = 0;

          const interve = setInterval(() => {

            if (num % 2 === 0) {

              observer.next('这个是' + num);

            }

            num++;

          }, 1000);

          // return clearInterval(interve);

        });

        const data = obs.subscribe(e => {

          console.log(e);

        });

        setTimeout(() => {

          data.unsubscribe();

        }, 12000);

     

    Import {empty} fromrxjs

    Empty 立即完成observable

     const subscribe = empty().subscribe({

          next: () => console.log('Next'),

          complete: () => console.log('Complete!'),

        });

     

     

    Import {from} fromrxjs

    From 将数组、promise 或迭代器转换成 observable

    const map = new Map();

        map.set(1'wocao');

        map.set(2'擦从年');

        map.set('color'2);

    //Map对象

    const map = from([1,2,3,'wocao'])

    //Map数组

    const data = from(

          new Promise((resolvereject=> {

            resolve('测试');

          }),

    );

    //promise

     

        data.subscribe(res => {

          console.log(res);

        });

     

    Import {fromEvent} fromrxjs

    fromEvent 将事件转换为Observable 序列

     

    const source = fromEvent(document'click');

        const data = source.pipe(map(event => `Event time: ${event.timeStamp}`));

        data.subscribe(res => {

          console.log(res);

        });

     

    Import {fromPromise}fromrxjs/boservable/fromPromise

    fromPromise 创建由 promise 转换而来的 observable,并发出 promise 的结果

    import { of } from 'rxjs/observable/of';

    import { fromPromise } from 'rxjs/observable/fromPromise';

    import { mergeMapcatchError } from 'rxjs/operators';

     

    // 基于输入来决定是 resolve 还是 reject 的示例 promise

    const myPromise = willReject => {

      return new Promise((resolvereject=> {

        if (willReject) {

          reject('Rejected!');

        }

        resolve('Resolved!');

      });

    };

    // 先发出 true,然后是 false

    const source = of(truefalse);

    const example = source.pipe(

      mergeMap(val =>

        fromPromise(myPromise(val)).pipe(

          // 捕获并优雅地处理 reject 的结果

          catchError(error => of(`Error: ${error}`))

        )

      )

    );

    // 输出: 'Error: Rejected!', 'Resolved!'

    const subscribe = example.subscribe(val => console.log(val));

     

    Import {interval,timer} fromrxjs

    Imterval  基于给定时间间隔发出数字序列

    Timer  基于给定时间单次发送数字序列

    const source = interval(1000);

         source.subscribe(e => {

          console.log(e);

        });

        const timers = timer(2000);

        timers.subscribe(e => {

          console.log(e);

        });

     

    Import {of} from rxjs

    Of 依次发出提供的任意数量的值

    const sourse = of(1234'测试1''测试2''测试3'5);

        sourse.subscribe(e => {

          console.log(e);

        });

    其发送的内容共包括对象数组和函数

    const sourse = of({ name: '丛草' }, [123], function get() {

          return 'lalala';

        });

        sourse.subscribe(e => {

          console.log(e);

        });

     

    Import {range} fromrxjs

    range依次发出给定区间内的数字。

        const source = range(210);

        source.subscribe(e => {

          console.log(e);

        });

     

    Import {throwErrow}fromrxjs

    Import{catchError}fromrxjs/operators

    catchError 优雅地处理 observable 序列中的错误

        const source = throwError('this is error');

        const data = source.pipe(catchError(val => of(`I get:${val}`)));

        data.subscribe(res => {

          console.log(res);

        });

     

    Import {retry}fromrxjs/operators;

    Retry 如果发生错误,以指定次数重试observable序列

        const source = interval(1000);

        const example = source.pipe(

          mergeMap(val => {

            if (val > 6) {

              return throwError('this is errow');

            } else {

              return of(val);

            }

          }),

          retry(2),

        );

        example.subscribe({

          next: val => console.log(val),

          error: val => console.log(`${val}: Retried 2 times then quit!`),

        });

     

     

    import{retryWhendelayWhen} from rxjs/operators

    retryWhen 当发生错误时,基于自定义的标准来重试 observable 序列。

    const source = interval(1000);

        const example = source.pipe(

          map(val => {

            if (val > 3) {

              throw val;

            } else {

              return val;

            }

          }),

          retryWhen(errors =>

            errors.pipe(

              tap(val => console.log(`Value ${val} was too high!`)),

              // 5秒后重启

              delayWhen(val => timer(val * 1000)),

            ),

          ),

        );

        example.subscribe({

          next: val => console.log(val),

          error: val => console.log(`${val}: Retried 2 times then quit!`),

        });

     

    import{combineAll} fromrxjs/operators; combineAll中文 结合所有)

    combineAll 收集内部完成的observables

       const source = interval(1000).pipe(take(1));

        const example = source.pipe(

          map(val =>

            interval(1000).pipe(

              map(i => `Result (${val}): ${i}`),

              take(5),

            ),

          ),

        );

        const combined = example.pipe(combineAll());

        combined.subscribe(val => console.log(val));

     

    Import{combineLatest}fromrxjs   combineLatest (中文 聚合)

    combineLatest 当任意 observable 发出值时,发出每个 observable 的最新值

        const one = timer(10003000);

        const tow = timer(20004000);

        const three = timer(30006000);

        const data = combineLatest(onetowthree);

        data.subscribe(val => {

          const [abc] = val;

          console.log(a + 'aaaaa' + b + 'bbbbb' + c);

        });

     

    combineLatest 第三个参数 收集输出的结果

    const combinedProject = combineLatest(

      timerOne,

      timerTwo,

      timerThree,

      (onetwothree=> {

        return `Timer One (Proj) Latest: ${one}

                  Timer Two (Proj) Latest: ${two}

                  Timer Three (Proj) Latest: ${three}`;

      }

    );

    使用聚合功能 组合两个按钮组

    <div>

      <button id='red'>Red</button>

      <button id='black'>Black</button>

    </div>

    <div>Red: <span id="redTotal"></span> </div>

    <div>Black: <span id="blackTotal"></span> </div>

    <div>Total: <span id="total"></span> </div>

     

    const setHtml = id => val => (document.getElementById(id).innerHTML = val);

        const addOneClick$ = id =>

          fromEvent(document.getElementById(id), 'click').pipe(

            mapTo(2),

            startWith(0),

            scan((acccurr=> acc + curr),

            tap(setHtml(`${id}Total`)),

          );

        combineLatest(addOneClick$('red'), addOneClick$('black'))

          .pipe(map(([val1val2]) => val1 + val2))

          .subscribe(setHtml('total'));

     

    Import{concat} fromrxjs/operators;

    Concat 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值

        const source = of(12345667788);

        const source2 = from([12'soclok''ksdj', { name: 'goujieba'age: 111 }]);

        const data = source.pipe(concat(source2)).subscribe(val => console.log(val));

    静态:

    Import {concat}fromrxjs

        const source = of(12345667788);

        const source2 = from([12'soclok''ksdj', { name: 'goujieba'age: 111 }]);

        const data = concat(sourcesource2);

        data.subscribe(val => console.log(val));

     

    Import{concatAll}fromrxjs/operators;

    concatAll 收集 observables,当前一个完成时订阅下一个。

    const source = interval(2000);

        const data = source.pipe(

          map(val => of(val + 10)),

          concatAll(),

        );

        data.subscribe(val => {

          console.log(val);

        });

    CancatAll可连接promise 或者 内部延问题

    const obs1 = interval(1000).pipe(take(5));

        const obs2 = interval(500).pipe(take(2));

        const obs3 = interval(2000).pipe(take(1));

        const source = of(obs1obs2obs3);

        const example = source.pipe(concatAll());

        example.subscribe(val => {

          console.log(val);

        });

     

    Import {forkJoin} fromrxjs

    forkJoin等待序列中所有的observable完成后 以数组返回结果  若其中有未完成的observable将没有返回结果

    const source = val => new Promise(resolve => setTimeout(i => resolve(val), 8000));

        const example = forkJoin(

          // 立即发出 'Hello'

          of('Hello'),

          // 1秒后发出 'World'

          of('World').pipe(delay(1000)),

          // 1秒后发出0

          interval(1000).pipe(take(1)),

          // 以1秒的时间间隔发出0和1

          interval(1000).pipe(take(2)),

          // 5秒后解析 'Promise Resolved' 的 promise

          source('RESULT'),

        );

        // 输出: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]

        const subscribe = example.subscribe(val => console.log(val));

     

    Import {merge} fromrxjs; 静态方法

    Merge 将多个observables转换为observable,返回的结果顺序是随机的;

        const one = interval(4000);

        const two = interval(2000);

        const three = interval(2500);

        const four = interval(1500);

        const source = merge(

          one.pipe(mapTo('1111')).pipe(take(1)),

          two.pipe(mapTo('22222')).pipe(take(1)),

          three.pipe(mapTo('33333')).pipe(take(1)),

          four.pipe(mapTo('44444')).pipe(take(1)),

        );

        source.subscribe(val => {

          console.log(val);

        });

    Import {merge} fromrxjs/operators; 实例方法

     const one = interval(4000).pipe(

          mapTo('111'),

          take(1),

        );

        const two = interval(2000).pipe(

          mapTo('222'),

          take(1),

        );

        const source = one.pipe(merge(two));

        source.subscribe(val => {

          console.log(val);

        });

     

    Import {mergeAll}fromrxjs/operators;

    mergeAll 收集并订阅所有的ovservables; 并发所有返回的结果

    const mypromise = val => new Promise(resolve => setTimeout(() => resolve(val), 2000));

        const source = of(123454);

        source.pipe(

          delay(1000),

          map(val => mypromise(val)),

          mergeAll(),

        );

        source.subscribe(val => {

          console.log(val);

        });

     

    Import {pairwise}fromrxjs/operators;

    Pairwish 用数组返回前一个及当前值;

      interval(1000)

          .pipe(

            pairwise(),

            take(5),

          )

          .subscribe(console.log);

     

    Import{race}fromrxjs;

    Race 比速度,首先返回先完成的数值,忽略之后的数据

    race(interval(2000), interval(1000), interval(3500), of(234'dfsd')).subscribe(val => console.log(val));

     

     

     

     

    import { startWith } from 'rxjs/operators';

    startWith 给定第一个数值

    const source = interval(1000);

    const example = source.pipe(startWith(-3, -2, -1));

    const subscribe = example.subscribe(val => console.log(val));

     

    import { withLatestFrom} from 'rxjs/operators';

    withLatestFrom 给定另一个值observeable

        const source = interval(1500);

        const info = interval(4500);

        source

          .pipe(

            withLatestFrom(info),

            map(([onetwo]) => {

              return `first:${one},sconed:${two}`;

            }),

          )

          .subscribe(val => console.log(val));

     

    Import {zip}fromrxjs

    Zip 订阅所有的ovserveables 待发出后统一以数组返回结果

    const sourceOne = of('Hello');

    const sourceTwo = of('World!');

    const sourceThree = of('Goodbye');

    const sourceFour = of('World!');

    const example = zip(

      sourceOne,

      sourceTwo.pipe(delay(1000)),

      sourceThree.pipe(delay(2000)),

      sourceFour.pipe(delay(3000))

    );

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{defaultIfEmpty}fromrxjs/operators;

    defaultIfEmpty 当返回的observeables为空时,返回默认的数据

        const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));

        const subscribe = exampleOne.subscribe(val => console.log(val));

     

    Import{every}fromrxjs/operators;

    Every 断言所有返回的值,如果通过则返回true ,否则返回false;

        const source = from([444333111222555666]);

        source.pipe(every(val => val > 100)).subscribe(val => {

          console.log(val);

        });

     

    Import {debounce}fromrxjs;

    Import{debounceTime}fromrxjs/operators;

    Debounce 输出给定时间范围的数值

        const source = interval(1000);

        source.pipe(debounce(val => timer(val * 200))).subscribe(val => console.log(val));

    DebounceTime 输出一定时间返回内的数据,可用于放反跳;多次触发不必要的事件

      ngAfterViewInit() {

        const btn = document.getElementById('btn');

        fromEvent(btn'click')

          .pipe(

            debounceTime(100),

            mergeMap(val => this.http.post('/admin/transactions/list'fiterObj(this.form.value))),

          )

          .subscribe(i => console.log(i));

      }

     

    debouneceTimeswitchMap同样可作为防止多余数据请求的方法,switchMap可以取消在一定范围内的请求,debouneceTime(debounece)则只会在一定范围内生效一条数据请求

     

    Import{distinctUntilChanged}fromrxjs/operators;

    distinctUntilChanged 检测栈数据是否重复,数据发生变化才返回,堆中数据不变;

    const source = from([12334455'11112k,3', { name: 'ppp' }, { name: 'ppp' }, { name: 'aaa' }]);

        source.pipe(distinctUntilChanged()).subscribe(val => console.log(val));

     

    Import {filter}fromrxjs/operators;

    Filter过滤各类数据返回需要数值

        const source = from([12334455'11112k,3', { name: 'ppp' }, { name: 'ppp' }, { name: 'aaa' }]);

        source.pipe(filter(val => val > 2)).subscribe(val => console.log(val));

     

    Import {first}fromrxjs;

    First 没有参数是则返回第一项,有参数则返回通过断言的第一项

    const source = from([12345]);

    // 没有参数则发出第一个值

    const example = source.pipe(first());

    // 输出: "First value: 1"

    const subscribe = example.subscribe(val => console.log(`First value: ${val}`));

    Firstnum =>num ===5

    const source = from([12345]);

    // 发出通过测试的第一项

    const example = source.pipe(first(num => num === 5));

    // 输出: "First to pass test: 5"

    const subscribe = example.subscribe(val =>

      console.log(`First to pass test: ${val}`)

    );

     

    Import{ignoreElements}fromrxjs/operators;

    ignoreElements 忽略所有的next(),只返回error()complete()

    const source = interval(100);

    // 略所有值,只发出 complete

    const example = source.pipe(

      take(5),

      ignoreElements()

    );

    // 输出: "COMPLETE!"

    const subscribe = example.subscribe(

      val => console.log(`NEXT: ${val}`),

      val => console.log(`ERROR: ${val}`),

      () => console.log('COMPLETE!')

    );

     

    Import{last}fromrxjs/operators;

    last没有参数则返回最后一个值,具体参照first()

    const source = from([1, 2, 3, 4, 5]);

    // 没有参数则发出最后一个值

    const example = source.pipe(last());

    // 输出: "Last value: 5"

    const subscribe = example.subscribe(val => console.log(`Last value: ${val}`));

     

    Import{sample}fromrxjs/operators;

    sample从内部发出源从取样;

    const source = interval(1000);

    // 每2秒对源 observable 最新发出的值进行取样

    const example = source.pipe(sample(interval(2000)));

    // 输出: 2..4..6..8..

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{single}fromrxjs/operators;

    Single 发出通过表达式的一项内容,且返回的结果只有一项,没什么卵用

        const source = from([12345]);

        const example = source.pipe(single(val => val === 3));

        example.subscribe(val => console.log(val));

     

    Import{skip}fromrxjs/operators;

    skip跳过参数内的数据

    const source = interval(1000);

    const example = source.pipe(skip(5));

    // 输出: 5...6...7...8........

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{skipUntil}fromrxjs/operators;

    skipUntil 跳过源obeservable中的值,知道直到的observable发出值;

    const source = interval(1000);

    const example = source.pipe(skipUntil(timer(6000)));

    // 输出: 5...6...7...8........

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{skipWhile}fromrxjs/operators;

    skipWhile skipUntil相反 ,跳过源obeservable的值,直到内部的断言为false

    const source = interval(1000);

    const example = source.pipe(skipWhile(val => val < 5));

    // 输出: 5...6...7...8........

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{take}fromrxjs/operators;

    Take 从源obeservable中取指定参数的值

    const interval$ = interval(1000);

    const example = interval$.pipe(take(5));

    // 输出: 0,1,2,3,4

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{takeUntil}fromrxjs/operators;

    takeUntil 从源obeservable发出值,直到内部obeservable发出值为止

    const source = interval(1000);

    const timer$ = timer(5000);

    const example = source.pipe(takeUntil(timer$));

    // 输出: 0,1,2,3

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{takeWhile}fromrxjs/operators;

    takeWhile 发出值,直到内部obeservable发出值为止

    const source = of(12345);

    const example = source.pipe(takeWhile(val => val <= 4));

    // 输出: 1,2,3,4

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{throttle}fromrxjs/operatolrs;

    Throttle 以某个时间为阈值,抑制源obeservable发出值

    const source = interval(1000);

    const example = source.pipe(throttle(val => interval(2000)));

    // 输出: 0...3...6...9

    const subscribe = example.subscribe(val => console.log(val));

     

    Import{throttleTime}fromrxjs/operators;

    torottleTimethrottle的简写

    const example = source.pipe(throttleTime(5000));

     

     

    TransFrommation 转化

    Import{buffer}fromrxjs/operators;

    Buffer 收集源obeservable发出的值直到提供的obeserveable发出才将数值以数组形式发出

     

        const myInterval = interval(1000);

        const bufferBy = fromEvent(document'click');

        const myBufferedInterval = myInterval.pipe(buffer(bufferBy));

        // 例如 输出: [1,2,3] ... [4,5,6,7,8]

        const subscribe = myBufferedInterval.subscribe(val => console.log(' Buffered Values:'val));

     

     

    Import {bufferCount}fromrxjs/operators;

    bufferCount收集发出的值,直到收集完提供数量的值才以数组发出

        const myInterval = interval(1000);

        const myBufferedInterval = myInterval.pipe(bufferCount(3));

        myBufferedInterval.subscribe(val => console.log(' Buffered Values:'val));

     

    Import{bufferTime}fromrxjs/operators;

    bufferTime收集指定时间内的值,直到指定时间内才以数组发出

    const myInterval = interval(1000);

        const myBufferedInterval = myInterval.pipe(bufferTime(3000));

        myBufferedInterval.subscribe(val => console.log(' Buffered Values:'val));

     

    Import{bufferToggle}fromrxjs/operators;

    bufferToggle 开启开关捕获observable的数值,关闭开关将数值以数组发出

        const sourceInterval = interval(1000);

        // 5秒后开启第一个缓冲区,然后每5秒钟开启新的缓冲区

        const startInterval = interval(5000);

        // 3秒后发出值以关闭相应的缓冲区

        const closingInterval = val => {

          console.log(`Value ${val} emitted, starting buffer! Closing in 3s!`);

          return interval(3000);

        };

        // 每5秒会开启一个新的缓冲区以收集发出的值,3秒后发出缓冲的值

        const bufferToggleInterval = sourceInterval.pipe(bufferToggle(startIntervalclosingInterval));

        // 输出到控制台

        // 输出: Emitted Buffer: [4,5,6]...[9,10,11]

        const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer:'val));

     

    Import {bufferWhen}fromrxjs/operators;

    bufferWhen 缓冲指定时间内的值,关闭缓冲以数组发出数值

        const oneSecondInterval = interval(1000);

        // 返回的 observable 每5秒发出值

        const fiveSecondInterval = () => interval(5000);

        // 每5秒发出缓冲的值

        const bufferWhenExample = oneSecondInterval.pipe(bufferWhen(fiveSecondInterval));

        // 输出值

        // 输出: [0,1,2,3]...[4,5,6,7,8]

        const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer: 'val));

     

  • 相关阅读:
    nyist 541最强DE 战斗力
    nyist 231 Apple Tree
    nyist 543 遥 控 器
    nyist 233 Sort it
    nyist 517 最小公倍数
    hdu 1sting
    nyist A+B Problem IV
    nyist 522 Interval
    nyist 117 求逆序数
    nyist 600 花儿朵朵
  • 原文地址:https://www.cnblogs.com/bomdeyada/p/12433341.html
Copyright © 2011-2022 走看看