zoukankan      html  css  js  c++  java
  • akka cluster sharding source code 学习 (2/5) handle off

    一旦 shard coordinator(相当于分布式系统的 zookeeper) 启动,它就会启动一个定时器,每隔一定的时间尝试平衡一下集群中各个节点的负载,平衡的办法是把那些负载较重的 actor 移动到负载较轻的节点上。在这一点上,我以前的理解有误,我以为 shardRegion 是移动的最小单位。

    val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
    

     当 coordinator 收到 ReblanceTick 后,就开始尝试平衡系统负载

    case RebalanceTick ⇒
          if (persistentState.regions.nonEmpty) {
            val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress)
            shardsFuture.value match {
              case Some(Success(shards)) ⇒
                continueRebalance(shards)
              case _ ⇒
                // continue when future is completed
                shardsFuture.map { shards ⇒ RebalanceResult(shards)
                }.recover {
                  case _ ⇒ RebalanceResult(Set.empty)
                }.pipeTo(self)
            }
          }
    

    上面的逻辑我看懂了,但是 Future 的用法没看明白。按照一般的写法,当 shardsFuture 返回 Failure 以后,应该直接执行 RebalanceResut(Set.empty).pipeTo(self),不知道为什么失败以后还要尝试等待 Future

    allocationStrategy 提供了默认的实现,也可以自定义负载均衡策略。rebalance 函数返回的是 Set(ShardId),即那些要被移动的 shards

    当 coordinator 收到 RebalanceResult 后,开始 启动 balance 逻辑

    def continueRebalance(shards: Set[ShardId]): Unit =
        shards.foreach { shard ⇒
          if (!rebalanceInProgress(shard)) {
            persistentState.shards.get(shard) match {
              case Some(rebalanceFromRegion) ⇒
                rebalanceInProgress += shard
                log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
                context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
                  persistentState.regions.keySet ++ persistentState.regionProxies)
                  .withDispatcher(context.props.dispatcher))
              case None ⇒
                log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
            }
    
          }
        }
    

    rebalanceInProcess 是一个 Set,记录正在被移动的 shard,我想,在新一轮 balance 开始时, rebalanceInProcess 为空的情况只会发生在上次 balance 还没有做完。不知道这个时候,是应该报错还是继续 balance 更好,因为 balanceStrategy 应该不会考虑吧到 上一轮 balance 还没做完这种可能性。

    然后, coordinator 启动 rebalanceWorker,也就是上篇提到的替身 actor。

    private[akka] class RebalanceWorker(shard: String, from: ActorRef, handOffTimeout: FiniteDuration,
                                          regions: Set[ActorRef]) extends Actor {
        import Internal._
        regions.foreach(_ ! BeginHandOff(shard))
        var remaining = regions
    
        import context.dispatcher
        context.system.scheduler.scheduleOnce(handOffTimeout, self, ReceiveTimeout)
    
        def receive = {
          case BeginHandOffAck(`shard`) ⇒
            remaining -= sender()
            if (remaining.isEmpty) {
              from ! HandOff(shard)
              context.become(stoppingShard, discardOld = true)
            }
          case ReceiveTimeout ⇒ done(ok = false)
        }
    
        def stoppingShard: Receive = {
          case ShardStopped(shard) ⇒ done(ok = true)
          case ReceiveTimeout      ⇒ done(ok = false)
        }
    
        def done(ok: Boolean): Unit = {
          context.parent ! RebalanceDone(shard, ok)
          context.stop(self)
        }
      }
    

    akka 的逻辑是基于消息传递的,这种代码其实是很难去读的。在 rebalanceWorker 运行时,牵扯到很多个 actor。首先是,coordinator,其次是 shardRegion,也就是 host 待迁移 shard actor 的那个 region,然后是 shard actor 本身,最后是系统里所有的 shardRegion,他们也要参与进来。写到这里,我不禁把电脑屏幕竖了起来。

    1. RebalanceWorker 首先给所有的 ShardRegion BeginHandOff 消息,告诉大家,hand off 开始,然后等待大家的回复

    2. ShardRegion 收到 BeginHandOff 后,开始更新自己的知识库,将 HostShardRegion 和 shardActor 的记忆从自己的知识库中抹去

    case BeginHandOff(shard) ⇒
          log.debug("BeginHandOff shard [{}]", shard)
          if (regionByShard.contains(shard)) {
            val regionRef = regionByShard(shard)
            val updatedShards = regions(regionRef) - shard
            if (updatedShards.isEmpty) regions -= regionRef
            else regions = regions.updated(regionRef, updatedShards)
            regionByShard -= shard
          }
          sender() ! BeginHandOffAck(shard)
    

     最后,发送 BeginHandOffAck 消息,告诉 rebalanceWorker 自己准备完毕(这些 shardRegion 以后也没事干了)

    3. 继续回到 rebalanceWorker,它发送 HandOff 告诉 Host shard actor 的 ShardRegion,你可以做自己的清理工作了。然后将自己的状态设置成 stoppingShard,等待 ShardStopped 消息,这个消息的来源有两个,一个是 HostShardRegion,另外一个是 shard actor

    4. HostShardRegion 收到 HandOff 消息后

    case msg @ HandOff(shard) ⇒
          log.debug("HandOff shard [{}]", shard)
    
          // must drop requests that came in between the BeginHandOff and now,
          // because they might be forwarded from other regions and there
          // is a risk or message re-ordering otherwise
          if (shardBuffers.contains(shard)) {
            shardBuffers -= shard
            loggedFullBufferWarning = false
          }
    
          if (shards.contains(shard)) {
            handingOff += shards(shard)
            shards(shard) forward msg
          } else
            sender() ! ShardStopped(shard)
    

     如果 HostShardRegion 已经不再含有 shard actor,那么直接返回 ShardStopped,否则 HandOff 这个 Set 加入 shard actor,并将 HandOff 传给 shard actor

    5. 又看了一遍代码,发现 shard actor 和 entity actor 又是两种东西,shard actor 存在于 entity actor 和 shard region 之间

    目前还不知道 entity actor 和 shard region 之间的关系

    def getEntity(id: EntityId): ActorRef = {
        val name = URLEncoder.encode(id, "utf-8")
        context.child(name).getOrElse {
          log.debug("Starting entity [{}] in shard [{}]", id, shardId)
    
          val a = context.watch(context.actorOf(entityProps, name))
          idByRef = idByRef.updated(a, id)
          refById = refById.updated(id, a)
          state = state.copy(state.entities + id)
          a
        }
      }
    

    从这段代码来看, shard actor 与 entity actor 是一对多的关系。

    def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
        case HandOff(`shardId`) ⇒ handOff(sender())
        case HandOff(shard)     ⇒ log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard)
        case _                  ⇒ unhandled(msg)
      }
    
      def handOff(replyTo: ActorRef): Unit = handOffStopper match {
        case Some(_) ⇒ log.warning("HandOff shard [{}] received during existing handOff", shardId)
        case None ⇒
          log.debug("HandOff shard [{}]", shardId)
    
          if (state.entities.nonEmpty) {
            handOffStopper = Some(context.watch(context.actorOf(
              handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage))))
    
            //During hand off we only care about watching for termination of the hand off stopper
            context become {
              case Terminated(ref) ⇒ receiveTerminated(ref)
            }
          } else {
            replyTo ! ShardStopped(shardId)
            context stop self
          }
      }
    def receiveTerminated(ref: ActorRef): Unit = {
    if (handOffStopper.exists(_ == ref))
    context stop self
    else if (idByRef.contains(ref) && handOffStopper.isEmpty)
    entityTerminated(ref)
    }

     从这段代码看, shard actor 与 entity actor 的关系是一对一,因为当 entity stop self 了以后, shard actor 也会 stop self。这让我想到 coursera reactive programming 的最后一道作业题,为什么也是类似于 一个 entity 有一个 shard actor 对应。 

  • 相关阅读:
    django用户认证系统——基本设置1
    django用户认证系统——注册3
    django数据库设计
    修改linux最大文件句柄数
    LoadRunner监控Linux
    MySQL设置密码的三种方法
    JMeter学习-021-JMeter 定时器的应用
    mysql-bin.000001文件的来源及处理方法【转】
    js读取解析JSON数据
    关于查询区域标注区域总结
  • 原文地址:https://www.cnblogs.com/xinsheng/p/4771725.html
Copyright © 2011-2022 走看看