zoukankan      html  css  js  c++  java
  • akka 入门例子

    package org.test.scala.akka.pi6
    
    import com.typesafe.config.ConfigFactory
    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.routing.RoundRobinPool
    import akka.actor.Actor
    import akka.actor.ActorRef
    import scala.concurrent.Await
    import akka.remote.AssociationEvent
    import akka.remote.AssociatedEvent
    import akka.remote.DisassociatedEvent
    import akka.remote.RemotingLifecycleEvent
    
    /**
     * Entry point of the Pi approximation sample: boots a remoting-enabled
     * actor system, starts the listener and the master, and kicks off the
     * calculation.
     */
    object Pi6 {

      /** Assembles the actor-system settings programmatically instead of reading a config file. */
      private def remotingConfig: com.typesafe.config.Config = {
        val settings = new java.util.HashMap[String, String]()

        // Address the remoting transport binds to.
        settings.put("akka.remote.netty.tcp.hostname", "localhost")
        settings.put("akka.remote.netty.tcp.port", "6666")

        // Whether threads created by this ActorSystem are daemon threads (default: off).
        settings.put("akka.daemonic", "off")

        // FQCN of the ActorRefProvider; the default is akka.actor.LocalActorRefProvider,
        // the remote-capable one lives in the akka-remote bundle.
        settings.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider")

        ConfigFactory.parseMap(settings)
      }

      def main(args: Array[String]): Unit = {
        val system = ActorSystem("Pi6-akka", remotingConfig)
        println(s"root[$system]")

        // The listener is created first, under the name the master looks it up by.
        system.actorOf(Props[Listener], "listener")

        // 10 workers, 10 work messages, 10 series elements per message.
        system.actorOf(Props(new Master(10, 10, 10)), "master") ! Calculate
      }

    }
    
    /**
     * Master actor: splits the computation into chunks, hands them to a pool of
     * workers, sums up the partial results and sends the final approximation to
     * the listener.
     *
     * @param nrOfWorkers  how many worker actors are started
     * @param nrOfMessages how many chunks (Work messages) are sent to the workers
     * @param nrOfElements size of each chunk sent to a worker
     */
    class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int)
      extends Actor {
      import akka.actor.{ ActorIdentity, Identify }

      var pi: Double = _
      var nrOfResults: Int = _
      val start: Long = System.currentTimeMillis()

      // A round-robin router spreads the work evenly over the worker actors.
      val workerRouter = context.actorOf(Props[Worker].withRouter(RoundRobinPool(nrOfWorkers)), name = "workerRouter")

      // Full (remote-style) address of the listener; also used as the Identify correlation id.
      private val listenerPath = "akka.tcp://Pi6-akka@localhost:6666/user/listener"

      // Filled in once the listener has answered the Identify request.
      private var listener: Option[ActorRef] = None

      // Final result that was ready before the listener was known, if any.
      private var pending: Option[PiApproximation] = None

      /**
       * Looks the listener up asynchronously. Blocking on the lookup inside the
       * constructor would tie up a dispatcher thread and fail actor creation on
       * timeout, so the reply is handled as an ordinary message instead.
       * NOTE(review): no timeout is armed here; the path targets this same
       * system, so a reply is expected — confirm before pointing it at a
       * remote host.
       */
      override def preStart(): Unit =
        context.actorSelection(listenerPath) ! Identify(listenerPath)

      /** Sends the final result to the listener and stops this actor and its children. */
      private def deliver(target: ActorRef, approximation: PiApproximation): Unit = {
        target ! approximation
        context.stop(self)
      }

      def receive = {
        case Calculate =>
          for (i <- 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
        case Result(value) =>
          pi += value
          nrOfResults += 1
          if (nrOfResults == nrOfMessages) {
            val approximation = PiApproximation(pi, duration = System.currentTimeMillis - start)
            listener match {
              case Some(ref) => deliver(ref, approximation)
              // Listener not resolved yet: keep the result until it is.
              case None      => pending = Some(approximation)
            }
          }
        case ActorIdentity(`listenerPath`, Some(ref)) =>
          println(s"actor[listener]=[${ref.toString()}]")
          println(s"actor[listener] path =[${ref.path}]")
          listener = Some(ref)
          pending.foreach(deliver(ref, _))
        case ActorIdentity(`listenerPath`, None) =>
          // Nobody to report to; give up instead of computing into the void.
          println(s"actor[listener] not found at [$listenerPath]")
          context.stop(self)
      }
    }
    
    // The listener is simple: when it receives a PiApproximation from the Master
    // it prints the result and shuts down the whole actor system. It also prints
    // the remoting association events it receives.
    class Listener extends Actor {

      // Receive remoting lifecycle events published on the system event stream.
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      def receive = {
        case a: DisassociatedEvent =>
          println(s"DisassociatedEvent message from [${sender()}]")
          println(s"$a")
        case a: AssociatedEvent =>
          println(s"AssociatedEvent message from [${sender()}]")
          println(s"$a")
        case PiApproximation(pi, duration) =>
          // Escapes are spelled out: a plain "..." literal cannot contain raw
          // newlines, so the previous multi-line form did not compile.
          println("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s"
            .format(pi, duration))
          // NOTE(review): later Akka versions replace shutdown() with terminate() —
          // confirm against the Akka version in use before changing.
          context.system.shutdown()
      }
    }
    
    // Worker actor: computes the partial sum for one chunk of the series.
    class Worker extends Actor {

      println("creating worker...")

      /**
       * Sums the series terms 4 * sign(i) / (2i + 1) for i in
       * [start, start + nrOfElements), where sign(i) is +1 for even i and -1 for
       * odd i (for non-negative i).
       *
       * @param start        index of the first term of the chunk
       * @param nrOfElements number of consecutive terms to add up
       * @return the partial sum for this chunk
       */
      def calculatePiFor(start: Int, nrOfElements: Int): Double =
        (start until (start + nrOfElements)).foldLeft(0.0) { (acc, i) =>
          // The denominator is computed in Double: the Int expression 2 * i + 1
          // silently wraps around for large i.
          acc + 4.0 * (1 - (i % 2) * 2) / (2.0 * i + 1)
        }

      /**
       * Message handler required by the Actor trait. Only Work messages are
       * handled; each one is answered with a Result.
       */
      def receive = {
        case Work(start, nrOfElements) =>
          // The sender reference travels implicitly with every message, so the
          // receiver can reply right away or keep it for later use.
          sender() ! Result(calculatePiFor(start, nrOfElements))
      }
    }
    
    /**
     * Design: one master actor starts the whole computation and creates a group
     * of worker actors. The work is cut into small chunks that are sent to the
     * workers in round-robin fashion. The master waits for every worker to
     * finish its chunk and aggregates the results that are sent back. When the
     * computation is done, the master sends the result to the listener actor,
     * which prints it.
     *
     * Four kinds of messages flow through the system:
     *
     * Calculate – sent to the master to start the computation.
     * Work – sent from the master to a worker; describes the assigned chunk.
     * Result – sent from a worker to the master; carries the worker's partial result.
     * PiApproximation – sent from the master to the listener; carries the final
     *                   value of pi and the time the computation took.
     *
     * Messages sent to actors should always be immutable to avoid shared mutable
     * state; case classes (and a case object for Calculate) are a natural fit.
     * The messages share a common base trait, sealed so that no further message
     * types can be declared outside this file. Note that PiApproximation does
     * not extend that trait.
     */

    sealed trait PiMessage
    case object Calculate extends PiMessage
    case class Work(start: Int, nrOfElements: Int) extends PiMessage
    case class Result(value: Double) extends PiMessage
    case class PiApproximation(pi: Double, duration: Long)
    
    
    

    运行结果如下

    [INFO] [11/13/2015 17:03:15.182] [main] [Remoting] Starting remoting
    [INFO] [11/13/2015 17:03:15.476] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://Pi6-akka@localhost:6666]
    [INFO] [11/13/2015 17:03:15.479] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://Pi6-akka@localhost:6666]
    root[akka://Pi6-akka]
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    creating worker...
    actor[listener]=[Actor[akka://Pi6-akka/user/listener#-2138645838]]
    actor[listener] path =[akka://Pi6-akka/user/listener]
    
    	Pi approximation: 		3.131592903558554
    	Calculation time: 	45
    [INFO] [11/13/2015 17:03:15.559] [Pi6-akka-akka.actor.default-dispatcher-19] [akka://Pi6-akka/user/master/workerRouter/$h] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$h#894387185] to Actor[akka://Pi6-akka/user/master/workerRouter/$h#894387185] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.559] [Pi6-akka-akka.actor.default-dispatcher-19] [akka://Pi6-akka/user/master/workerRouter/$i] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$i#1288797532] to Actor[akka://Pi6-akka/user/master/workerRouter/$i#1288797532] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.559] [Pi6-akka-akka.actor.default-dispatcher-19] [akka://Pi6-akka/user/master/workerRouter/$b] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$b#-1299114866] to Actor[akka://Pi6-akka/user/master/workerRouter/$b#-1299114866] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$d] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$d#898681021] to Actor[akka://Pi6-akka/user/master/workerRouter/$d#898681021] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$e] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$e#2048474849] to Actor[akka://Pi6-akka/user/master/workerRouter/$e#2048474849] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$j] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$j#-456361154] to Actor[akka://Pi6-akka/user/master/workerRouter/$j#-456361154] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.560] [Pi6-akka-akka.actor.default-dispatcher-17] [akka://Pi6-akka/user/master/workerRouter/$g] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$g#-908333109] to Actor[akka://Pi6-akka/user/master/workerRouter/$g#-908333109] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.561] [Pi6-akka-akka.actor.default-dispatcher-21] [akka://Pi6-akka/user/master/workerRouter/$c] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$c#-369609720] to Actor[akka://Pi6-akka/user/master/workerRouter/$c#-369609720] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.561] [Pi6-akka-akka.actor.default-dispatcher-21] [akka://Pi6-akka/user/master/workerRouter/$f] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$f#-1227293729] to Actor[akka://Pi6-akka/user/master/workerRouter/$f#-1227293729] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.561] [Pi6-akka-akka.actor.default-dispatcher-21] [akka://Pi6-akka/user/master/workerRouter/$a] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://Pi6-akka/user/master/workerRouter/$a#1361034595] to Actor[akka://Pi6-akka/user/master/workerRouter/$a#1361034595] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [11/13/2015 17:03:15.572] [Pi6-akka-akka.remote.default-remote-dispatcher-6] [akka.tcp://Pi6-akka@localhost:6666/system/remoting-terminator] Shutting down remote daemon.
    [INFO] [11/13/2015 17:03:15.574] [Pi6-akka-akka.remote.default-remote-dispatcher-6] [akka.tcp://Pi6-akka@localhost:6666/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
    [INFO] [11/13/2015 17:03:15.611] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
    [INFO] [11/13/2015 17:03:15.612] [Pi6-akka-akka.remote.default-remote-dispatcher-5] [akka.tcp://Pi6-akka@localhost:6666/system/remoting-terminator] Remoting shut down.
    

    参考 http://www.gtan.com/akka_doc/intro/getting-started-first-scala.html

  • 相关阅读:
    一步一步学EF系列【4、升级篇 实体与数据库的映射】live writer真坑,第4次补发
    一步一步学EF系列3【升级篇 实体与数据库的映射】
    一步一步学EF系列2【最简单的一个实例】
    一步一步学EF系列1【Fluent API的方式来处理实体与数据表之间的映射关系】
    MVC5 Entity Framework学习之创建复杂的数据模型
    Demo源码放送:打通B/S与C/S !让HTML5 WebSocket与.NET Socket公用同一个服务端!
    动手写一个Remoting接口测试工具(附源码下载)
    通信服务器群集——跨服务器通信Demo(源码)
    轻量级通信引擎StriveEngine —— C/S通信demo(2) —— 使用二进制协议 (附源码)
    PAT A1011 World Cup Betting(20)
  • 原文地址:https://www.cnblogs.com/ihongyan/p/4962635.html
Copyright © 2011-2022 走看看