zoukankan      html  css  js  c++  java
  • Reactive stream Programming 踩坑记录(zip)

    前提介绍吧:
    用的全是异步非阻塞的操作符,框架选用的是AkkaR2DBC
    最外层流是Akka stream, 内部和DB交互用的是Project Reactor
    先看代码:

        @Test
        void name() throws ExecutionException, InterruptedException {
            final List<Integer> lists = Source.single(1)
                                              .map(any -> Arrays.asList(1, 2, 3))
                                              .flatMapConcat(Source::from)
                                              .grouped(2)
                                              .zip(Source.single(4))
                                              .map(pair -> {
                                                        log.info("map-pair.first:{}", pair.first());
                                                        log.info("map-pair.second:{}", pair.second());
                                                        return pair.first();
                                                    })
                                              .withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
                                                        log.info("withAttributes:{}", e.getMessage());
                                                        return Supervision.resume();
                                                    })).runWith(Sink.head(), actorSystem)
                                              .toCompletableFuture().get();
            Assertions.assertEquals(2, lists.size());
    
        }
    


    从测试结果看只有两个数据,然后从log我们也发现map只被执行了一次。

    解释一下流吧:

    然后就是zip。
    干掉zip

        @Test
        void name() throws ExecutionException, InterruptedException {
            final List<List<Integer>> lists = Source.single(1)
                                                    .map(any -> Arrays.asList(1, 2, 3))
                                                    .flatMapConcat(Source::from)
                                                    .grouped(2)
    //                                                .zip(Source.single(4))
                                                    .map(item -> {
                                                        log.info("map:{}", item);
    //                                                    log.info("map-pair.second:{}", pair.second());
                                                        return item;
    //                                                    return pair.first();
                                                    })
                                                    .withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
                                                        log.info("withAttributes:{}", e.getMessage());
                                                        return Supervision.resume();
                                                    })).runWith(Sink.seq(), actorSystem)
                                                    .toCompletableFuture().get();
            assertThat(lists).containsExactly(Arrays.asList(1, 2), Arrays.asList(3));
        }
    

    看下效果

    是不是很神奇。
    有两种解决方案

    1. 在zip之前 将流中数据全都收集起来,也就是将group 换成普通函数式编程的reduce, akka中叫 Fold
       @Test
        void name() throws ExecutionException, InterruptedException {
            final List<List<Integer>> lists = Source.single(1)
                                                    .map(any -> Arrays.asList(1, 2, 3))
                                                    .flatMapConcat(Source::from)
                                                    .fold(Collections.<Integer>emptyList(), (array, item) ->{
                                                        List<Integer> list = new ArrayList<>(array);
                                                        list.add(item);
                                                        return Collections.unmodifiableList(list);
                                                    })
                                                    .zip(Source.single(4))
                                                    .map(pair -> {
                                                        log.info("map-pair.first:{}", pair.first());
                                                        log.info("map-pair.second:{}", pair.second());
                                                        return pair.first();
                                                    })
                                                    .withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
                                                        log.info("withAttributes:{}", e.getMessage());
                                                        return Supervision.resume();
                                                    })).runWith(Sink.seq(), actorSystem)
                                                    .toCompletableFuture().get();
            assertThat(lists).containsExactly(Arrays.asList(1,2,3));
        }
    

    方法二: 不用zip 换成mapAsync

       @Test
        void name() throws ExecutionException, InterruptedException {
            final List<List<Integer>> lists = Source.single(1)
                                                    .map(any -> Arrays.asList(1, 2, 3))
                                                    .flatMapConcat(Source::from)
                                                    .grouped(2)
                                                    .mapAsyncUnordered(1, list -> Mono.just(4).map(s -> Pair.create(list,s)).toFuture())
                                                    .map(pair -> {
                                                        log.info("map-pair.first:{}", pair.first());
                                                        log.info("map-pair.second:{}", pair.second());
                                                        return pair.first();
                                                    })
                                                    .withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
                                                        log.info("withAttributes:{}", e.getMessage());
                                                        return Supervision.resume();
                                                    })).runWith(Sink.seq(), actorSystem)
                                                    .toCompletableFuture().get();
            assertThat(lists).containsExactly(Arrays.asList(1,2), Arrays.asList(3));
        }
    }
    

    运行结果

    为什么会这样:
    看下官网怎么说

    Combines elements from each of multiple sources into Pair and passes the pairs downstream.
    官网demo

    import akka.stream.javadsl.Source;
    import akka.stream.javadsl.Sink;
    import java.util.Arrays;
    
    Source<String, NotUsed> sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana"));
    Source<String, NotUsed> sourceFirstLetters = Source.from(Arrays.asList("A", "O", "B"));
    sourceFruits.zip(sourceFirstLetters).runWith(Sink.foreach(System.out::print), system);
    // this will print ('apple', 'A'), ('orange', 'O'), ('banana', 'B')
    

    是不是很鸡贼:两个流中的元素数量是一致的,不一致会导致流会丢东西,以少的元素的流为基准,去生成pair。
    当流中数据元素不匹配的时候非要用zip 请使用zipAll方法,他会有个default参数,当流中数据元素不匹配的时候,会以多的为准,然后用default的值去构建pair

    个人建议:zip能不用就不用,如果非要用请用zipAll

  • 相关阅读:
    Java实现HadoopHA集群的hdfs控制
    Hadoop-2.8.5的HA集群搭建
    Python实现bing必应壁纸下载
    使用Python3将Markdown(.md)文本转换成 html、pdf
    使用GitHub作Free图床
    JavaMail实践--实现邮件发送
    Python3实现图片转字符画
    Java编写的Java编辑器_JavaIDE
    Java实现简易版计算器
    Java实现Excel表格操作--API:jxl
  • 原文地址:https://www.cnblogs.com/qulianqing/p/12623403.html
Copyright © 2011-2022 走看看