package org.test.scala.akka.pi6

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.routing.RoundRobinPool
import akka.actor.Actor
import akka.actor.ActorRef
import scala.concurrent.Await
import akka.remote.AssociationEvent
import akka.remote.AssociatedEvent
import akka.remote.DisassociatedEvent
import akka.remote.RemotingLifecycleEvent

/**
 * Entry point of the Pi approximation demo.
 *
 * Builds a remoting-enabled actor system named "Pi6-akka" bound to
 * localhost:6666, creates the listener and master actors, and starts the
 * calculation by sending [[Calculate]] to the master.
 */
object Pi6 {

  def main(args: Array[String]): Unit = {
    val map = new java.util.HashMap[String, String]()
    map.put("akka.remote.netty.tcp.hostname", "localhost")
    map.put("akka.remote.netty.tcp.port", "6666")

    // Toggles whether threads created by this ActorSystem should be daemons or not
    // default: daemonic = off
    map.put("akka.daemonic", "off")

    // FQCN of the ActorRefProvider to be used;
    // default: provider = "akka.actor.LocalActorRefProvider"
    // another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle.
    map.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider")

    val config = ConfigFactory.parseMap(map)
    val root = ActorSystem("Pi6-akka", config)
    println(s"root[$root]")

    // The listener is created before the master because Master's constructor
    // looks it up by its path (/user/listener).
    root.actorOf(Props[Listener], "listener")
    val master = root.actorOf(Props(new Master(10, 10, 10)), "master")
    master ! Calculate
  }
}

/**
 * The master actor. It distributes the work over a pool of workers, sums up
 * their partial results and sends the final value to the listener.
 *
 * NOTE(review): the constructor blocks (via `Await.result`, up to 10 seconds)
 * while resolving the listener through its remote-style path. Blocking inside
 * an actor is generally discouraged; it is kept here because looking the
 * listener up by path appears to be the point of this demo.
 *
 * @param nrOfWorkers  how many worker actors are started
 * @param nrOfMessages how many chunks of integers are sent to the workers
 * @param nrOfElements the size of each chunk sent to a worker
 */
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int) extends Actor {

  /** Running sum of the partial results received so far. */
  var pi: Double = _

  /** Number of [[Result]] messages received so far. */
  var nrOfResults: Int = _

  /** Creation time of this actor, in milliseconds; used to report the elapsed time. */
  val start: Long = System.currentTimeMillis()

  // A round-robin router simplifies spreading the work evenly over the workers.
  val workerRouter: ActorRef =
    context.actorOf(RoundRobinPool(nrOfWorkers).props(Props[Worker]), name = "workerRouter")

  implicit val to: akka.util.Timeout =
    akka.util.Timeout.apply(10, java.util.concurrent.TimeUnit.SECONDS)

  val future: scala.concurrent.Future[ActorRef] =
    context.actorSelection("akka.tcp://Pi6-akka@localhost:6666/user/listener").resolveOne()

  val listener: ActorRef = Await.result(future, to.duration)

  // ActorRef.toString has the form s"Actor[${path}#${path.uid}]"
  println(s"actor[listener]=[${listener.toString()}]")
  println(s"actor[listener] path =[${listener.path}]")

  def receive: Receive = {
    case Calculate =>
      (0 until nrOfMessages).foreach { i =>
        workerRouter ! Work(i * nrOfElements, nrOfElements)
      }

    case Result(value) =>
      pi += value
      nrOfResults += 1
      if (nrOfResults == nrOfMessages) {
        // Send the result to the listener
        listener ! PiApproximation(pi, duration = System.currentTimeMillis() - start)
        // Stops this actor and all its supervised children
        context.stop(self)
      }
  }
}

/**
 * The listener is simple: when it receives a [[PiApproximation]] from the
 * master it prints the result and shuts down the whole actor system. It also
 * subscribes to remoting lifecycle events and prints association and
 * disassociation events.
 */
class Listener extends Actor {

  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

  def receive: Receive = {
    case a: DisassociatedEvent =>
      println(s"DisassociatedEvent message from [${sender()}]")
      println(s"$a")

    case a: AssociatedEvent =>
      println(s"AssociatedEvent message from [${sender()}]")
      println(s"$a")

    case PiApproximation(pi, duration) =>
      println(" Pi approximation: %s Calculation time: %s".format(pi, duration))
      // NOTE(review): shutdown() is deprecated in newer Akka versions in favour of
      // terminate(); kept as-is because the Akka version in use is not visible here.
      context.system.shutdown()
  }
}

/** A worker actor: computes the partial sum for one chunk and replies to the sender. */
class Worker extends Actor {

  println("creating worker...")

  /**
   * Sums the terms `4 * (-1)^i / (2i + 1)` for `i` in
   * `[start, start + nrOfElements)`.
   *
   * The denominator is computed in `Double` so that `2 * i + 1` cannot
   * overflow `Int` for large indices.
   *
   * @param start        index of the first term
   * @param nrOfElements number of consecutive terms to add up
   * @return the partial sum for this chunk
   */
  def calculatePiFor(start: Int, nrOfElements: Int): Double =
    (start until (start + nrOfElements)).foldLeft(0.0) { (acc, i) =>
      acc + 4.0 * (1 - (i % 2) * 2) / (2.0 * i + 1)
    }

