zoukankan      html  css  js  c++  java
  • scala akka基础编程

    序言

    欢迎来到第一个使用Akka和Scala的指南。我们假设你已经知道Akka和Scala是什么,现在需要了解开始第一个项目的步骤。

    本指南有两种方式:

    • 创建一个独立项目,从命令行运行
    • 创建SBT项目,在SBT中运行

    因为这两种方式非常相像,我们都会进行讲解。

    我们要创建的示例应用是使用actor来计算PI的值。计算PI是一项CPU密集的操作,我们将使用Akka Actor来编写一个可以垂直扩展到多个处理器核上的并发解决方案。在将来的指南中,这个示例应用将被扩展国使用Akka远程Actor来在水平到集群中的多台机器上。

    我们所使用的算法叫“embarrassingly parallel” 意思是每个子任务是独立完成的,与其它子任务无关。这个算法可以高度并行化,所以非常适合使用actor模型。

    以下是我们所使用的算法的公式:

    ../_images/pi-formula.png

    在这个特定的算法中,有一个主actor将序列分割成段并发送给工作actor来进行计算。当工作actor完成自己的序列段的计算后将结果传给主actor,由主actor进行汇总。

    源码

    如果你不想把源码用键盘敲一遍而且/或不想创建SBT项目,你可以从Akka GitHub仓库中下载整个指南。它的位置在 akka-tutorials/akka-tutorial-first 模块. 你也可以到 这里在线浏览。实际的代码在 这里

    要使用Git下载代码运行下面的命令:

    Linux/Unix/Mac 系统:

    1. $ git clone git://github.com/akka/akka.git
    2. $ cd akka/akka-tutorials/akka-tutorial-first

    Windows 系统:

    1. C:Usersjbonersrc> git clone git://github.com/akka/akka.git
    2. C:Usersjbonersrc> cd akkaakka-tutorialsakka-tutorial-first

    准备工作

    本指南假设你安装了Java 1.6或更高版本并且 Java 命令在你的 PATH上. 你还需要知道如何在shell(ZSH, Bash, DOS 等等.)中运行命令,需要一个文本编辑器或IDE来输入Scala代码。

    你必须保证 $JAVA_HOME 环境变量被正确设置为Java安装位置的根目录,还必须保证 $JAVA_HOME/bin 在你的PATH上。

    Linux/Unix/Mac 系统:

    1. $ export JAVA_HOME=..root of Java distribution..
    2. $ export PATH=$PATH:$JAVA_HOME/bin

    检测 java 正确安装:

    1. $ java -version
    2. java version "1.6.0_24"
    3. Java(TM) SE Runtime Environment (build 1.6.0_24-b07-334-10M3326)
    4. Java HotSpot(TM) 64-Bit Server VM (build 19.1-b02-334, mixed mode)

    Windows 系统:

    1. C:Usersjbonersrcakka> set JAVA_HOME=..root of Java distribution..
    2. C:Usersjbonersrcakka> set PATH=%PATH%;%JAVA_HOME%/bin

    检测 java 正确安装:

    1. C:Usersjbonersrcakka> java -version
    2. java version "1.6.0_24"
    3. Java(TM) SE Runtime Environment (build 1.6.0_24-b07-334-10M3326)
    4. Java HotSpot(TM) 64-Bit Server VM (build 19.1-b02-334, mixed mode)

    下载安装 Akka

    要从命令行编译和运行本指南示例,需要下载Akka。如果你希望用SBT来编译和运行它,建议跳过这一部分直接看下一节。

    从 http://akka.io/downloads/ 下载 akka-2.0.zip 发布包,它包含本指南所需的全部模块。下载完成后,将发布包解压至安装目录。我的安装目录下 /Users/jboner/tools/

    要正确安装Akka,需要再做一件事:设置 AKKA_HOME 环境变量到安装目录的根.

    Linux/Unix/Mac 系统:

    1. $ cd /Users/jboner/tools/akka-2.0
    2. $ export AKKA_HOME=`pwd`
    3. $ echo $AKKA_HOME
    4. /Users/jboner/tools/akka-2.0

    Windows 系统:

    1. C:Usersjbonersrcakka> cd akka-2.0
    2. C:Usersjbonersrcakkaakka-2.0> set AKKA_HOME=%cd%
    3. C:Usersjbonersrcakkaakka-2.0> echo %AKKA_HOME%
    4. C:Usersjbonersrcakkaakka-2.0

    安装路径的内容如下:

    Linux/Unix/Mac 系统:

    1. $ ls -1
    2. bin
    3. config
    4. deploy
    5. doc
    6. lib
    7. src

    Windows 系统:

    1. C:Usersjbonersrcakkaakka-2.0> dir
    2. bin
    3. config
    4. deploy
    5. doc
    6. lib
    7. src
    • bin 目录中是用来启动Akka微内核的脚本。
    • In the config 目录中是Akka配置文件。
    • In the deploy 目录用来放置随微内核一起运行的应用。
    • In the doc 目录中是文档、API的jar包。
    • In the lib 目录中是Scala和Akka的jar包。
    • In the src 目录中是Akka源码jar包。

    本指南所需要的唯一一个jar包 (除了 scala-library.jar 以外) 是 lib/akka 目录中的 akka-actor-2.0.jar. 这个jar包没有外部依赖,有了它我们就可以编写一个使用actor的系统。

    Akka模块化做得很好,包含有实现不同功能的各个jar包,其模块包括:

    • akka-actor – Actor
    • akka-remote – 远程 Actor
    • akka-slf4j – SLF4J 事件处理监听器
    • akka-testkit – 测试actor的工具包
    • akka-kernel – 运行一个基础的最小应用服务器的微内核
    • akka-durable-mailboxes – 持久邮箱: 基于文件, MongoDB, Redis, Zookeeper
    • akka-amqp – AMQP 集成

    下载安装 Scala

    要从命令行编译和运行本指南示例,需要安装Scala发布包。如果你要使用SBT,在SBT中编译和运行,可以跳过这一部分直接进入下一节。

    可以从 http://www.scala-lang.org/downloads下载Scala. 在那里下载 Scala 2.9.1 版本. 如果你选择的是 tgz 或zip 包则需要进行解压。如果选择的是IzPack安装程序,只需要双击它然后按照指示操作。

    还必须保证 scala-2.9.1/bin (scala-2.9.1是你的Scala安装目录) 在你的 PATH上。

    Linux/Unix/Mac 系统:

    1. $ export PATH=$PATH:scala-2.9.1/bin

    Windows 系统:

    1. C:Usersjbonersrcakkaakka-2.0> set PATH=%PATH%;scala-2.9.1bin

    你可以运行scala命令来测试你的安装是否正确。

    Linux/Unix/Mac 系统:

    1. $ scala -version
    2. Scala code runner version 2.9.1.final -- Copyright 2002-2011, LAMP/EPFL

    Windows 系统:

    1. C:Usersjbonersrcakkaakka-2.0> scala -version
    2. Scala code runner version 2.9.1.final -- Copyright 2002-2011, LAMP/EPFL

    看来一切顺利. 最后让我们创建一个源码文件 Pi.scala 并将它放在Akka发布版本的根目录下的 tutorial 目录 (如果这个目录不存在你需要创建它)。

    有一些工具要求你设置 SCALA_HOME 环境变量为Scala发布包的根目录,不过Akka并没有这一要求。

    下载安装SBT

    SBT, Simple Build Tool的简称, 是用Scala语言编写的优秀的build工具。 它使用Scala语言来编写build脚本,功能非常强大。它拥有一个插件体系,已经有很多插件可供使用,我们很快就会用到它们。SBT是用来build用Scala编写的软件的推荐方法,而且可能是学习本指南的最简单的方法。如果你决定使用SBT,请按下面的指示操作,否则可以跳过这一部分和下一部分内容。

    要安装SBT并创建本指南的项目,最简单的方法见 https://github.com/harrah/xsbt/wiki/Setup

    现在我们需要创建我们的第一个Akka项目,你可以手动向build脚本中添加依赖,不过更简单的方法是使用下一部分中介绍的Akka SBT插件。

    创建Akka SBT项目

    如果你还没做过,那么现在就开始创建本指南所讲的SBT项目,所要做的是在你希望创建项目的目录下添加一个build.sbt 文件:

    1. name := "My Project"
    2.  
    3. version := "1.0"
    4.  
    5. scalaVersion := "2.9.1"
    6.  
    7. resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
    8.  
    9. libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"

    再创建一个名为 src/main/scala 的目录来存放源码.

    虽然本指南并不需要,但是你可能愿意添加除了 akka-actor 外的其它Akka模块, 这些是添加在 build.sbt 的libraryDependencies 部分。 注意其中的每一项之间必须有一个空行。下面是一个添加 akka-remote的例子:

    1. libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"
    2.  
    3. libraryDependencies += "com.typesafe.akka" % "akka-remote" % "2.0"

    好,现在我们都弄好了。

    SBT 自身有一堆依赖,不过我们的项目只需要其中一个: akka-actor-2.0.jar. SBT 会下载它。

    开始编写代码

    终于可以开始写代码了。

    我们先创建 Pi.scala 文件并在文件顶部添加以下这些 import:

    1. import akka.actor._
    2. import akka.routing.RoundRobinRouter
    3. import akka.util.Duration
    4. import akka.util.duration._

    如果你使用SBT,那么将 Pi.scala 放在 src/main/scala 目录下.

    如果你使用命令行,那么可以将它放在随便哪儿。我是在Akka安装目录下创建了一个名为 tutorial 的目录, 也就是 $AKKA_HOME/tutorial/Pi.scala

    创建消息

    我们要做的设计是由一个  actor来启动整个计算过程,创建一组 工作 actor. 整个工作会被分割成具体的小段, 各小段会以round-robin的方式发送到不同的工作 actor. 主actor等待所有的工作actor完全各自的工作并将其回送的结果进行汇总。当计算完成以后,主actor将结果发送给 监听器 acotr, 由它来输出结果。

    在这个基础上, 现在让我们创建在这个系统中流动的消息。我们需要4种不同的消息:

    • Calculate – 发送给  actor 来启动计算。
    • Work – 从  actor 发送给各 工作 actor,包含工作分配的内容。
    • Result – 从 工作 actors 发送给  actor,包含工作actor的计算结果。
    • PiApproximation – 从  actor发送给 监听器 actor,包含pi的最终计算结果和整个计算耗费的时间。

    发送给actor的消息应该永远是不可变的,以避免共享可变状态。 在scala里我们有 ‘case classes’ 来构造完美的消息。现在让我们用case class创建3种消息。我们还为消息们创建一个通用的基础trait(定义为sealed以防止在我们不可控的地方创建消息):

    1. sealed trait PiMessage
    2. case object Calculate extends PiMessage
    3. case class Work(start: Int, nrOfElements: Int) extends PiMessage
    4. case class Result(value: Double) extends PiMessage
    5. case class PiApproximation(pi: Double, duration: Duration)

    创建工作 actor

    现在我们来创建工作 actor。 方法是混入 Actor trait 并定义其中的 receive 方法. receive 方法定义我们的消息处理器。我们让它能够处理 Work 消息,所以添加一个针对这种消息的处理器:

    1. class Worker extends Actor {
    2.  
    3. // calculatePiFor ...
    4.  
    5. def receive = {
    6. case Work(start, nrOfElements)
    7. sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
    8. }
    9. }

    可以看到我们现在创建了一个 Actor 和一个 receive 方法作为 Work 消息的处理器. 在这个处理器中我们调用calculatePiFor(..) 方法, 将结果包在 Result 消息里并使用sender异步发送回消息的原始发送者。 在Akka里,sender引用是与消息一起隐式发送的,这样接收者可以随时回复或将sender引用保存起来以备将来使用。

    现在在我们的 Worker actor 中唯一缺少的就是实现 calculatePiFor(..) 方法。 虽然在Scala里我们可以有很多方法来实现这个算法,在这个入门指南中我们选择了一种命令式的风格,使用了for写法和一个累加器:

    1. def calculatePiFor(start: Int, nrOfElements: Int): Double = {
    2. var acc = 0.0
    3. for (i start until (start + nrOfElements))
    4. acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
    5. acc
    6. }

    创建主actor

    主actor会稍微复杂一些。 在它的构造方法里我们创建一个round-robin的路由器来简化将工作平均地分配给工作actor们的过程,先做这个:

    1. val workerRouter = context.actorOf(
    2. Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

    现在我们有了一个路由,可以在一个单一的抽象中表达所有的工作actor。现在让我们创建主actor. 传递给它三个整数变量:

    • nrOfWorkers – 定义我们会启动多少工作actor
    • nrOfMessages – 定义会有多少整数段发送给工作actor
    • nrOfElements – 定义发送给工作actor的每个整数段的大小

    下面是主actor:

    1. class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)
    2. extends Actor {
    3.  
    4. var pi: Double = _
    5. var nrOfResults: Int = _
    6. val start: Long = System.currentTimeMillis
    7.  
    8. val workerRouter = context.actorOf(
    9. Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
    10.  
    11. def receive = {
    12. // handle messages ...
    13. }
    14.  
    15. }

    有一些需要进一步解释的事。

    注意我们向  actor传进了一个 ActorRef . 这是用来向外界报告最终的计算结果。

    但是还没完。我们还缺少  actor的消息处理器. 这个处理器需要能够对两种消息进行响应:

    • Calculate – 用来启动计算过程
    • Result – 用来汇总不同的计算结果

    Calculate 处理器会通过其路由器向所有的 工作 actor 发送工作内容.

    Result 处理器从 Result 消息中获取值并汇总到我们的 pi 成员变量中. 我们还会记录已经接收的结果数据的数量,它是否与发送出去的任务数量一致 。 actor 发现计算完成了,会将最终结果发送给 监听者. 当整个过程都完成了,它会调用 context.stop(self) 方法来终止自己  它所监管的所有actor. 在本例中,主actor监管一个actor,我们的路由器,而路由器监管着所有 nrOfWorkers 个工作actors. 所有的actor都会在其监管者的stop方法被调用时自动终止,并会传递给所有它监管的子actor。

    让我们在代码中实现这些:

    1. def receive = {
    2. case Calculate
    3. for (i 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
    4. case Result(value)
    5. pi += value
    6. nrOfResults += 1
    7. if (nrOfResults == nrOfMessages) {
    8. // Send the result to the listener
    9. listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis)
    10. // Stops this actor and all its supervised children
    11. context.stop(self)
    12. }
    13. }

    创建计算结果监听者

    监听者很简单,当它接收到从 Master发来的PiApproximation ,就将结果打印出来并关闭整个 Actor系统

    1. class Listener extends Actor {
    2. def receive = {
    3. case PiApproximation(pi, duration)
    4. println(" Pi approximation: %s Calculation time: %s"
    5. .format(pi, duration))
    6. context.system.shutdown()
    7. }
    8. }

    启动计算

    现在只剩下实现启动和运行计算的执行者了。我们创建一个调用 Pi的对象, 这里我们可以继承Scala中的 Apptrait, 这个trait使我们能够在命令行上直接运行这个应用.

    Pi 对象是我们的actor和消息的很好的容器。所以我们把它们都放在这儿。我们还创建一个 calculate 方法来启动  actor 并等待它结束:

    1. object Pi extends App {
    2.  
    3. calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
    4.  
    5. // actors and messages ...
    6.  
    7. def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
    8. // Create an Akka system
    9. val system = ActorSystem("PiSystem")
    10.  
    11. // create the result listener, which will print the result and shutdown the system
    12. val listener = system.actorOf(Props[Listener], name = "listener")
    13.  
    14. // create the master
    15. val master = system.actorOf(Props(new Master(
    16. nrOfWorkers, nrOfMessages, nrOfElements, listener)),
    17. name = "master")
    18.  
    19. // start the calculation
    20. master ! Calculate
    21.  
    22. }
    23. }

    以上的 calculate 方法创建一个 Actor系统,这是包括所有创建出的actor的 “上下文”。 如何在容器中创建actor的例子在calculate方法的 ‘system.actorOf(...)’ 这一行。 这里我们创建两个顶级actor. 如果你是在一个actor上下文(i.e. 在一个创建其它actor的actor中),你应该使用 context.actorOf(...). 这在以上的主actor代码中有所体现。

    好了,终于完成了。

    但是在打包和运行之前,让我们看看完整的代码,包括package定义,import:

    1. /**
    2. * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
    3. */
    4. package akka.tutorial.first.scala
    5.  
    6. import akka.actor._
    7. import akka.routing.RoundRobinRouter
    8. import akka.util.Duration
    9. import akka.util.duration._
    10.  
    11. object Pi extends App {
    12.  
    13. calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
    14.  
    15. sealed trait PiMessage
    16. case object Calculate extends PiMessage
    17. case class Work(start: Int, nrOfElements: Int) extends PiMessage
    18. case class Result(value: Double) extends PiMessage
    19. case class PiApproximation(pi: Double, duration: Duration)
    20.  
    21. class Worker extends Actor {
    22.  
    23. def calculatePiFor(start: Int, nrOfElements: Int): Double = {
    24. var acc = 0.0
    25. for (i start until (start + nrOfElements))
    26. acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
    27. acc
    28. }
    29.  
    30. def receive = {
    31. case Work(start, nrOfElements)
    32. sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
    33. }
    34. }
    35.  
    36. class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)
    37. extends Actor {
    38.  
    39. var pi: Double = _
    40. var nrOfResults: Int = _
    41. val start: Long = System.currentTimeMillis
    42.  
    43. val workerRouter = context.actorOf(
    44. Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
    45.  
    46. def receive = {
    47. case Calculate
    48. for (i 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
    49. case Result(value)
    50. pi += value
    51. nrOfResults += 1
    52. if (nrOfResults == nrOfMessages) {
    53. // Send the result to the listener
    54. listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis)
    55. // Stops this actor and all its supervised children
    56. context.stop(self)
    57. }
    58. }
    59.  
    60. }
    61.  
    62. class Listener extends Actor {
    63. def receive = {
    64. case PiApproximation(pi, duration)
    65. println(" Pi approximation: %s Calculation time: %s"
    66. .format(pi, duration))
    67. context.system.shutdown()
    68. }
    69. }
    70.  
    71.  
    72. def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
    73. // Create an Akka system
    74. val system = ActorSystem("PiSystem")
    75.  
    76. // create the result listener, which will print the result and shutdown the system
    77. val listener = system.actorOf(Props[Listener], name = "listener")
    78.  
    79. // create the master
    80. val master = system.actorOf(Props(new Master(
    81. nrOfWorkers, nrOfMessages, nrOfElements, listener)),
    82. name = "master")
    83.  
    84. // start the calculation
    85. master ! Calculate
    86.  
    87. }
    88. }

    作为命令行程序运行

    如果你是手动输入(或者拷贝粘贴)指南中的代码到 $AKKA_HOME/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala 那么现在轮到你了. 开启一个shell,并进入akka安装 (cd $AKKA_HOME).

    首先我们需要编译源码文件。使用Scala编译器 scalac. 我们的应用依赖于 akka-actor-2.0.jar JAR包 , 所以编译时将它加入到编译器的classpath中.

    Linux/Unix/Mac 系统:

    1. $ scalac -cp lib/akka/akka-actor-2.0.jar Pi.scala

    Windows 系统:

    1. C:Usersjbonersrcakkaakka-2.0> scalac -cp libakkaakka-actor-2.0.jar Pi.scala

    编译完就可以运行了。使用 java 来运行但是同样,我们要先将 akka-actor-2.0.jar JAR 包加入到 classpath, 而这一次还需要添加Scala运行时库 scala-library.jar 和我们自己的代码编译出的class.

    Linux/Unix/Mac 系统:

    1. $ java
    2. -cp lib/scala-library.jar:lib/akka/akka-actor-2.0.jar:.
    3. akka.tutorial.first.scala.Pi
    4.  
    5. Pi approximation: 3.1415926435897883
    6. Calculation time: 359 millis

    Windows 系统:

    1. C:Usersjbonersrcakkaakka-2.0> java
    2. -cp lib/scala-library.jar;libakkaakka-actor-2.0.jar;.
    3. akka.tutorial.first.scala.Pi
    4.  
    5. Pi approximation: 3.1415926435897883
    6. Calculation time: 359 millis

    Ok!它能跑了。

    在SBT中运行

    如果你使用SBT,那么可以直接在SBT中运行本程序。先进行编译:

    Linux/Unix/Mac 系统:

    1. $ sbt
    2. > compile
    3. ...

    Windows 系统:

    1. C:Usersjbonersrcakkaakka-2.0> sbt
    2. > compile
    3. ...

    以上完成后直接在SBT中运行:

    1. > run
    2. ...
    3. Pi approximation: 3.1415926435897883
    4. Calculation time: 359 millis

    Ok!它能跑了。

    从外部修改配置 (可选)

    示例项目的resources目录下包含一个 application.conf文件:

    1. akka.actor.deployment {
    2. /master/workerRouter {
    3. # 取消下面两行的注释来修改计算过程,使用10个工作actor,而不是4个:
    4. #router = round-robin
    5. #nr-of-instances = 10
    6. }
    7. }

    如果你取消那两行的注释,你应该会看到性能上的变化,基本上应该是更好的性能(你可能需要增加代码中的消息数量来延长应用的运行时间)。需要提醒注意的是修改的配置只在给出了路由器类型的时候才有效,所以仅取消nr-of-instances 的注释将不起作用; 参阅 Routing (Scala) 了解细节.

    注意

    确保 application.conf 在运行应用的classpath上。如果在SBT中运行,那么这条件应该已经满足了,否则你需要把包含该文件的目录加入到jvm的 -classpath 参数.

    总结

    我们已经学习了如何创建第一个Akka项目,使用Akka actor来扩展到多核cpu上(也称为垂直扩展),为cpu密集型计算进行加速。我们还学习了在命令行上或在SBT中编译和运行Akka项目的方法。

    如果你有一个多核的电脑,我建议你通过修改 nrOfWorkers来尝试不同数量的工作actor来观察性能上的改进。

    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    CentOS上安装Mysql+PHP-fpm+Nginx
    CentOS查看端口
    QTP卷土重来之一录制脚本
    Windows Application 自动化测试之QTP卷土重来
    JAVA Appium自动化测试入门
    iOS自动化遇到坑的解决方式
    将一个字符串形式的字典转化为字典
    【python】接口测试中的异步调用
    【python】接口自动化测试中,如何校验json返回数据的格式是否正确
    【python】接口自动化测试中,json解析神器jsonpath
  • 原文地址:https://www.cnblogs.com/candlia/p/11920334.html
Copyright © 2011-2022 走看看