RxJava: merge/concat/switch
RxJS: merge/concat/switch/exhaust
RxSwift: merge/concat/switchLatest
merge/concat/switch/switchLatest/exhaust 合并两个或多个源数据流。
从示意图中可以看出,这些操作符在合并源数据流时操作有所不同
- merge 无视各个源数据流整体的发送顺序,只按照合并后各个数据的发送顺序来发送数据。
- concat 严格按照各个源数据流整体的发送顺序来发送数据,即首先发送第一个源数据流中的所有数据,然后再发送第二个源数据流中的所有数据,以此类推。
- switch/switchLatest 兼顾各个源数据流整体的发送顺序以及合并后各个数据的发送顺序,即首先发送第一个源数据流中的数据,等到第二个源数据流开始发送数据再转向发送后者中的数据,以此类推。
与 concat 有所不同的是,在发送前一个数据流中数据的途中,如果接收到后一个数据流中的数据,那么 switch/switchLatest 会抛弃前一个数据流中所有余下的数据,转而发送后一个数据流中的数据。 - exhaust 兼顾各个源数据流整体的发送顺序以及合并后各个数据的发送顺序,即首先发送第一个源数据流中的所有数据,然后再发送第二个源数据流在前者发送完毕之后才开始发送的所有余下的数据,以此类推。
与 concat 有所不同的是,在前一个数据流中数据发送完毕后,exhaust 会忽略后一个数据流在此时间点之前已经发送的数据,只发送后一个数据流在此时间点之后发送的数据。
RxJava: flatMap/concatMap/switchMap
RxJS: mergeMap/concatMap/switchMap/exhaustMap
RxSwift: flatMap/concatMap/flatMapLatest/flatMapFirst
flatMap/mergeMap/concatMap/switchMap/flatMapLatest/exhaustMap 转换数据流:将源数据流的每一项都转换成数据流,从而形成数据流的数据流,最后再平坦化将两维数据流合并成一个数据流。
RxJava: FlatMap, SwitchMap and ConcatMap differences & examples
从示意图中可以看出,这些操作符在合并由源数据流中的数据所生成的各个目标数据流时操作有所不同
- flatMap/mergeMap 无视源数据流中数据的发送顺序,只按照各个目标数据流中数据的发送顺序来发送数据。
flatMap/mergeMap 在合并目标数据流时使用 merge 操作符 - concatMap 严格按照源数据流中数据的发送顺序来发送各个目标数据流中的数据。
即首先发送源数据流第一项数据所生成的目标数据流中的所有数据,然后再发送源数据流第二项数据所生成的目标数据流中的所有数据,以此类推。
concatMap 在合并目标数据流时使用 concat 操作符 - switchMap/flatMapLatest 兼顾源数据流中数据的发送顺序和各个目标数据流中数据的发送顺序,即首先发送源数据流第一项数据所生成的目标数据流中的数据,等到源数据流第二项数据所生成的目标数据流开始发送数据时再转向发送后者中的数据,以此类推。
与 concatMap 有所不同的是,在发送源数据流前一项数据所生成数据的途中,如果接收到源数据流后一项数据所生成的数据,那么 switchMap/flatMapLatest 会抛弃源数据流前一项数据所生成的所有余下数据,转而发送源数据流后一项数据所生成的数据。
switchMap/flatMapLatest 在合并目标数据流时使用 switch 操作符 - exhaustMap/flatMapFirst 兼顾源数据流中数据的发送顺序和各个目标数据流中数据的发送顺序,即首先发送源数据流第一项数据所生成的目标数据流中的所有数据,然后再发送源数据流第二项数据所生成的目标数据流在前者发送完毕之后所发送的所有余下的数据,以此类推。
与 concatMap 有所不同的是,在源数据流前一项数据所生成的数据发送完毕之后,exhaustMap 会忽略在此时间点之前源数据流后一项数据所生成的数据,只发送源数据流后一项数据在此时间点之后所生成的所有余下数据。
exhaustMap/flatMapFirst 在合并目标数据流时使用 exhaust 操作符
val items = listOf("a", "b", "c", "d", "e", "f")
val scheduler = TestScheduler()
items.toObservable()
.flatMap { s ->
val delay = Random().nextInt(10)
Observable.just(s + "x")
.delay(delay.toLong(), TimeUnit.SECONDS, scheduler)
}.toList()
.doAfterSuccess { println(it) }
.subscribe()
scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
/*
[dx, ex, bx, cx, fx, ax]
*/
val items = listOf("a", "b", "c", "d", "e", "f")
val scheduler = TestScheduler()
items.toObservable()
.concatMap { s ->
val delay = Random().nextInt(10)
Observable.just(s + "x")
.delay(delay.toLong(), TimeUnit.SECONDS, scheduler)
}.toList()
.doAfterSuccess { println(it) }
.subscribe()
scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
/*
[ax, bx, cx, dx, ex, fx]
*/
val items = listOf("a", "b", "c", "d", "e", "f")
val scheduler = TestScheduler()
items.toObservable()
.switchMap { s ->
val delay = Random().nextInt(10)
Observable.just(s + "x")
.delay(delay.toLong(), TimeUnit.SECONDS, scheduler)
}.toList()
.doAfterSuccess { println(it) }
.subscribe()
scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
/*
[fx]
*/
Rx.NET: ManySelect/SelectMany
ManySelect 转换数据流:为源数据流的每一项数据都生成一个新的数据流(该数据流为源数据流中的当前数据以及之后发送的所有数据),形成数据流的数据流。
SelectMany 转换数据流:将源数据流的每一项都转换成数据流,从而形成数据流的数据流,最后再平坦化将两维数据流合并成一个数据流。
What does the new ManySelect operator do?
Observable.Range(1, 10).ManySelect(xs => xs.Sum(), Scheduler.CurrentThread).Concat().Dump("ManySelect");
/*
ManySelect-->55
ManySelect-->54
ManySelect-->52
ManySelect-->49
ManySelect-->45
ManySelect-->40
ManySelect-->34
ManySelect-->27
ManySelect-->19
ManySelect-->10
ManySelect completed
*/
Observable.Range(1, 10).SelectMany(x => Observable.Range(x, 10 - x + 1).Sum()).Dump("SelectMany");
/*
SelectMany-->55
SelectMany-->54
SelectMany-->52
SelectMany-->49
SelectMany-->45
SelectMany-->40
SelectMany-->34
SelectMany-->27
SelectMany-->19
SelectMany-->10
SelectMany completed
*/
Rx.NET: Zip/CombineLatest/WithLatestFrom/ForkJoin
RxJava: zip/combineLatest/withLatestFrom
RxJS: zip/combineLatest/withLatestFrom/forkJoin
RxSwift: zip/combineLatest/withLatestFrom
Zip 合并两个数据流:将第一个数据流所发送的第一个数据与第二个数据流所发送的第一个数据相结合,然后再结合两个数据流的第二个数据,以此类推。结合的条件是两个数据流都发送过第 n 个数据。
CombineLatest 合并两个数据流:两个数据流中的任何一个发送新数据时,将此数据与另一个数据流最近发送的数据相结合。结合的条件是当一个数据流发送数据时,另一个数据流至少已经发送过一个数据。
WithLatestFrom 合并两个数据流:第一个数据流发送新数据时,将此数据与第二个数据流最近发送的数据相结合。结合的条件是当第一个数据流发送数据时,第二个数据流至少已经发送过一个数据。
ForkJoin 合并两个数据流:将两个数据流所发送的最后一个数据相结合。结合的条件是两个数据流均正常结束并且至少已经发送过一个数据。
IObservable<int> Xs
{
get { return Generate(0, new List<int> { 1, 2, 2, 2, 2 }); }
}
IObservable<int> Ys
{
get { return Generate(100, new List<int> { 2, 2, 2, 2, 2 }); }
}
IObservable<int> Generate(int initialValue, IList<int> intervals)
{
// work-around for Observable.Generate calling timeInterval before resultSelector
intervals.Add(0);
return Observable.Generate(initialValue,
x => x < initialValue + intervals.Count - 1,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(intervals[x - initialValue]));
}
// Merge
using (Xs.Merge(Ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
/*
0: 2020/07/30 5:13:48 +00:00
100: 2020/07/30 5:13:49 +00:00
1: 2020/07/30 5:13:50 +00:00
101: 2020/07/30 5:13:51 +00:00
2: 2020/07/30 5:13:52 +00:00
102: 2020/07/30 5:13:53 +00:00
3: 2020/07/30 5:13:54 +00:00
103: 2020/07/30 5:13:55 +00:00
4: 2020/07/30 5:13:56 +00:00
104: 2020/07/30 5:13:57 +00:00
Completed, press a key
*/
// Zip
using (Xs.Zip(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
/*
(0, 100): 2020/07/30 5:14:00 +00:00
(1, 101): 2020/07/30 5:14:02 +00:00
(2, 102): 2020/07/30 5:14:04 +00:00
(3, 103): 2020/07/30 5:14:06 +00:00
(4, 104): 2020/07/30 5:14:08 +00:00
Completed, press a key
*/
// CombineLatest
using (Xs.CombineLatest(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
/*
(0, 100): 2020/07/30 5:14:11 +00:00
(1, 100): 2020/07/30 5:14:12 +00:00
(1, 101): 2020/07/30 5:14:13 +00:00
(2, 101): 2020/07/30 5:14:14 +00:00
(2, 102): 2020/07/30 5:14:15 +00:00
(3, 102): 2020/07/30 5:14:16 +00:00
(3, 103): 2020/07/30 5:14:17 +00:00
(4, 103): 2020/07/30 5:14:19 +00:00
(4, 104): 2020/07/30 5:14:19 +00:00
Completed, press a key
*/
// WithLatestFrom
using (Xs.WithLatestFrom(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
/*
(1, 100): 2020/07/30 5:14:24 +00:00
(2, 101): 2020/07/30 5:14:26 +00:00
(3, 102): 2020/07/30 5:14:28 +00:00
(4, 103): 2020/07/30 5:14:30 +00:00
Completed, press a key
*/
// ForkJoin
using (Xs.ForkJoin(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
/*
(4, 104): 2020/07/30 17:15:31 +00:00
Completed, press a key
*/