zoukankan      html  css  js  c++  java
  • akka stream第三课-模块化

    Modularity, Composition and Hierarchy

    模块性、组合性和层次性

    Dependency

    依赖

    To use Akka Streams, add the module to your project:

    要使用Akka Streams,请将模块添加到您的项目中:

    val AkkaVersion = "2.6.9"
    libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion

    Introduction

    介绍

    Akka Streams provide a uniform model of stream processing graphs, which allows flexible composition of reusable components. In this chapter we show how these look like from the conceptual and API perspective, demonstrating the modularity aspects of the library.

    Akka流提供了流处理图的统一模型,允许灵活地组合可重用组件。在本章中,我们将从概念和API的角度展示这些特性,并演示库的模块化方面。

    Basics of composition and modularity

    组合和模块化基础

    Every operator used in Akka Streams can be imagined as a “box” with input and output ports where elements to be processed arrive and leave the operator. In this view, a Source is nothing else than a “box” with a single output port, or, a BidiFlow is a “box” with exactly two input and two output ports. In the figure below we illustrate the most commonly used operators viewed as “boxes”.

    在Akka流中使用的每个操作符都可以想象为一个“盒子”,其中有输入和输出端口,待处理的元素在这里到达和离开操作符。在这个视图中,源就是一个只有一个输出端口的“盒子”,或者BidiFlow就是一个只有两个输入端口和两个输出端口的“盒子”。在下图中,我们展示了被视为“框”的最常用运算符。

    The linear operators are SourceSink and Flow, as these can be used to compose strict chains of operators. Fan-in and Fan-out operators have usually multiple input or multiple output ports, therefore they allow to build more complex graph layouts, not only chains. BidiFlow operators are usually useful in IO related tasks, where there are input and output channels to be handled. Due to the specific shape of BidiFlow it is easy to stack them on top of each other to build a layered protocol for example. The TLS support in Akka is for example implemented as a BidiFlow.

    线性运算符是源、汇和流,因为它们可以用来组成严格的运算符链。扇入和扇出操作符通常有多个输入或多个输出端口,因此它们允许构建更复杂的图形布局,而不仅仅是链。BidiFlow运算符通常在IO相关的任务中很有用,这些任务需要处理输入和输出通道。由于BidiFlow的特定形状,因此很容易将它们堆叠在一起,以构建分层协议。例如,Akka中的TLS支持被实现为BidiFlow。

    These reusable components already allow the creation of complex processing networks. What we have seen so far does not implement modularity though. It is desirable for example to package up a larger graph entity into a reusable component which hides its internals only exposing the ports that are meant to the users of the module to interact with. One good example is the Http server component, which is encoded internally as a BidiFlow which interfaces with the client TCP connection using an input-output port pair accepting and sending ByteString s, while its upper ports emit and receive HttpRequest and HttpResponse instances.

    这些可重用组件已经允许创建复杂的处理网络。不过,到目前为止,我们所看到的并没有实现模块化。例如,我们希望将一个更大的图形实体打包到一个可重用的组件中,该组件隐藏其内部,只公开模块用户要与之交互的端口。一个很好的例子是Http服务器组件,它在内部被编码为BidiFlow,它使用一个输入输出端口对与客户机TCP连接进行接口,通过testrings接受和发送,而其上层端口则发出和接收HttpRequest和HttpResponse实例。

    The following figure demonstrates various composite operators, that contain various other type of operators internally, but hiding them behind a shape that looks like a SourceFlow, etc.

    下图演示了各种复合运算符,这些运算符在内部包含各种其他类型的运算符,但将它们隐藏在看起来像源、流等的形状后面。

    One interesting example above is a Flow which is composed of a disconnected Sink and Source. This can be achieved by using the fromSinkAndSource() constructor method on Flow which takes the two parts as parameters.

    上面一个有趣的例子是由断开连接的接收器和源组成的流。这可以通过对Flow使用fromSinkAndSource()构造函数方法来实现,该方法将两部分作为参数。

    Please note that when combining a Flow using that method, the termination signals are not carried “through” as the Sink and Source are assumed to be fully independent. If however you want to construct a Flow like this but need the termination events to trigger “the other side” of the composite flow, you can use Flow.fromSinkAndSourceCoupled or Flow.fromSinkAndSourceCoupledMat which does just that. For example the cancelation of the composite flows source-side will then lead to completion of its sink-side. Read Flow’s API documentation for a detailed explanation how this works.

    请注意,当使用该方法组合流时,终端信号不会“通过”,因为假设接收器和源是完全独立的。但是,如果您希望构造这样的流,但需要终止事件来触发复合流的“另一面”,则可以使用Flow.fromsink和sourcecoupled或者

    Flow.fromsinkandsourcecoupedmat就是这样。例如,取消复合流源端将导致其汇端的完成。请阅读Flow的API文档,以了解其工作原理的详细说明。

    The example BidiFlow demonstrates that internally a module can be of arbitrary complexity, and the exposed ports can be wired in flexible ways. The only constraint is that all the ports of enclosed modules must be either connected to each other, or exposed as interface ports, and the number of such ports needs to match the requirement of the shape, for example a Source allows only one exposed output port, the rest of the internal ports must be properly connected.

    BidiFlow示例表明,模块内部可以具有任意复杂度,暴露的端口可以以灵活的方式连接。唯一的限制是封闭模块的所有端口必须相互连接,或者作为接口端口公开,并且这些端口的数量需要与形状的要求相匹配,例如一个源只允许一个暴露的输出端口,其余的内部端口必须正确连接。

    These mechanics allow arbitrary nesting of modules. For example the following figure demonstrates a RunnableGraph that is built from a composite Source and a composite Sink (which in turn contains a composite Flow).

     这些机制允许任意嵌套模块。例如,下图演示了一个由复合源和复合接收器(后者又包含一个复合流)构建的RunnableGraph。

    The above diagram contains one more shape that we have not seen yet, which is called RunnableGraph. It turns out, that if we wire all exposed ports together, so that no more open ports remain, we get a module that is closed. This is what the RunnableGraph class represents. This is the shape that a Materializer can take and turn into a network of running entities that perform the task described. In fact, a RunnableGraph is a module itself, and (maybe somewhat surprisingly) it can be used as part of larger graphs. It is rarely useful to embed a closed graph shape in a larger graph (since it becomes an isolated island as there are no open port for communication with the rest of the graph), but this demonstrates the uniform underlying model.

    上面的图中还有一个我们还没有看到的形状,叫做RunnableGraph。结果是,如果我们把所有暴露的端口连接在一起,这样就不再有开放的端口了,我们就得到了一个关闭的模块。这就是RunnableGraph类所表示的。物化器可以采用这种形状,并将其转变为执行所述任务的运行实体网络。事实上,RunnableGraph本身就是一个模块,而且(可能有点令人惊讶)它可以作为更大图形的一部分使用。在一个较大的图中嵌入一个封闭的图形形状是很少有用的(因为它变成了一个孤立的孤岛,因为没有开放的端口与图的其余部分进行通信),但是这证明了统一的底层模型。

    If we try to build a code snippet that corresponds to the above diagram, our first try might look like this:

    如果我们试图构建一个与上图相对应的代码片段,我们的第一次尝试可能如下所示:

    Source.single(0).map(_ + 1).filter(_ != 0).map(_ - 2).to(Sink.fold(0)(_ + _))

    It is clear however that there is no nesting present in our first attempt, since the library cannot figure out where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the DSL provided by the FlowSourceSink classes then nesting can be achieved by calling one of the methods withAttributes() or named() (where the latter is a shorthand for adding a name attribute).

    但是很明显,在我们的第一次尝试中没有嵌套,因为库无法确定我们打算将复合模块边界放在哪里,所以我们有责任这样做。如果我们使用Flow、Source、Sink类提供的DSL,那么可以通过调用其中一个方法withAttributes()或named()来实现嵌套(后者是添加name属性的简写)。

    The following code demonstrates how to achieve the desired nesting:

    以下代码演示如何实现所需的嵌套:

    val nestedSource =
      Source
        .single(0) // An atomic source
        .map(_ + 1) // an atomic processing stage
        .named("nestedSource") // wraps up the current Source and gives it a name
    
    val nestedFlow =
      Flow[Int]
        .filter(_ != 0) // an atomic processing stage
        .map(_ - 2) // another atomic processing stage
        .named("nestedFlow") // wraps up the Flow, and gives it a name
    
    val nestedSink =
      nestedFlow
        .to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
        .named("nestedSink") // wrap it up
    
    // Create a RunnableGraph
    val runnableGraph = nestedSource.to(nestedSink)

    Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If we hide some of the internals of our composites, the result looks just like if any other predefine component has been used:

    一旦我们隐藏了组件的内部,它们的行为就像任何其他类似形状的内置组件一样。如果我们隐藏了复合材料的一些内部结构,结果看起来就像使用了任何其他预定义组件一样:

     If we look at usage of built-in components, and our custom components, there is no difference in usage as the code snippet below demonstrates.

    如果我们看一下内置组件和自定义组件的用法,正如下面的代码片段所示,它们的用法没有区别。

    // Create a RunnableGraph from our components
    val runnableGraph = nestedSource.to(nestedSink)
    
    // Usage is uniform, no matter if modules are composite or atomic
    val runnableGraph2 = Source.single(0).to(Sink.fold(0)(_ + _))

    Composing complex systems

    构成复杂系统

    In the previous section we explored the possibility of composition, and hierarchy, but we stayed away from non-linear, generalized operators. There is nothing in Akka Streams though that enforces that stream processing layouts can only be linear. The DSL for Source and friends is optimized for creating such linear chains, as they are the most common in practice. There is a more advanced DSL for building complex graphs, that can be used if more flexibility is needed. We will see that the difference between the two DSLs is only on the surface: the concepts they operate on are uniform across all DSLs and fit together nicely.

    在上一节中,我们探讨了组合和层次结构的可能性,但是我们远离了非线性的、广义的运算符。在Akka流中没有任何东西强制要求流处理布局只能是线性的。Source和friends的DSL是为创建这种线性链而优化的,因为它们在实践中是最常见的。有一种更高级的DSL用于构建复杂的图形,如果需要更大的灵活性,可以使用它。我们将看到这两个dsl之间的区别只是表面上的:它们操作的概念在所有dsl中是一致的,并且很好地结合在一起。

    As a first example, let’s look at a more complex layout:

    作为第一个示例,让我们看一个更复杂的布局:

     The diagram shows a RunnableGraph (remember, if there are no unwired ports, the graph is closed, and therefore can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in, fan-out operators, directed and non-directed cycles. The runnable() method of the GraphDSL object allows the creation of a general, closed, and runnable graph. For example the network on the diagram can be realized like this:

    该图显示了一个RunnableGraph(请记住,如果没有未连接的端口,则该图是关闭的,因此可以具体化),它封装了一个非平凡的流处理网络。它包含扇入、扇出运算符、定向和非定向循环。GraphDSL对象的runnable()方法允许创建一个通用的、封闭的、可运行的图形。例如,图中的网络可以这样实现:

    import GraphDSL.Implicits._
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      val A: Outlet[Int]                  = builder.add(Source.single(0)).out
      val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
      val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
      val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
      val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
      val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
      val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).in
    
                    C     <~      F
      A  ~>  B  ~>  C     ~>      F
             B  ~>  D  ~>  E  ~>  F
                           E  ~>  G
    
      ClosedShape
    })

    In the code above we used the implicit port numbering feature (to make the graph more readable and similar to the diagram) and we imported Source s, Sink s and Flow s explicitly. It is possible to refer to the ports explicitly, and it is not necessary to import our linear operators via add(), so another version might look like this:

    在上面的代码中,我们使用了隐式端口编号特性(使图形更具可读性,并与图类似),并且显式地导入了源、汇和流。可以显式引用端口,并且不必通过add()导入线性运算符,因此另一个版本可能如下所示:

    import GraphDSL.Implicits._
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      val B = builder.add(Broadcast[Int](2))
      val C = builder.add(Merge[Int](2))
      val E = builder.add(Balance[Int](2))
      val F = builder.add(Merge[Int](2))
    
      Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
      C.in(0) <~ F.out
    
      B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
      E.out(1) ~> Sink.foreach(println)
      ClosedShape
    })

    Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL. The way to do it is to use the create() factory method on GraphDSL. If we remove the sources and sinks from the previous example, what remains is a partial graph:

     与第一节中的情况类似,到目前为止,我们还没有考虑模块化。我们创建了一个复杂的图形,但是布局是平面的,不是模块化的。我们将修改我们的示例,并使用图DSL创建一个可重用的组件。方法是在GraphDSL上使用create()工厂方法。如果我们从前面的示例中删除源和汇,剩下的是一个部分图:

     We can recreate a similar graph in code, using the DSL in a similar way than before:

    我们可以用与以前类似的方式使用DSL,在代码中重新创建类似的图形:

    import GraphDSL.Implicits._
    val partial = GraphDSL.create() { implicit builder =>
      val B = builder.add(Broadcast[Int](2))
      val C = builder.add(Merge[Int](2))
      val E = builder.add(Balance[Int](2))
      val F = builder.add(Merge[Int](2))
    
                                       C  <~  F
      B  ~>                            C  ~>  F
      B  ~>  Flow[Int].map(_ + 1)  ~>  E  ~>  F
      FlowShape(B.in, E.out(1))
    }.named("partial")

    The only new addition is the return value of the builder block, which is a Shape. All operators (including SourceBidiFlow, etc) have a shape, which encodes the typed ports of the module. In our example there is exactly one input and output port left, so we can declare it to have a FlowShape by returning an instance of it. While it is possible to create new Shape types, it is usually recommended to use one of the matching built-in ones.

    唯一新添加的是构建块的返回值,它是一个形状。所有操作符(包括Source、BidiFlow等)都有一个形状,它对模块的类型化端口进行编码。在我们的示例中只剩下一个输入和输出端口,因此我们可以通过返回它的实例来声明它具有FlowShape。虽然可以创建新的形状类型,但通常建议使用匹配的内置形状类型之一。

    The resulting graph is already a properly wrapped module, so there is no need to call named() to encapsulate the graph, but it is a good practice to give names to modules to help debugging.

    生成的图形已经是一个正确包装的模块,因此不需要调用named()来封装图形,但是为模块命名以帮助调试是一个很好的做法。

     Since our partial graph has the right shape, it can be already used in the simpler, linear DSL:

    由于我们的部分图具有正确的形状,因此它可以用于更简单的线性DSL:

    Source.single(0).via(partial).to(Sink.ignore)

    It is not possible to use it as a Flow yet, though (i.e. we cannot call .filter() on it), but Flow has a fromGraph() method that adds the DSL to a FlowShape. There are similar methods on SourceSink and BidiShape, so it is easy to get back to the simpler DSL if an operator has the right shape. For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods. To demonstrate this, we will create the following graph:

     虽然还不能将它用作流(即我们不能对它调用.filter()),但Flow有一个fromGraph()方法,它将DSL添加到FlowShape中。在Source、Sink和BidiShape上有类似的方法,因此如果一个操作符的形状正确,就很容易回到更简单的DSL。为了方便起见,还可以跳过部分图形的创建,使用方便的creator方法之一。为了证明这一点,我们将创建以下图表:

    The code version of the above closed graph might look like this:

    上述闭合图的代码版本可能如下所示:

    // Convert the partial graph of FlowShape to a Flow to get
    // access to the fluid DSL (for example to be able to call .filter())
    val flow = Flow.fromGraph(partial)
    
    // Simple way to create a graph backed Source
    val source = Source.fromGraph( GraphDSL.create() { implicit builder =>
      val merge = builder.add(Merge[Int](2))
      Source.single(0)      ~> merge
      Source(List(2, 3, 4)) ~> merge
    
      // Exposing exactly one output port
      SourceShape(merge.out)
    })
    
    // Building a Sink with a nested Flow, using the fluid DSL
    val sink = {
      val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
      nestedFlow.to(Sink.head)
    }
    
    // Putting all together
    val closed = source.via(flow.filter(_ > 1)).to(sink)
    Note:All graph builder sections check if the resulting graph has all ports connected except the exposed ones and will throw an exception if this is violated.

    注意:图生成器的所有部分都会检查结果图是否连接了除公开的端口之外的所有端口,如果违反了这一点,则将引发异常。

    We are still in debt of demonstrating that RunnableGraph is a component like any other, which can be embedded in graphs. In the following snippet we embed one closed graph in another:

    我们仍然需要证明RunnableGraph是一个组件,它可以嵌入到图中。在下面的片段中,我们将一个闭合图形嵌入另一个封闭图形中:

    val closed1 = Source.single(0).to(Sink.foreach(println))
    val closed2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      val embeddedClosed: ClosedShape = builder.add(closed1)
      //
      embeddedClosed
    })

    The type of the imported module indicates that the imported module has a ClosedShape, and so we are not able to wire it to anything else inside the enclosing closed graph. Nevertheless, this “island” is embedded properly, and will be materialized just like any other module that is part of the graph.

    导入模块的类型表示导入的模块具有ClosedShape,因此我们无法将其连接到封闭封闭封闭图中的任何其他模块。然而,这个“孤岛”被正确地嵌入,并且将像图中的任何其他模块一样具体化。

    As we have demonstrated, the two DSLs are fully interoperable, as they encode a similar nested structure of “boxes with ports”, it is only the DSLs that differ to be as much powerful as possible on the given abstraction level. It is possible to embed complex graphs in the fluid DSL, and it is just as easy to import and embed a Flow, etc, in a larger, complex structure.

    正如我们所展示的,这两个dsl是完全可互操作的,因为它们编码了一个类似的“带端口的盒子”的嵌套结构,只有dsl在给定的抽象级别上尽可能强大。在fluidsl中嵌入复杂的图形是可能的,在更大、更复杂的结构中导入和嵌入流等也同样容易。

    We have also seen, that every module has a Shape (for example a Sink has a SinkShape) independently which DSL was used to create it. This uniform representation enables the rich composability of various stream processing entities in a convenient way.

    我们还看到,每个模块都有一个独立的形状(例如Sink有SinkShape),DSL是用来创建它的。这种统一表示使各种流处理实体以一种方便的方式具有丰富的可组合性。

    Materialized values

    物化价值

    After realizing that RunnableGraph is nothing more than a module with no unused ports (it is an island), it becomes clear that after materialization the only way to communicate with the running stream processing logic is via some side-channel. This side channel is represented as a materialized value. The situation is similar to Actor s, where the Props instance describes the actor logic, but it is the call to actorOf() that creates an actually running actor, and returns an ActorRef that can be used to communicate with the running actor itself. Since the Props can be reused, each call will return a different reference.

    在认识到RunnableGraph只不过是一个没有未使用端口的模块(它是一个孤岛)之后,很明显,在具体化之后,与正在运行的流处理逻辑通信的唯一方法是通过某个旁道。这个旁道被表示为一个具体化的值。这种情况类似于Actor s,Props实例描述Actor逻辑,但调用actorOf()创建实际运行的Actor,并返回一个ActorRef,该ActorRef可用于与正在运行的Actor本身通信。由于道具可以重用,因此每次调用都将返回不同的引用。

    When it comes to streams, each materialization creates a new running network corresponding to the blueprint that was encoded in the provided RunnableGraph. To be able to interact with the running network, each materialization needs to return a different object that provides the necessary interaction capabilities. In other words, the RunnableGraph can be seen as a factory, which creates:

    当涉及到流时,每个具体化都会创建一个新的运行网络,该网络对应于在提供的RunnableGraph中编码的蓝图。为了能够与运行中的网络进行交互,每个具体化需要返回一个不同的对象,该对象提供必要的交互功能。换句话说,RunnableGraph可以看作是一个工厂,它创建了:

    • a network of running processing entities, inaccessible from the outside

               运行的处理实体的网络,从外部无法访问

    • a materialized value, optionally providing a controlled interaction capability with the network

             提供了一种可选的价值控制的网络能力

    Unlike actors though, each of the operators might provide a materialized value, so when we compose multiple operators or modules, we need to combine the materialized value as well (there are default rules which make this easier, for example to() and via() takes care of the most common case of taking the materialized value to the left. See Combining materialized values for details). We demonstrate how this works by a code example and a diagram which graphically demonstrates what is happening.

    但是,与actors不同,每个操作符都可能提供一个物化值,因此当我们组合多个运算符或模块时,我们也需要组合物化值(有一些默认规则使其更容易实现,例如to()和via()负责将物化值放在左侧这一最常见的情况。有关详细信息,请参见组合物化值)。我们通过一个代码示例和一个图表来演示这是如何工作的。

    The propagation of the individual materialized values from the enclosed modules towards the top will look like this:

    从封闭模块向顶部传播单个物化值的过程如下所示:

     

    To implement the above, first, we create a composite Source, where the enclosed Source have a materialized type of Promise[[Option[Int]] . By using the combiner function Keep.left, the resulting materialized type is of the nested module (indicated by the color red on the diagram):

    为了实现上述功能,首先,我们创建一个复合源,其中封闭的源具有一个具体化类型Promise[[Option[Int]]。通过使用组合器函数靠左,得到的物化类型为嵌套模块(图中红色表示):

    // Materializes to Promise[Option[Int]]                                   (red)
    val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
    
    // Materializes to NotUsed                                               (black)
    val flow1: Flow[Int, Int, NotUsed] = Flow[Int].take(100)
    
    // Materializes to Promise[Int]                                          (red)
    val nestedSource: Source[Int, Promise[Option[Int]]] =
      source.viaMat(flow1)(Keep.left).named("nestedSource")

    Next, we create a composite Flow from two smaller components. Here, the second enclosed Flow has a materialized type of Future[OutgoingConnection] , and we propagate this to the parent by using Keep.right as the combiner function (indicated by the color yellow on the diagram):

    接下来,我们从两个较小的组件创建一个复合流。在这里,第二个封闭的流有一个物化类型Future[OutgoingConnection],我们使用保持正确作为组合器功能(图中黄色表示):

    // Materializes to NotUsed                                                (orange)
    val flow2: Flow[Int, ByteString, NotUsed] = Flow[Int].map { i =>
      ByteString(i.toString)
    }
    
    // Materializes to Future[OutgoingConnection]                             (yellow)
    val flow3: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
      Tcp().outgoingConnection("localhost", 8080)
    
    // Materializes to Future[OutgoingConnection]                             (yellow)
    val nestedFlow: Flow[Int, ByteString, Future[OutgoingConnection]] =
      flow2.viaMat(flow3)(Keep.right).named("nestedFlow")

    As a third step, we create a composite Sink, using our nestedFlow as a building block. In this snippet, both the enclosed Flow and the folding Sink has a materialized value that is interesting for us, so we use Keep.both to get a Pair of them as the materialized type of nestedSink (indicated by the color blue on the diagram)

    第三步,我们使用嵌套流作为构建块创建一个复合接收器。在这个片段中,封闭流和折叠接收器都有一个物化值,这对我们来说很有趣,所以我们使用两个都留着得到一对作为nestedSink的物化类型(由图中的蓝色表示)

    // Materializes to Future[String]                                         (green)
    val sink: Sink[ByteString, Future[String]] = Sink.fold("")(_ + _.utf8String)
    
    // Materializes to (Future[OutgoingConnection], Future[String])           (blue)
    val nestedSink: Sink[Int, (Future[OutgoingConnection], Future[String])] =
      nestedFlow.toMat(sink)(Keep.both)

    As the last example, we wire together nestedSource and nestedSink and we use a custom combiner function to create a yet another materialized type of the resulting RunnableGraph. This combiner function ignores the Future[String] part, and wraps the other two values in a custom case class MyClass (indicated by color purple on the diagram):

    作为最后一个例子,我们将nestedSource和nestedSink连接在一起,并使用一个定制的组合器函数来创建生成的RunnableGraph的另一个具体化类型。此组合器函数忽略Future[String]部分,并将其他两个值包装在自定义case类MyClass中(在图中用紫色表示):

    case class MyClass(private val p: Promise[Option[Int]], conn: OutgoingConnection) {
      def close() = p.trySuccess(None)
    }
    
    def f(p: Promise[Option[Int]], rest: (Future[OutgoingConnection], Future[String])): Future[MyClass] = {
    
      val connFuture = rest._1
      connFuture.map(MyClass(p, _))
    }
    
    // Materializes to Future[MyClass]                                        (purple)
    val runnableGraph: RunnableGraph[Future[MyClass]] =
      nestedSource.toMat(nestedSink)(f)
    Note

    The nested structure in the above example is not necessary for combining the materialized values, it demonstrates how the two features work together. See Combining materialized values for further examples of combining materialized values without nesting and hierarchy involved.

    上例中的嵌套结构不是组合具体化值所必需的,它演示了两个特性是如何协同工作的。有关不涉及嵌套和层次结构的组合物化值的更多示例,请参见组合物化值。

    Attributes

    属性

    We have seen that we can use named() to introduce a nesting level in the fluid DSL (and also explicit nesting by using create() from GraphDSL). Apart from having the effect of adding a nesting level, named() is actually a shorthand for calling withAttributes(Attributes.name("someName")). Attributes provide a way to fine-tune certain aspects of the materialized running entity. For example buffer sizes for asynchronous operators can be controlled via attributes (see Buffers for asynchronous operators). When it comes to hierarchic composition, attributes are inherited by nested modules, unless they override them with a custom value.

    我们已经看到,我们可以使用named()在fluid DSL中引入嵌套级别(也可以通过使用GraphDSL中的create()显式嵌套)。除了具有添加嵌套级别的效果外,named()实际上是调用withAttributes的简写(Attributes.name属性(“someName”))。属性提供了一种微调具体化运行实体的某些方面的方法。例如,可以通过属性控制异步运算符的缓冲区大小(请参见异步运算符的缓冲区)。除非模块是用一个自定义的值来重写的,否则当它们被一个定制的层次结构覆盖时。

    The code below, a modification of an earlier example sets the inputBuffer attribute on certain modules, but not on others:

    下面的代码是对前面示例的修改,它在某些模块上设置inputBuffer属性,但在其他模块上不设置:

     

    import Attributes._
    val nestedSource =
      Source.single(0).map(_ + 1).named("nestedSource") // Wrap, no inputBuffer set
    
    val nestedFlow =
      Flow[Int]
        .filter(_ != 0)
        .via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
        .named("nestedFlow") // Wrap, no inputBuffer set
    
    val nestedSink =
      nestedFlow
        .to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
        .withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override

     

    The effect is, that each module inherits the inputBuffer attribute from its enclosing parent, unless it has the same attribute explicitly set. nestedSource gets the default attributes from the materializer itself. nestedSink on the other hand has this attribute set, so it will be used by all nested modules. nestedFlow will inherit from nestedSink except the map operator which has again an explicitly provided attribute overriding the inherited one.

    结果是,每个模块都从其封闭父模块继承inputBuffer属性,除非它显式设置了相同的属性。nestedSource从物化器本身获取默认属性。另一方面,nestedSink具有此属性集,因此它将由所有嵌套模块使用。nestedFlow将从nestedSink继承,但map运算符再次显式提供了重写继承的属性的属性。

    compose_attributes.png

    This diagram illustrates the inheritance process for the example code (representing the materializer default attributes as the color red, the attributes set on nestedSink as blue and the attributes set on nestedFlow as green).

    此图说明了示例代码的继承过程(表示物化器默认属性为红色,nestedSink上设置的属性为蓝色,nestedFlow上设置的属性为绿色)。

     

    val sink:Sink[ByteString,Future[String]]=Sink.fold("")(_ + _.utf8String)// Materializes to (Future[OutgoingConnection], Future[String])           (blue)
    val nestedSink:Sink[Int,(Future[OutgoingConnection],Future[String])]=
      nestedFlow.toMat(sink)(Keep.both)

     

  • 相关阅读:
    mongoDB使用
    mac环境下mongodb的安装和使用
    statrc部分
    权限部分
    在Linux 安装Python3.5.6详细文档!!!!
    linux回顾
    linux服务配置
    路飞ORM练习
    考试题-路飞中期(卷一)
    git hub命令,上传到github
  • 原文地址:https://www.cnblogs.com/0205gt/p/13686084.html
Copyright © 2011-2022 走看看