  /**
   * Message handler required by the Actor trait. It handles [[Work]] messages
   * by computing the chunk and replying with a [[Result]].
   */
  def receive: Receive = {
    case Work(start, nrOfElements) =>
      // In Akka the sender reference is passed along implicitly with the message,
      // so the receiver can reply right away or keep the reference for later use.
      sender() ! Result(calculatePiFor(start, nrOfElements))
  }
}

/**
 * The design: a master actor starts the whole calculation and creates a group
 * of worker actors. The work is split into small chunks which are sent to the
 * workers in round-robin fashion. The master waits for all workers to finish
 * and aggregates the results they send back. Once the calculation is done,
 * the master sends the result to the listener actor, which prints it.
 *
 * Four kinds of messages flow through this system:
 *
 *  - Calculate – sent to the master to start the calculation.
 *  - Work – sent from the master to the workers; contains the work assignment.
 *  - Result – sent from the workers to the master; contains a worker's result.
 *  - PiApproximation – sent from the master to the listener; contains the final
 *    value of pi and how long the calculation took.
 *
 * Messages sent to actors should always be immutable to avoid sharing mutable
 * state; case classes (and case objects) are a good fit for that. The messages
 * share a common base trait, which is sealed so that no message can be defined
 * outside of this file.
 */
sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
case class PiApproximation(pi: Double, duration: Long)
运行结果如下
[INFO] [11/13/2015 17:03:15.182] [main] [Remoting] Starting remoting [INFO] [11/13/2015 17:03:15.476] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://Pi6-akka@localhost:6666] [INFO] [11/13/2015 17:03:15.479] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://Pi6-akka@localhost:6666] root[akka://Pi6-akka] creating worker... creating worker... creating worker... creating worker... creating worker... creating worker... creating worker... creating worker... creating worker... creating worker... actor[listener]=[Actor[akka://Pi6-akka/user/listener#-2138645838]] actor[listener] path =[akka://Pi6-akka/user/listener] Pi approximation: 3.131592903558554 Calculation time: 45 [INFO] [11/13/2015 17:03:15.559] [Pi6-akka-akka.actor.default-dispatcher-19] [akka://Pi6-akka/user/master/workerRouter/$h] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$h#894387185] to Actor[akka://Pi6-akka/user/master/workerRouter/$h#894387185] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.559] [Pi6-akka-akka.actor.default-dispatcher-19] [akka://Pi6-akka/user/master/workerRouter/$i] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$i#1288797532] to Actor[akka://Pi6-akka/user/master/workerRouter/$i#1288797532] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.559] [Pi6-akka-akka.actor.default-dispatcher-19] [akka://Pi6-akka/user/master/workerRouter/$b] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$b#-1299114866] to Actor[akka://Pi6-akka/user/master/workerRouter/$b#-1299114866] was not delivered. 
[3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$d] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$d#898681021] to Actor[akka://Pi6-akka/user/master/workerRouter/$d#898681021] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$e] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$e#2048474849] to Actor[akka://Pi6-akka/user/master/workerRouter/$e#2048474849] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$j] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$j#-456361154] to Actor[akka://Pi6-akka/user/master/workerRouter/$j#-456361154] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$g] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$g#-908333109] to Actor[akka://Pi6-akka/user/master/workerRouter/$g#-908333109] was not delivered. [7] dead letters encountered. 
This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.561] [Pi6-akka-akka.actor.default-dispatcher-21] [akka://Pi6-akka/user/master/workerRouter/$c] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$c#-369609720] to Actor[akka://Pi6-akka/user/master/workerRouter/$c#-369609720] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.561] [Pi6-akka-akka.actor.default-dispatcher-21] [akka://Pi6-akka/user/master/workerRouter/$f] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$f#-1227293729] to Actor[akka://Pi6-akka/user/master/workerRouter/$f#-1227293729] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.561] [Pi6-akka-akka.actor.default-dispatcher-21] [akka://Pi6-akka/user/master/workerRouter/$a] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$a#1361034595] to Actor[akka://Pi6-akka/user/master/workerRouter/$a#1361034595] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [11/13/2015 17:03:15.572] [Pi6-akka-akka.remote.default-remote-dispatcher-6] [akka.tcp://Pi6-akka@localhost:6666/system/remoting-terminator] Shutting down remote daemon. 
[INFO] [11/13/2015 17:03:15.574] [Pi6-akka-akka.remote.default-remote-dispatcher-6] [akka.tcp://Pi6-akka@localhost:6666/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports. [INFO] [11/13/2015 17:03:15.611] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down [INFO] [11/13/2015 17:03:15.612] [Pi6-akka-akka.remote.default-remote-dispatcher-5] [akka.tcp://Pi6-akka@localhost:6666/system/remoting-terminator] Remoting shut down.
参考 http://www.gtan.com/akka_doc/intro/getting-started-first-scala.html