zoukankan      html  css  js  c++  java
  • ReactiveX 学习笔记(30)操作符辨析

    RxJava: merge/concat/switch

    RxJS: merge/concat/switch/exhaust

    RxSwift: merge/concat/switchLatest

    merge/concat/switch/switchLatest/exhaust 合并两个或多个源数据流。

    Concat VS Merge operator

    merge


    concat


    switch


    exhaust

    从示意图中可以看出,这些操作符在合并源数据流时操作有所不同

    • merge 无视各个源数据流整体的发送顺序,只按照合并后各个数据的发送顺序来发送数据。
    • concat 严格按照各个源数据流整体的发送顺序来发送数据,即首先发送第一个源数据流中的所有数据,然后再发送第二个源数据流中的所有数据,以此类推。
    • switch/switchLatest 兼顾各个源数据流整体的发送顺序以及合并后各个数据的发送顺序,即首先发送第一个源数据流中的数据,等到第二个源数据流开始发送数据再转向发送后者中的数据,以此类推。
      与 concat 有所不同的是,在发送前一个数据流中数据的途中,如果接收到后一个数据流中的数据,那么 switch/switchLatest 会抛弃前一个数据流中所有余下的数据,转而发送后一个数据流中的数据。
    • exhaust 兼顾各个源数据流整体的发送顺序以及合并后各个数据的发送顺序,即首先发送第一个源数据流中的所有数据,然后再发送第二个源数据流在前者发送完毕之后才开始发送的所有余下的数据,以此类推。
      与 concat 有所不同的是,在前一个数据流中数据发送完毕后,exhaust 会忽略后一个数据流在此时间点之前已经发送的数据,只发送后一个数据流在此时间点之后发送的数据。

    RxJava: flatMap/concatMap/switchMap

    RxJS: mergeMap/concatMap/switchMap/exhaustMap

    RxSwift: flatMap/concatMap/flatMapLatest/flatMapFirst

    flatMap/mergeMap/concatMap/switchMap/flatMapLatest/exhaustMap 转换数据流:将源数据流的每一项都转换成数据流,从而形成数据流的数据流,最后再平坦化将两维数据流合并成一个数据流。

    RxJava: FlatMap, SwitchMap and ConcatMap differences & examples

    flatMap


    concatMap


    switchMap


    exhaustMap

    从示意图中可以看出,这些操作符在合并由源数据流中的数据所生成的各个目标数据流时操作有所不同

    • flatMap/mergeMap 无视源数据流中数据的发送顺序,只按照各个目标数据流中数据的发送顺序来发送数据。
      flatMap/mergeMap 在合并目标数据流时使用 merge 操作符
    • concatMap 严格按照源数据流中数据的发送顺序来发送各个目标数据流中的数据。
      即首先发送源数据流第一项数据所生成的目标数据流中的所有数据,然后再发送源数据流第二项数据所生成的目标数据流中的所有数据,以此类推。
      concatMap 在合并目标数据流时使用 concat 操作符
    • switchMap/flatMapLatest 兼顾源数据流中数据的发送顺序和各个目标数据流中数据的发送顺序,即首先发送源数据流第一项数据所生成的目标数据流中的数据,等到源数据流第二项数据所生成的目标数据流开始发送数据时再转向发送后者中的数据,以此类推。
      与 concatMap 有所不同的是,在发送源数据流前一项数据所生成数据的途中,如果接收到源数据流后一项数据所生成的数据,那么 switchMap/flatMapLatest 会抛弃源数据流前一项数据所生成的所有余下数据,转而发送源数据流后一项数据所生成的数据。
      switchMap/flatMapLatest 在合并目标数据流时使用 switch 操作符
    • exhaustMap/flatMapFirst 兼顾源数据流中数据的发送顺序和各个目标数据流中数据的发送顺序,即首先发送源数据流第一项数据所生成的目标数据流中的所有数据,然后再发送源数据流第二项数据所生成的目标数据流在前者发送完毕之后所发送的所有余下的数据,以此类推。
      与 concatMap 有所不同的是,在源数据流前一项数据所生成的数据发送完毕之后,exhaustMap 会忽略在此时间点之前源数据流后一项数据所生成的数据,只发送源数据流后一项数据在此时间点之后所生成的所有余下数据。
      exhaustMap/flatMapFirst 在合并目标数据流时使用 exhaust 操作符
    val items = listOf("a", "b", "c", "d", "e", "f")
    val scheduler = TestScheduler()
    items.toObservable()
        .flatMap { s ->
            val delay = Random().nextInt(10)
            Observable.just(s + "x")
                .delay(delay.toLong(), TimeUnit.SECONDS, scheduler)
        }.toList()
        .doAfterSuccess { println(it) }
        .subscribe()
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
    /*
    [dx, ex, bx, cx, fx, ax]
    */
    
    val items = listOf("a", "b", "c", "d", "e", "f")
    val scheduler = TestScheduler()
    items.toObservable()
        .concatMap { s ->
            val delay = Random().nextInt(10)
            Observable.just(s + "x")
                .delay(delay.toLong(), TimeUnit.SECONDS, scheduler)
        }.toList()
        .doAfterSuccess { println(it) }
        .subscribe()
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
    /*
    [ax, bx, cx, dx, ex, fx]
    */
    
    val items = listOf("a", "b", "c", "d", "e", "f")
    val scheduler = TestScheduler()
    items.toObservable()
        .switchMap { s ->
            val delay = Random().nextInt(10)
            Observable.just(s + "x")
                .delay(delay.toLong(), TimeUnit.SECONDS, scheduler)
        }.toList()
        .doAfterSuccess { println(it) }
        .subscribe()
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
    /*
    [fx]
    */
    

    Rx.NET: ManySelect/SelectMany

    ManySelect 转换数据流:为源数据流的每一项数据都生成一个新的数据流(该数据流为源数据流中的当前数据以及之后发送的所有数据),形成数据流的数据流。
    SelectMany 转换数据流:将源数据流的每一项都转换成数据流,从而形成数据流的数据流,最后再平坦化将两维数据流合并成一个数据流。

    What does the new ManySelect operator do?

    manySelect


    flatMap

    Observable.Range(1, 10).ManySelect(xs => xs.Sum(), Scheduler.CurrentThread).Concat().Dump("ManySelect");
    /*
    ManySelect-->55
    ManySelect-->54
    ManySelect-->52
    ManySelect-->49
    ManySelect-->45
    ManySelect-->40
    ManySelect-->34
    ManySelect-->27
    ManySelect-->19
    ManySelect-->10
    ManySelect completed
    */
    Observable.Range(1, 10).SelectMany(x => Observable.Range(x, 10 - x + 1).Sum()).Dump("SelectMany");
    /*
    SelectMany-->55
    SelectMany-->54
    SelectMany-->52
    SelectMany-->49
    SelectMany-->45
    SelectMany-->40
    SelectMany-->34
    SelectMany-->27
    SelectMany-->19
    SelectMany-->10
    SelectMany completed
    */
    

    Rx.NET: Zip/CombineLatest/WithLatestFrom/ForkJoin

    RxJava: zip/combineLatest/withLatestFrom

    RxJS: zip/combineLatest/withLatestFrom/forkJoin

    RxSwift: zip/combineLatest/withLatestFrom

    Zip 合并两个数据流:将第一个数据流所发送的第一个数据与第二个数据流所发送的第一个数据相结合,然后再结合两个数据流的第二个数据,以此类推。结合的条件是两个数据流都发送过第 n 个数据。
    CombineLatest 合并两个数据流:两个数据流中的任何一个发送新数据时,将此数据与另一个数据流最近发送的数据相结合。结合的条件是当一个数据流发送数据时,另一个数据流至少已经发送过一个数据。
    WithLatestFrom 合并两个数据流:第一个数据流发送新数据时,将此数据与第二个数据流最近发送的数据相结合。结合的条件是当第一个数据流发送数据时,第二个数据流至少已经发送过一个数据。
    ForkJoin 合并两个数据流:将两个数据流所发送的最后一个数据相结合。结合的条件是两个数据流均正常结束并且至少已经发送过一个数据。

    Zip


    CombineLatest


    WithLatestFrom


    ForkJoin
    ForkJoin

    IObservable<int> Xs
    {
        get { return Generate(0, new List<int> { 1, 2, 2, 2, 2 }); }
    }
    IObservable<int> Ys
    {
        get { return Generate(100, new List<int> { 2, 2, 2, 2, 2 }); }
    }
    IObservable<int> Generate(int initialValue, IList<int> intervals)
    {
        // work-around for Observable.Generate calling timeInterval before resultSelector
        intervals.Add(0);
    
        return Observable.Generate(initialValue,
                                    x => x < initialValue + intervals.Count - 1,
                                    x => x + 1,
                                    x => x,
                                    x => TimeSpan.FromSeconds(intervals[x - initialValue]));
    }
    // Merge
    using (Xs.Merge(Ys).Timestamp().Subscribe(
        z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
        () => Console.WriteLine("Completed, press a key")))
    {
        Console.ReadKey();
    }
    /*
      0: 2020/07/30 5:13:48 +00:00
    100: 2020/07/30 5:13:49 +00:00
      1: 2020/07/30 5:13:50 +00:00
    101: 2020/07/30 5:13:51 +00:00
      2: 2020/07/30 5:13:52 +00:00
    102: 2020/07/30 5:13:53 +00:00
      3: 2020/07/30 5:13:54 +00:00
    103: 2020/07/30 5:13:55 +00:00
      4: 2020/07/30 5:13:56 +00:00
    104: 2020/07/30 5:13:57 +00:00
    Completed, press a key
    */
    // Zip
    using (Xs.Zip(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
        z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
        () => Console.WriteLine("Completed, press a key")))
    {
        Console.ReadKey();
    }
    /*
    (0, 100): 2020/07/30 5:14:00 +00:00
    (1, 101): 2020/07/30 5:14:02 +00:00
    (2, 102): 2020/07/30 5:14:04 +00:00
    (3, 103): 2020/07/30 5:14:06 +00:00
    (4, 104): 2020/07/30 5:14:08 +00:00
    Completed, press a key
    */
    // CombineLatest
    using (Xs.CombineLatest(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
        z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
        () => Console.WriteLine("Completed, press a key")))
    {
        Console.ReadKey();
    }
    /*
    (0, 100): 2020/07/30 5:14:11 +00:00
    (1, 100): 2020/07/30 5:14:12 +00:00
    (1, 101): 2020/07/30 5:14:13 +00:00
    (2, 101): 2020/07/30 5:14:14 +00:00
    (2, 102): 2020/07/30 5:14:15 +00:00
    (3, 102): 2020/07/30 5:14:16 +00:00
    (3, 103): 2020/07/30 5:14:17 +00:00
    (4, 103): 2020/07/30 5:14:19 +00:00
    (4, 104): 2020/07/30 5:14:19 +00:00
    Completed, press a key
    */
    // WithLatestFrom
    using (Xs.WithLatestFrom(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
        z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
        () => Console.WriteLine("Completed, press a key")))
    {
        Console.ReadKey();
    }
    /*
    (1, 100): 2020/07/30 5:14:24 +00:00
    (2, 101): 2020/07/30 5:14:26 +00:00
    (3, 102): 2020/07/30 5:14:28 +00:00
    (4, 103): 2020/07/30 5:14:30 +00:00
    Completed, press a key
    */
    // ForkJoin
    using (Xs.ForkJoin(Ys, (x, y) => (x, y)).Timestamp().Subscribe(
        z => Console.WriteLine("{0}: {1}", z.Value, z.Timestamp),
        () => Console.WriteLine("Completed, press a key")))
    {
        Console.ReadKey();
    }
    /*
    (4, 104): 2020/07/30 17:15:31 +00:00
    Completed, press a key
    */
    
  • 相关阅读:
    UNIX网络编程——非阻塞connect
    UNIX网络编程——非阻塞式I/O(套接字)
    UNIX网络编程——使用select 实现套接字I/O超时
    UNIX网络编程——设置套接字超时
    UNIX网络编程——名字与地址转换(gethostbyname,gethostbyaddr,getservbyname,getservbyport,getaddrinfo,getnameinfo函数)
    [Java] Java API文档下载方法
    [Selenium] 在Grid模式下打印出当前Case是在哪台Node上运行
    [Selenium] 针对下拉菜单出现之后又立马消失的问题,通过Javascript改变元素的可见属性
    [Java] 通过XPath获取XML中某个节点的属性
    [Java] 获取当前Project所在的路径
  • 原文地址:https://www.cnblogs.com/zwvista/p/11065188.html
Copyright © 2011-2022 走看看