zoukankan      html  css  js  c++  java
  • akka cluster sharding

    cluster sharding 的目的在于提供一个框架,方便实现 DDD,虽然我至今也没搞明白 DDD 到底适用于是什么场合,但是 cluster sharding 却是我目前在做的一个 project 扩展到集群上非常需要的工具。

    sharding 要做这么几件事

    1. 对于每一个 entity,创建一个 actor。该 entity 有 Id 作为唯一标示。该 entity 的所有消息都由此 actor 来处理

    2. 该 actor 在一段时间内不工作时,会超时并 kill self

    3. 当一个集群中加入新的节点时,新的 actor 会被自动创建到新 node 上,或者老的actor 会负载均衡,迁移到新 node 上。同样的,当节点挂掉时,挂掉的 actor 会迁移到新的

    在 sharding 被提出之前,google group 和 stackoverflow 上有很多人希望有这么一个东西,那时候还是 2011 年,现在已经 2015 年了。

    sharding 的原理和用法 doc 都有比较详细的说明,下面做个小测试:

    首先是 entity actor 的定义:

    object ActionWorker {
      def props(): Props = Props(new ActionWorker)
    
      //must return s: Command
      val idExtractor: ShardRegion.IdExtractor = {
        case s: Command => (s.id, s)
      }
    
      val shardResolver: ShardRegion.ShardResolver = msg => msg match {
        case s: Command   => (math.abs(s.id.hashCode) % 100).toString
      }
    
      val shardName: String = "ActionWorker"
    }
    

      

    idExtractor 会从 message 中抽取 id,作为路由的依据,而返回值的第二项是 actor 收到的消息

    shardResolver 根据 id 确定 shard 所在的位置,哈希函数的设定方式我也没有自习研究,doc 说是 id 的十倍就可以。

    class ActionWorker extends Actor {
      val log = Logging(context.system, this)
    
      println("action worker is created")
    
      context.setReceiveTimeout(30 seconds)
    
      override def receive: Receive = {
        case Command(id, payload) =>
          val selfDesc = self.path.parent.name + "-" + self.path.name
          println("here i am, working: " + selfDesc)
    
          log.info("here i am, working: " + selfDesc)
    
        case ReceiveTimeout =>
          log.info("nothing to do, better kill myself")
    
          val selfDesc = self.path.parent.name + "-" + self.path.name
          println("here i am, working: " + selfDesc)
    
          println("nothing to do, better kill myself")
          context.stop(self)
      }
    

      

    actor 的定义没有特别之处,需要注意的是

    1. actor 收到的消息是 idExtract 的第二项,而不是 s: Command 那个东西

    2. actor 没有一个正常的途径得到自己的id,一个 workaroud 的办法是通过 self.path.name 来得到自己的id,再据此完成某些初始化操作

    worker 处理数据,生成数据的 actor 叫做 Bot

    class Bot extends Actor {
    
      val log = Logging(context.system, this)
    
      val tickTask = context.system.scheduler.schedule(3.seconds, 10.seconds, self, Command(""))
    
      def receive = create
    
      val postRegion = ClusterSharding(context.system).shardRegion(ActionWorker.shardName)
    
      val create: Receive = {
        case Command(id, payload) =>
          val postId = Random.nextInt(5).toString
          log.info("bot create new command and received: " + postId)
    
          println("new command postID = " + postId)
    
          postRegion ! Command(postId, postId)
    
      }
    }
    

      

    Bot 生成的数据要传到 worker actor,注意 postRegion 的获得方式,它首先从 actorSystem 中得到 ClusterSharding 集合(一个 actorSystem 可能会有多个 shard),然后根据 shardName 定位到唯一的 shard。最后把需要发送的消息传给 postRegin,postRegin 会完成转发。

    Main 方法的 startUp 函数

    def startup(ports: Seq[String]): Unit = {
        ports foreach { port =>
          // Override the configuration of the port
          val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
            withFallback(ConfigFactory.load())
    
          // Create an Akka system
          val system = ActorSystem("ClusterSystem", config)
    
          startupSharedJournal(system, startStore = (port == "2551"), path =
            ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"))
    
          ClusterSharding(system).start(
            typeName = ActionWorker.shardName,
            entryProps = Some(ActionWorker.props()),
            idExtractor = ActionWorker.idExtractor,
            shardResolver = ActionWorker.shardResolver)
    
          if (port != "2551" && port != "2552")
            system.actorOf(Props[Bot], "bot")
        }
    
      }
    
      def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
        // Start the shared journal on one node (don't crash this SPOF)
        // This will not be needed with a distributed journal
        if (startStore)
          system.actorOf(Props[SharedLeveldbStore], "store")
    
        // register the shared journal
        import system.dispatcher
        implicit val timeout = Timeout(15.seconds)
        val f = (system.actorSelection(path) ? Identify(None))
        f.onSuccess {
          case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
          case _ =>
            system.log.error("Shared journal not started at {}", path)
            system.shutdown()
        }
        f.onFailure {
          case _ =>
            system.log.error("Lookup of shared journal at {} timed out", path)
            system.shutdown()
        }
      }
    

      

    startupSharedJournal 是必须要执行的,不然 cluster 跑不起来。(后面测试了下,发现能跑起来)

    ClusterSharding 在本 actorSystem 启动,参数比较直观。

    需要注意的是:

    1. sharedJournal 是在测试情况下才用得到的,在 prod 环境下应该使用 journal

    2. cluster sharding 借助 cluster singleton 实现

  • 相关阅读:
    Linux虚拟机突然网络不能用了但是主机能ping㣈
    010商城项目:商品类目的选择——Dao,Service.Action层的分析
    009商城项目:商品类目的选择——1前端页面分析
    《深入理解Java内存模型》读书总结
    java多线程系类:JUC线程池:06之Callable和Future(转)
    Spring中,关于IOC和AOP的那些事
    程序员面试,为什么不要大谈高并发?
    Java 面试宝典!并发编程 71 道题及答案全送上!
    面试必问的并发编程知识点,你知道多少?
    程序员必知的七种并发编程模型
  • 原文地址:https://www.cnblogs.com/xinsheng/p/4615976.html
Copyright © 2011-2022 走看看