RxJs 中创建操作符是创建数据流的起点,这些操作符可以凭空创建一个流或者是根据其它数据形式创建一个流。 Observable的构造函数可以直接创建一个数据流,比如:
const $source=new Observable(observer=>{ observer.next(1); observer.next(2); observer.next(3); })
但是在真正的使用过程中很少使用这种方式去创建,RxJx 提供了大量的创建操作符供我们在开发中去使用。创建型操作符打不风都是静态操作符。
一、创建同步数据流
同步数据流,或者说同步Observable对象,需要关⼼的就是: 1.产⽣哪些数据 2. 数据之间的先后顺序如何。 对于同步数据流,数据之间的时间间隔不存在,所以不需要考虑时间 ⽅⾯的问题。
1、create 最简单的操作符
它的使用像这样:
import {Observable} from 'rxjs/Observable'; const onSubscribe=observer=>{ observer.next(1); observer.next(2); observer.next(3); } const source$=Observable.create(onSubscribe); var theObserver={ next:item=>console.log(item) } // var theObserver={}; // theObserver.next=item=>console.log(item) source$.subscribe(theObserver);//output:1,2,3
2、of 列举数据
of 可以根据一个指定的数据集创建一个Observable
(1)Observabe 的静态方法创建(这种静态创建是将of操作符挂载在Javascript的prototype上类似这样Observable.prototype.of=of 这样就成了Observable的全局函数了)
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/of'; const source$=Observable.of(1,2,3); source$.subscribe(console.log);
(2)从rxjs/observable下导入包创建 (这种of没有直接挂载在prototype上)
import { of } from "rxjs/observable/of"; const source$=of(1,2,3); source$.subscribe(console.log);//output 1,2,3
3、range 指定范围
range可以产生一个连续的序列,指定起始值和长度
import { Observable } from 'rxjs/Observable'; import { range } from 'rxjs/add/observable/range'; const source$ = Observable.range(1, 100); source$.subscribe(console.log);//output 1 to 100
range 的第一个参数可以是小数,大师每次只能递增1
像这样
import { Observable } from 'rxjs/Observable'; import { range } from 'rxjs/add/observable/range'; const source$ = Observable.range(1.5, 100); source$.subscribe(console.log);//output 1.5 to 100.5
4、generate 产生一个定制的序列
import { Observable } from "rxjs/Observable"; import { generate } from 'rxjs/add/observable/generate'; const source=Observable.generate(3, condation=>condation<10, step=>step+2, result=>result*result) .subscribe(console.log);//output 9,25,49,91
第一个参数是初始值,第二个是结束的条件,第三个是每次递增的步长,第四个参数是对每次产生结果的一个回调处理,这个操作符和 for循环类似,上面的这个语句的意思是和下面的语句一样
for (let i = 3; i < 10; i = i + 2) {
console.log(i * i);
}
generate 不限于产生数字的序列,比如像 这样:
Observable.generate('a', c=>c.length<5, s=>s+'a', r=>r).subscribe(console.log);//a aa aaa aaaa
5、repeat 重复数据的数据流
这个是一个实例操作符 可以通过下面两种方式导入
import {repeat} from 'rxjs/add/operator/repeat'; //or import {repeat} from 'rxjs/operators/repeat';
将上游的数据流的数据重复设置的次数:
import {repeat} from 'rxjs/add/operator/repeat'; import 'rxjs/add/Observable/of'; import { Observable } from 'rxjs/Observable'; //or //import {repeat} from 'rxjs/operators/repeat'; Observable.of(1,2,3) .repeat(3) .subscribe(console.log);//output:1,2,3 1,2,3 1,2,3
j
将上游数据流重复输出3次, 实质上repeat是通过重复订阅上游数据流来达到重复的效果,如果上游数据流不complete repeat就没有效果。
repeat 的参数标识重复的次数,如果必须输入预期的重复次数参数应该大于0, 如果小于0会进行无限次循环,程序的的执行结果将不可预期。
6、empty 直接产生一个完结的数据流.
import 'rxjs/add/observable/empty'; const source$ = Observable.empty();
7、throw 直接接抛出一个错误
import 'rxjs/add/observable/throw'; const source$ = Observable.throw(new Error('Somthing error'));
因为throw 是javascript的关键字如果不用静态的方式使用导入时赢改是 _throw,避免和javascript关键字冲突
import {_throw} from 'rxjs/observable/throw'; const source$ = _throw(new Error('some error'));
8、never 创建一个Obervable 对象什么都不错,即不吐数据也不出错
import 'rxjs/add/observable/never'; const source$ = Observable.never();
前面的8个操作符都是同步操作符,产生的都是同步数据流。
9、interval 和timer 定时产生数据流
这俩个是最简单的产生异步数据流的操作符, 它们类似javascript中的setInterval 和setTimeout。
interval接受一个参数就是产生从0开始的序列的毫秒数
import { Observable } from "rxjs/Observable"; import { interval } from "rxjs/add/observable/interval"; Observable.interval(1000).subscribe(console.log);
每次只能从0开始且每次只能递增1,如果想改起始数字可以结合map操作符来实现, 如下是从2开始每次递增1.
Observable.interval(1000).map(x=>x+2).subscribe(console.log);
timer地第一个参数可以传数字也也可以穿Date,传数字表示多好毫秒输出0, 如果传入的是Date 表示时间到这个Date时输出0,第二个参数表示间隔的时间,如果第一个参数和第二个参数都输入那就和interval的功能一样了
下面这两条语句输出的结果相同:
import { Observable } from "rxjs/Observable"; import "rxjs/add/observable/interval"; import "rxjs/add/observable/timer"; Observable.interval(1000).subscribe(console.log); Observable.timer(1000,1000).subscribe(console.log);
10、from 可以将一切转化成Observable
import { Observable } from "rxjs/Observable"; import "rxjs/add/observable/from"; import "rxjs/add/observable/of"; Observable.from([1, 2, 3]).subscribe(console.log);//output:1,2,3 Observable.from("Hello").subscribe(console.log);//output:H,e,l,l,o function toObservable() { return Observable.from(arguments);//在JavaScript中,任何⼀个函数体中都可以通过arguments访问所有的调 ⽤参数 } toObservable(1, 2, 3, 4).subscribe(console.log);//output:1,2,3,4 function * generateNumber(max) { for (let i = 1; i <= max; ++i) { yield i; } } Observable.from(generateNumber(3)).subscribe(console.log);//output:1,2,3 const source$=Observable.of(1,2,3); Observable.from(source$).subscribe(console.log);//output:1,2,3
11.fromPromise 异步处理的交接
这操作符在实际使用中用的非常多。
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/from'; const promise = Promise.resolve('successfuly'); const source$ = Observable.from(promise); source$.subscribe(console.log, error => console.log('catch', error), () => console.log('complete'));
12.fromEvent 将对DOM的操作转化成Observable
第一个参数是事件源DOM 中就是具体的HTML元素,第二个参数是事件名称 如:click,mouseover等。
HTML:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>from event</title> <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script> </head> <body> <div> <button id="btnTest">I'm button</button> <label id="lblTest"></label> </div> <script src="./fromEventTest.js"></script> </body> </html>
Javascript:
let number = 0; Rx.Observable.fromEvent(document.querySelector('#btnTest'), 'click') .subscribe(x => { document.querySelector('#lblTest').innerText = ++number; });
点击按钮label中的数字递增1.
13、fromEventPattern 更灵活的事件转化成Observable操作符
fromEventPattern接受两个函数参数,分别对应产⽣的Observable对象 被订阅和退订时的动作,因为这两个参数是函数,具体的动作可以任意定 义,所以可以⾮常灵活。
import { Observable } from "rxjs/Observable"; import { EventEmitter } from "events"; import 'rxjs/add/observable/fromEventPattern'; const emitter = new EventEmitter(); const addHandler = handler => { emitter.addListener('msg', handler); } const removeHandler = handler => { emitter.removeListener('msg', handler); } const source$ = Observable.fromEventPattern(addHandler, removeHandler); const subscription = source$.subscribe(console.log, error => console.log("error:" + error), console.log("complete")); emitter.emit('msg', 1); emitter.emit('msg', '2'); emitter.emit('msg', 3); subscription.unsubscribe(); emitter.emit('msg', 4);
14、ajax 通过ajax 返回的结果 创建一个Observable
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>ajax</title> <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script> </head> <body> <div> <button id="btnTest">I'm button</button> <label id="lblTest"></label> </div> <script src="./ajax.js"></script> </body> </html>
Javascript
Rx.Observable.fromEvent(document.querySelector("#btnTest"), "click") .subscribe(y => { Rx.Observable.ajax("https://api.github.com/repos/ReactiveX/rxjs") .subscribe(x => { console.log(x); document.querySelector("#lblTest").innerText = x.response; }, error => { console.log("error:" + error); }, () => console.log("complete")); })
15、repeatWhen反复订阅上游的数据流并且可以设置一个等待的时间,repeat 只能反复订阅不能设置时间
import { Observable } from "rxjs"; Observable.of(1,2,3).repeatWhen(()=>{ return Observable.interval(1000); }).subscribe(console.log);
16.defer 延迟资源调用操作符
import 'rxjs/add/observable/defer'; import 'rxjs/add/observable/of'; import {Observable} from 'rxjs/Observable'; const observableFactory = () => Observable.of(1, 2, 3); const source$ = Observable.defer(observableFactory); source$.subscribe(console.log);