zoukankan      html  css  js  c++  java
  • akka cluster 初体验

    cluster 配置

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        log-remote-lifecycle-events = off
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    
        auto-down-unreachable-after = 10s
      }
    
      persistence {
        journal.plugin = "akka.persistence.journal.leveldb-shared"
        journal.leveldb-shared.store {
          # DO NOT USE 'native = off' IN PRODUCTION !!!
          native = off
          dir = "target/shared-journal"
        }
        snapshot-store.local.dir = "target/snapshots"
      }
    
      log-dead-letters = off
    }
    

      

    actor.provider 设定选取 clusterActorRefProvider,在 IDE 中该 String 可以跳转到 ClusterActorRefProvider,从程序的注释来看,actorRef provider 其实并不是说 actor 是怎么提供的,它是为了引入 cluster extension,并自动启动 cluster 

    i.e. the cluster will automatically be started when the 'ClusterActorRefProvider' is used.

    创建三个 actorSystem 组成 cluster

      def main(args: Array[String]): Unit = {
        if (args.isEmpty)
          startup(Seq("2551", "2552", "0"))
        else
          startup(args)
      }
    
      def startup(ports: Seq[String]): Unit = {
        ports foreach { port =>
          // Override the configuration of the port
          val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
            withFallback(ConfigFactory.load())
    
          // Create an Akka system
          val system = ActorSystem("ClusterSystem", config)
    
          system.actorOf(ClusterSingletonManager.props(Master.props(Duration(99, "second")), "active",
            PoisonPill, None), "master")
    
        }
      }
    

      

    创建 actorSystem 时的 config 重写了 akka.remote.netty.tcp.port,因为默认的配置只有 port = 0 这个选项。因为三个 cluster 都在本机启动,所以 hostname 不需要额外声明,重用 application.conf 中的 127.0.0.1 

    另外,ActorSystem 的名字必须统一,都是 ClusterSystem,这是在 application.conf 中的 seed-nodes 中声明的,它是 cluster 的 id。

    我猜,当 seed-nodes 都挂掉了,新的 actorSystem 应该就无法加入 cluster 了,因为光靠 cluster id 已经无法找到组织了

    object Master {
      val ResultsTopic = "results"
    
      def props(workTimeout: FiniteDuration): Props =
        Props(classOf[Master], workTimeout)
    
      case object Job
    
      case object ParentGreetings
    
    }
    
    class Master(workTimeout: FiniteDuration) extends Actor with ActorLogging {
    
      context.system.scheduler.schedule(Duration(10, "second"), Duration(5, "second"), self, Job)
    
      val timeout: Timeout = 10 second
    
      override def receive: Receive = {
        case Job =>
    
          val info = ClusterProtocol.selfInfo(self)
    
          log.info(info._1 + ": " + info._2 + ": " + info._3)
    
          context.system.actorSelection("user/master/active").resolveOne(4 second).map(actor => actor ! ParentGreetings)
    
        case ParentGreetings =>
          log.info("greetings from parent")
    
      }
    }
    

      

    上面是 cluster singleton actor 的实现,它有两件事要做,一是打印出自己的路径,而是找到自己。需要注意的是,resolveOne 不能 await(阻塞式的等),否者会报异常,actor not found

    上面的代码实现了一个简单的 cluster singleton,具体的表现是,当一个 cluster 有多个 actorSystem 时,当一个 actorSystem 挂掉时,master actor 会继续提供服务,且此 actor 的 instance 有且只有一个。

    sbt "run-main packageName.mainName portNum" 可以启动 actorSystem 在端口 portNum 上,不加 端口号,会一下启动三个 actorSystem,但是不方便模拟一个 actorSystem 挂掉的情况。

  • 相关阅读:
    背水一战 Windows 10 (61)
    背水一战 Windows 10 (60)
    背水一战 Windows 10 (59)
    背水一战 Windows 10 (58)
    背水一战 Windows 10 (57)
    背水一战 Windows 10 (56)
    背水一战 Windows 10 (55)
    背水一战 Windows 10 (54)
    背水一战 Windows 10 (53)
    背水一战 Windows 10 (52)
  • 原文地址:https://www.cnblogs.com/xinsheng/p/4615515.html
Copyright © 2011-2022 走看看