zoukankan      html  css  js  c++  java
  • akka-stream之异常处理

    背景介绍

    在项目中使用了akk-stream的source.queue功能,如下:

    Pair<SourceQueueWithComplete<Integer>, Source<Integer, NotUsed>> reqSourceQueue = Source<Integer>queue(1024,OverflowStrategy.backpressure()).preMaterialize(materializer);
    

    但是,如果在使用reqSourceQueue.second().runForeach方法来对流中的元素进行处理时,如果处理代码出现错误,也不抛异常,并且整个流中断了,继续往queue中offer元素也不起作用了。查看官方文档,有以下两种解决方案

    recover方法

    该方法在流运行的过程中出现异常时,会使用recover中的match进行处理。示例代码如下:

    final Materializer mat = ActorMaterializer.create(system);
    Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
        .map(
            n -> {
              if (n < 5) return n.toString();
              else throw new RuntimeException("Boom!");
            })
        .recover(new PFBuilder().match(RuntimeException.class, ex -> "stream truncated").build())
        .runForeach(System.out::println, mat);
    

    但是这种方案也存在一个问题,他在出现错误,并且在recover匹配上以后,会结束整个流,继续往queue中offer不起作用了,显然这不是我们想要的处理方式。

    监督机制

    和akka actor的监督机制相同,我们同样可以在流运行的过程中使用监督机制,这种监督机制是在流运行时使用的materializer中设置的,具体有以下几种监督机制:

    • Stop 出现错误,立即结束流的运行
    • Resume 出现错误的流元素会被抛弃,流继续正常运行
    • Restart 出现错误的流元素会被抛弃,但是流是会在restart以后继续运行,这也就意味着附加在流上的所有状态都会被清空
      通常Resume方法使我们想要的,因为这样即使一个流元素的处理出现错误,也不会影响整个流的运行,导致用户的请求没有响应等各种情况。
      具体示例代码如下:
    final Function<Throwable, Supervision.Directive> decider =
        exc -> {
          //出现该异常时,这个流元素会被抛弃掉
          if (exc instanceof ArithmeticException) return Supervision.resume();
          else return Supervision.stop();
        };
    final Materializer mat =
        ActorMaterializer.create(
            ActorMaterializerSettings.create(system).withSupervisionStrategy(decider), system);
    final Source<Integer, NotUsed> source =
        Source.from(Arrays.asList(0, 1, 2, 3, 4, 5)).map(elem -> 100 / elem);
    final Sink<Integer, CompletionStage<Integer>> fold = Sink.fold(0, (acc, elem) -> acc + elem);
    final CompletionStage<Integer> result = source.runWith(fold, mat);
    
  • 相关阅读:
    PAT (Advanced Level) 1080. Graduate Admission (30)
    PAT (Advanced Level) 1079. Total Sales of Supply Chain (25)
    PAT (Advanced Level) 1078. Hashing (25)
    PAT (Advanced Level) 1077. Kuchiguse (20)
    PAT (Advanced Level) 1076. Forwards on Weibo (30)
    PAT (Advanced Level) 1075. PAT Judge (25)
    PAT (Advanced Level) 1074. Reversing Linked List (25)
    PAT (Advanced Level) 1073. Scientific Notation (20)
    PAT (Advanced Level) 1072. Gas Station (30)
    PAT (Advanced Level) 1071. Speech Patterns (25)
  • 原文地址:https://www.cnblogs.com/junjiang3/p/11473789.html
Copyright © 2011-2022 走看看