Cluster configuration
```
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    auto-down-unreachable-after = 10s
  }
  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.local.dir = "target/snapshots"
  }
  log-dead-letters = off
}
```
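The actor.provider setting selects ClusterActorRefProvider. In an IDE you can jump from this string to the ClusterActorRefProvider class. Its source comment shows that, despite the name, the provider is not really about how actors are provided. Its purpose is to load the Cluster extension and start the cluster automatically: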
i.e. the cluster will automatically be started when the 'ClusterActorRefProvider' is used.
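Because the provider starts the Cluster extension, any actor can obtain it with Cluster(context.system). The sketch below assumes the Akka 2.3 cluster API. ClusterListener is a hypothetical name, not part of the original project. The actor subscribes to membership events, which makes it easy to watch the three nodes join and leave.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// Hypothetical helper actor: logs membership changes of the cluster that
// ClusterActorRefProvider started automatically.
class ClusterListener extends Actor with ActorLogging {
  // Already running because of the provider setting; no explicit start needed
  val cluster = Cluster(context.system)

  // Replay the current membership as events, then receive every later change
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberUp(member) =>
      log.info("Member is up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // other membership transitions are ignored
  }
}

object ClusterListener {
  def props: Props = Props[ClusterListener]
}
```

To use it, start the listener on each node with system.actorOf(ClusterListener.props, "clusterListener") right after the ActorSystem is created.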
Creating three ActorSystems that form a cluster
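```scala
import akka.actor.{ActorSystem, PoisonPill}
import akka.contrib.pattern.ClusterSingletonManager // Akka 2.3 (akka-contrib)
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.Duration

// These methods live in the application's main object
def main(args: Array[String]): Unit = {
  // No arguments: start two nodes on the seed ports and one on a random port
  if (args.isEmpty) startup(Seq("2551", "2552", "0"))
  else startup(args)
}

def startup(ports: Seq[String]): Unit = {
  ports foreach { port =>
    // Override the configuration of the port
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
      withFallback(ConfigFactory.load())
    // Create an Akka system
    val system = ActorSystem("ClusterSystem", config)
    // Every node runs a singleton manager; only one node actually hosts Master
    system.actorOf(ClusterSingletonManager.props(
      Master.props(Duration(99, "second")), "active", PoisonPill, None), "master")
  }
}
```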
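When each ActorSystem is created, its config overrides akka.remote.netty.tcp.port, because the default configuration only sets port = 0, which means a random free port. All three ActorSystems run on the local machine, so hostname needs no extra setting. The 127.0.0.1 from application.conf is reused.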
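The override works because parseString produces a small Config that takes precedence, while withFallback fills in every other key from application.conf (and reference.conf). The sketch below uses the Typesafe Config API. PortOverride is a hypothetical name. It reads back one overridden key and one inherited key.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object PortOverride {
  // Same pattern as startup(): the parsed string wins, application.conf is the fallback
  def configFor(port: String): Config =
    ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
      .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val config = configFor("2551")
    // Taken from the parsed override
    println(config.getInt("akka.remote.netty.tcp.port"))
    // Not overridden, so it comes from application.conf (assumed to be on the classpath)
    println(config.getString("akka.remote.netty.tcp.hostname"))
  }
}
```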
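Also, every ActorSystem must have the same name, ClusterSystem. That name is part of the seed-nodes addresses declared in application.conf (akka.tcp://ClusterSystem@...), and it effectively serves as the cluster's ID.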
My guess is that once all seed nodes are down, a new ActorSystem can no longer join the cluster, because the cluster ID alone is not enough to find it.
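Some context for that guess. Seed nodes are only the initial contact points, so a cluster that is already running does not depend on them. A new node whose configuration lists only dead seeds, however, has nobody to contact. The sketch below shows one possible workaround. It assumes the Akka 2.3 cluster API and starts the new node with akka.cluster.seed-nodes overridden to an empty list. The new node is then given the address of any member that is still alive. ManualJoin and port 2553 are hypothetical.

```scala
import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory
import scala.collection.immutable

object ManualJoin {
  // Hypothetical address of a member that is still up (port 2553 is made up)
  val liveMember: Address = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2553)

  def main(args: Array[String]): Unit = {
    // Disable the configured (dead) seed nodes for this node only
    val config = ConfigFactory.parseString("akka.cluster.seed-nodes = []")
      .withFallback(ConfigFactory.load())
    // The system name must still match the name in the members' addresses
    val system = ActorSystem("ClusterSystem", config)
    // Use a live member as the contact point instead of the configured seeds
    Cluster(system).joinSeedNodes(immutable.Seq(liveMember))
  }
}
```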
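```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.util.Timeout
import scala.concurrent.duration._

object Master {
  val ResultsTopic = "results"
  def props(workTimeout: FiniteDuration): Props = Props(classOf[Master], workTimeout)
  case object Job
  case object ParentGreetings
}

class Master(workTimeout: FiniteDuration) extends Actor with ActorLogging {
  import Master._
  import context.dispatcher // ExecutionContext for the scheduler and Future.map

  // Send Job to self every 5 s, starting 10 s after this instance starts
  val tick = context.system.scheduler.schedule(10.seconds, 5.seconds, self, Job)
  // Stop the timer when this instance stops (e.g. on singleton hand-over)
  override def postStop(): Unit = tick.cancel()

  val timeout: Timeout = 10.seconds

  override def receive: Receive = {
    case Job =>
      // ClusterProtocol.selfInfo is the author's helper (not shown); it returns a 3-tuple
      val info = ClusterProtocol.selfInfo(self)
      log.info(info._1 + ": " + info._2 + ": " + info._3)
      // Non-blocking lookup of this actor by path; the Future completes later
      context.system.actorSelection("user/master/active").resolveOne(4.seconds)
        .map(actor => actor ! ParentGreetings)
    case ParentGreetings =>
      log.info("greetings from parent")
  }
}
```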
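The code above implements the cluster singleton actor. It does two things: first, it prints its own path; second, it looks itself up. Note that the Future returned by resolveOne must not be awaited (blocked on). Otherwise the lookup fails with an ActorNotFound exception. The reason is that resolveOne sends an Identify message to the selected actor, which here is Master itself, and Master can only answer that message from its mailbox after the current receive call returns. If receive is blocked waiting for the answer, the lookup times out.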
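A non-blocking alternative is to send Identify yourself and handle the ActorIdentity reply as an ordinary message. The sketch below assumes the Akka 2.3 actor API. SelfFinder and Find are hypothetical names. The actor looks itself up by its own path, so no await is needed. The Identify request is answered once the current receive call has returned.

```scala
import akka.actor.{Actor, ActorIdentity, ActorLogging, Identify, Props}

object SelfFinder {
  case object Find
  def props: Props = Props[SelfFinder]
}

class SelfFinder extends Actor with ActorLogging {
  import SelfFinder._

  def receive: Receive = {
    case Find =>
      // Fire and forget: Identify lands in this actor's own mailbox and is
      // answered automatically after this receive call returns
      context.actorSelection(self.path) ! Identify(Find)
    case ActorIdentity(Find, Some(ref)) =>
      log.info("found myself at {}", ref.path)
    case ActorIdentity(Find, None) =>
      log.warning("lookup of {} found nothing", self.path)
  }
}
```

Inside Master, the Job case could apply the same pattern to the "/user/master/active" selection and send ParentGreetings from the ActorIdentity case.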
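The code above implements a simple cluster singleton. The cluster contains several ActorSystems, but Master runs on only one of them, the oldest node. If that ActorSystem goes down (here it is marked down automatically after auto-down-unreachable-after = 10s), the singleton manager on the next-oldest node starts a new Master. The service therefore continues, and normally only one Master instance exists at a time.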
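To see the failover in the logs, it helps to have the singleton report which node it runs on. The sketch below assumes the Akka 2.3 cluster API. NodeReporter is a hypothetical stand-in for Master. It logs Cluster(context.system).selfAddress when it starts and stops, so after the hosting node is killed, the next "started on" line appears on another node.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.Cluster

// Hypothetical singleton used only to observe where the instance lives
class NodeReporter extends Actor with ActorLogging {
  // Address of the node (ActorSystem) hosting this instance
  val selfAddress = Cluster(context.system).selfAddress

  override def preStart(): Unit = log.info("singleton started on {}", selfAddress)
  override def postStop(): Unit = log.info("singleton stopped on {}", selfAddress)

  def receive: Receive = {
    case msg => log.info("{} handled {}", selfAddress, msg)
  }
}

object NodeReporter {
  def props: Props = Props[NodeReporter]
}
```

To try it, pass NodeReporter.props instead of Master.props(...) to ClusterSingletonManager.props in startup().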
sbt "run-main packageName.mainName portNum" starts one ActorSystem on port portNum. Without a port number, all three ActorSystems start at once in the same JVM, which makes it hard to simulate one ActorSystem going down.