zoukankan      html  css  js  c++  java
  • akka-stream第一课

    1、sbt依赖包

    val AkkaVersion = "2.6.8"
    libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion

    2、通常从源头Source开始

    import akka.stream._
    import akka.stream.scaladsl._

     如果是想要执行示例代码,则需要同时导入

    import akka.{ Done, NotUsed }
    import akka.actor.ActorSystem
    import akka.util.ByteString
    import scala.concurrent._
    import scala.concurrent.duration._
    import java.nio.file.Paths

    3、一个入门示例

    object Main extends App {
      implicit val system = ActorSystem("QuickStart")
      // Code here
    }
    //从一个非常简单的源开始,发出1到100的整数:
    val source: Source[Int, NotUsed] = Source(1 to 100)

    Source类型有两种类型的参数化:Source[Int, NotUsed]

    • 第一种是该源发出的元素的类型,
    • 第二种是“物化值”, 允许运行该源以产生一些辅助值(例如,网络源可以提供有关该值的信息)。绑定的端口或对等方的地址)。

          如果没有产生辅助信息,则使用该类型akka.NotUsed一个简单的整数范围属于此类别-运行我们的流会生成一个NotUsed

    此源意味着我们已经对如何发出前100个自然数进行了描述,但是该源尚未激活。为了获得这些数字,我们必须运行它:

    source.runForeach(i => println(i))

    runForeach,在流结束时返回一个 Future[Done],同时注意要关闭ActorSystem

    val done: Future[Done] = source.runForeach(i => println(i))
    
    implicit val ec = system.dispatcher
    done.onComplete(_ => system.terminate())

    4、Stream的好处

    Source是对您要运行内容的描述,并且可以像建筑师的蓝图一样重复使用,并整合到更大的设计中。我们可以选择转换整数的源并将其写入文件:

    val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
    //
    scan运算符对整个流进行计算:从数字(1)开始,我们将每个传入的数字相乘,一个接一个
    val result: Future[IOResult] = factorials.map(num => ByteString(s"$num
    "))
    .runWith(FileIO.toPath(Paths.get("factorials.txt")))//得到文件

         

        所耗时间通过下列代码,能够知道(运算+文档)导出大概100毫秒,akka的流处理能够看做是毫秒级别的大数据运算

    val  dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.CHINA)
    val txntime11: String = LocalDateTime.now.format(dateTimeFormatter)
    println(txntime11)
    val source=Source(1 to 100)
    val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

    val result: Future[IOResult] =
    factorials.map(num => ByteString(s"$num ")).runWith(FileIO.toPath(Paths.get("factorials.txt")))
    result
    val txntime12: String = LocalDateTime.now.format(dateTimeFormatter)
    println(txntime12)

    //2020-08-27 15:16:59.889
    //2020-08-27 15:16:59.974

    5、类的流处理的案例:

    package test
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    
    object TweetSourceTest extends App{
    
    
      final case class Author(handle: String)
    
      final case class Hashtag(name: String)
    
      final case class Tweet(author: Author, timestamp: Long, body: String) {
        def hashtags: Set[Hashtag] =
          body
            .split(" ")
            .collect {
              case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\w]", ""))
            }
            .toSet
      }
    
      val akkaTag = Hashtag("#akka")
    
      val tweets: Source[Tweet, NotUsed] = Source(
        Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
          Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
          Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
          Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
          Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
          Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
          Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
          Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
          Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
          Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
          Nil)
    
      implicit val system = ActorSystem("reactive-tweets")
    
      tweets
        .map(_.hashtags) // 得到source内的每个Tweet的Set[Hashtag]
     // Set(Hashtag(#akka))
    //  Set(Hashtag(#akka))
    //  Set(Hashtag(#akka))
    //  Set(Hashtag(#akka))
    //  Set(Hashtag(#akka))
    //  Set(Hashtag(#akka))
    //  Set(Hashtag(#akka))
    //  Set(Hashtag(#bananas))
    //  Set(Hashtag(#apples))
    //  Set(Hashtag(#apples), Hashtag(#oranges)
        .reduce(_ ++ _) ////并将它们减少到一组,删除所有tweet中的重复项
    //Set(Hashtag(#akka), Hashtag(#bananas), Hashtag(#apples), Hashtag(#oranges))
        .mapConcat(identity) // 将hashtags集合展平为hashtags流
    //Hashtag(#akka)
    //Hashtag(#bananas)
    //Hashtag(#apples)
    //Hashtag(#oranges)
       .map(_.name.toUpperCase) // 使之大写
        .runWith(Sink.foreach(println)) // 将流附加到最终将打印标签的接收器
    //#AKKA
    //#BANANAS
    //#APPLES
    //#ORANGES
    
    }

    6、可重复使用

            Akka Streams的一个很好的部分和其他流库没有提供的是,不仅源代码可以像蓝图一样重用,所有其他元素也可以。即类似数据公式一般,toMat是运算

    我们可以使用文件写入接收器,预先准备从传入字符串中获取ByteString元素所需的处理步骤,并将其打包为可重用的部分。

    由于编写这些流的语言总是从左到右(就像普通英语一样),我们需要一个起点,它就像一个源Source,但有一个“开放”的输入。

           在Akka Stream中,这被称为流Flow:

    def lineSink(filename: String): Sink[String, Future[IOResult]] =
      Flow[String].map(s => ByteString(s + "
    ")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)

         与上面的factorial案例相差的是,从字符串流开始,我们将每个字符串转换为ByteString,然后将其馈送到已知的文件写入接收器。最终的蓝图是一个Sink[String,Future[IOResult]],

    这意味着它接受字符串作为输入,当具体化时,它将创建Future[IOResult]类型的辅助信息(当链接源或流上的操作时,称为“物化值”的辅助信息类型由最左边的起点给出;因为我们希望保留文件路径下沉我们得说保持正确).

    我们可以使用我们刚刚创建的新的、闪亮的Sink,方法是在稍作修改后将其附加到阶乘源,将数字转换为字符串:

    actorials.map(_.toString).runWith(lineSink("factorial2.txt"))
    Source[+Out, +Mat]       //Out代表元素类型,Mat为运算结果类型
    Flow[-In, +Out, +Mat]    //In,Out为数据流元素类型,Mat是运算结果类型
    Sink[-In, +Mat]          //In是数据元素类型,Mat是运算结果类型

    7、基于时间的处理

    在我们开始看一个更复杂的例子之前,我们先探讨一下Akka Streams可以做什么的流媒体特性。从阶乘源开始,我们通过将其与另一个流压缩在一起来转换流,该流由发出数字0到100的源表示:

    factorials
      .zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num")
      .throttle(1, 1.second)
      .runForeach(println)

         到目前为止,所有操作都是独立于时间的,并且可以在严格的元素集合上以相同的方式执行。下一行说明我们实际上处理的是可以以一定速度流动的流:我们使用throttle操作符将流速度降低到每秒1个元素。

    如果你运行这个程序,你会看到每秒打印一行。但是,有一个不立即可见的方面值得一提:如果您尝试将流设置为每个流产生10亿个数字,那么您将注意到JVM不会因OutOfMemoryError而崩溃,即使您也会注意到流是在后台运行的,

    异步地(这就是在将来提供辅助信息的原因)。这项工作的秘密在于,Akka Streams隐含地实现了普遍的流量控制,所有运营商都尊重背压。这使得节流阀操作员可以向其所有上游数据源发出信号,即只有当输入速率高于每秒一个时,

    节流阀操作员才能以特定速率接收元件。节流阀操作员将向上游施加背压。这就是Akka流的全部,一言以蔽之,Akka流有几十个源和汇,还有更多的流转换运算符可供选择,请参见运算符索引。

    阶乘源发出的第一个数是零的阶乘,第二个是1的阶乘,依此类推。我们把这两者结合起来,形成“3!=6。

    8、反应式推文

        流处理的典型用例是使用实时数据流,我们要从中提取或聚合其他数据。在此示例中,我们将考虑使用一条推文流,并从中提取有关Akka的信息。

    我们还将考虑所有非阻塞流解决方案固有的问题:“如果订户太慢而无法消费实时数据流怎么办?” 传统上,解决方案通常是缓冲元素,但这可能会(通常会)导致此类系统最终出现缓冲区溢出和不稳定。

    相反,Akka Streams依靠内部背压信号来控制这种情况下应该发生的情况。

  • 相关阅读:
    POJ 3253 Fence Repair
    POJ 2431 Expedition
    NYOJ 269 VF
    NYOJ 456 邮票分你一半
    划分数问题 DP
    HDU 1253 胜利大逃亡
    NYOJ 294 Bot Trust
    NYOJ 36 最长公共子序列
    HDU 1555 How many days?
    01背包 (大数据)
  • 原文地址:https://www.cnblogs.com/0205gt/p/13572182.html
Copyright © 2011-2022 走看看