  • Akka Source Code Analysis: Fault Recovery

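      Actor fault recovery is a very important part of Akka. Earlier posts touched on it, but only mixed in with the details of other topics, so this post covers fault recovery on its own. To keep the analysis simple, it only follows the recovery steps for user actors (system actors behave in much the same way). To keep the post short, source code that is not essential is not quoted in full; interested readers can download the source and study it themselves.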

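       The figure above is the one the official documentation uses to describe the actor tree. It is very important, and readers should keep this hierarchy in mind: every actor belongs to some parent actor, and actor fault recovery is implemented through this hierarchy. If you understand the figure thoroughly, you already have a rough idea of how fault recovery works.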

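      Curious readers may ask: the actor at / has no parent, so how is it recovered if it fails? That is a good question. The root actor is the foundation of Akka's design; if Akka itself needed fault recovery all the time, nobody could rely on it. One way to think about it is that the root actor does very little, so it rarely fails. That explanation is admittedly a little forced. For what it is worth, the Akka documentation describes the root guardian's supervisor as a "virtual" ActorRef that stops the root guardian at the first sign of trouble.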

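      Recall the actor scheduling model: an actor's receive method is called only when a message arrives, and only then is the actor executed by some thread, i.e. "alive". An actor with no messages is just data sitting in memory, i.e. "dead". An actor can only fail while it is "alive", so the place to start is how an actor processes a message.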

      // Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status)
      final def invoke(messageHandle: Envelope): Unit = {
        val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
        try {
          currentMessage = messageHandle
          if (influenceReceiveTimeout)
            cancelReceiveTimeout()
          messageHandle.message match {
            case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
            case msg                      ⇒ receiveMessage(msg)
          }
          currentMessage = null // reset current message after successful invocation
        } catch handleNonFatalOrInterruptedException { e ⇒
          handleInvokeFailure(Nil, e)
        } finally {
          if (influenceReceiveTimeout)
            checkReceiveTimeout // Reschedule receive timeout
        }
      }
    

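       The function above is the one in ActorCell that processes user messages. The whole body is wrapped in a try-catch; assume that receiveMessage throws while handling a message.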

    final protected def handleNonFatalOrInterruptedException(thunk: (Throwable) ⇒ Unit): Catcher[Unit] = {
        case e: InterruptedException ⇒
          thunk(e)
          Thread.currentThread().interrupt()
        case NonFatal(e) ⇒
          thunk(e)
      }

    // From scala.util.control.NonFatal (Scala standard library):
    /**
        * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
        */
       def apply(t: Throwable): Boolean = t match {
         // VirtualMachineError includes OutOfMemoryError and other fatal errors
         case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable => false
         case _ => true
       }
    

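       When an exception is thrown, the catch block runs, which means handleNonFatalOrInterruptedException is applied. It checks whether the throwable is an InterruptedException or matches NonFatal (the second snippet above is NonFatal.apply from the Scala standard library); if so, it runs thunk, which here is handleInvokeFailure. InterruptedException gets its own case because NonFatal treats it as fatal. In short, if receive throws while processing a message, handleInvokeFailure is called.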
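The catch handler described above can be tried in isolation, since it relies only on the Scala standard library. The following is a minimal sketch, not Akka code: `CatcherSketch`, `nonFatalOrInterrupted` and `handled` are names made up for this illustration, while `NonFatal` and `Catcher` are the real standard library definitions.

```scala
import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher

object CatcherSketch {
  // Same shape as the handler in the article: an interrupt is reported and the
  // interrupt flag is restored; any other non-fatal throwable is just reported.
  def nonFatalOrInterrupted(thunk: Throwable => Unit): Catcher[Unit] = {
    case e: InterruptedException =>
      thunk(e)
      Thread.currentThread().interrupt()
    case NonFatal(e) =>
      thunk(e)
  }

  // Runs `body` and returns the simple class name of the throwable that the
  // handler dealt with, or None if nothing was thrown.
  def handled(body: => Unit): Option[String] = {
    var seen: Option[String] = None
    try body catch nonFatalOrInterrupted(e => seen = Some(e.getClass.getSimpleName))
    seen
  }

  def main(args: Array[String]): Unit = {
    val ordinary = handled(throw new IllegalStateException("boom"))
    val interrupt = handled(throw new InterruptedException("stop"))
    val flagRestored = Thread.interrupted() // reads and clears the restored flag
    println(s"ordinary=$ordinary interrupt=$interrupt flagRestored=$flagRestored")
    println(s"OutOfMemoryError non-fatal? ${NonFatal(new OutOfMemoryError())}")
  }
}
```

A fatal error such as OutOfMemoryError matches neither case, so it would propagate out of the try instead of reaching the thunk.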

    final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
        // prevent any further messages to be processed until the actor has been restarted
        if (!isFailed) try {
          suspendNonRecursive()
          // suspend children
          val skip: Set[ActorRef] = currentMessage match {
            case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
            case _                                ⇒ { setFailed(self); Set.empty }
          }
          suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
          t match {
            // tell supervisor
            case _: InterruptedException ⇒
              // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
              parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
            case _ ⇒
              // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
              parent.sendSystemMessage(Failed(self, t, uid))
          }
        } catch handleNonFatalOrInterruptedException { e ⇒
          publish(Error(e, self.path.toString, clazz(actor),
            "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
          try children foreach stop
          finally finishTerminate()
        }
      }
    

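       handleInvokeFailure runs a try block whose first statement is suspendNonRecursive. As the name suggests, it suspends the current actor non-recursively, that is, it stops the mailbox from processing further messages. Under the hood this just updates the mailbox status.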

    private def suspendNonRecursive(): Unit = dispatcher suspend this
    
    /**
       * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
       */
      protected[akka] def suspend(actor: ActorCell): Unit = {
        val mbox = actor.mailbox
        if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
          mbox.suspend()
      }
    
    /**
       * Increment the suspend count by one. Caller does not need to worry about whether
       * status was Scheduled or not.
       *
       * @return true if the previous suspend count was zero
       */
      @tailrec
      final def suspend(): Boolean = currentStatus match {
        case Closed ⇒
          setStatus(Closed); false
        case s ⇒
          if (updateStatus(s, s + suspendUnit)) s < suspendUnit
          else suspend()
      }
    

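       After suspending message processing in the mailbox of the failing actor, suspendChildren suspends message processing in the child actors as well (the flow is much the same as above: the status of each child's mailbox is updated). Finally a Failed message is sent to the parent actor, and that is all.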
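The "suspend count" kept in the mailbox status can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the real akka.dispatch.Mailbox: the class name `MailboxStatusSketch` is hypothetical, and the constants (two low bits for the open/closed/scheduled flags, a suspend unit of 4) reflect my reading of the Akka source rather than anything quoted in this post.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

// Simplified model: the low two bits hold status flags, and every suspend
// adds one "unit" above them, so suspends and resumes must stay balanced.
final class MailboxStatusSketch {
  private final val Closed = 1
  private final val SuspendMask = ~3
  private final val SuspendUnit = 4

  private val status = new AtomicInteger(0) // 0 = open and not suspended

  def isSuspended: Boolean = (status.get() & SuspendMask) != 0
  def suspendCount: Int = status.get() / SuspendUnit

  // Returns true if the previous suspend count was zero.
  @tailrec
  def suspend(): Boolean = status.get() match {
    case Closed => false
    case s =>
      if (status.compareAndSet(s, s + SuspendUnit)) s < SuspendUnit
      else suspend() // lost a race with another thread, retry
  }

  // Returns true if the suspend count is zero after this call.
  @tailrec
  def resume(): Boolean = status.get() match {
    case Closed => false
    case s =>
      val next = if (s < SuspendUnit) s else s - SuspendUnit
      if (status.compareAndSet(s, next)) next < SuspendUnit
      else resume()
  }
}

object MailboxStatusDemo {
  def main(args: Array[String]): Unit = {
    val mbox = new MailboxStatusSketch
    mbox.suspend()
    mbox.suspend()
    println(s"count=${mbox.suspendCount} suspended=${mbox.isSuspended}")
    mbox.resume()
    mbox.resume()
    println(s"count=${mbox.suspendCount} suspended=${mbox.isSuspended}")
  }
}
```

Because it is a counter rather than a flag, a mailbox that was suspended twice only starts processing user messages again after two resumes.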

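      To summarize the logic so far: if an actor throws while processing a message, it suspends its own mailbox, recursively suspends the mailboxes of all its children, and sends a Failed message to its parent. It really is that simple. Next, let us see how the parent actor handles this message.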
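The failing side of the protocol can be sketched as a toy actor tree. Everything here is hypothetical and only mirrors the steps in the summary: `Node` stands in for an ActorCell, its `suspendCount` for the mailbox status, and the `Failed` case class for Akka's system message of the same name.

```scala
import scala.collection.mutable

object FailureSketch {
  // Hypothetical stand-in for Akka's Failed system message.
  final case class Failed(child: String, cause: Throwable)

  final class Node(val name: String, val parent: Option[Node]) {
    val children = mutable.ListBuffer.empty[Node]
    val systemQueue = mutable.Queue.empty[Failed]
    var suspendCount = 0

    // Register with the parent so the tree can be walked top-down.
    parent.foreach(p => p.children += this)

    def suspendRecursively(): Unit = {
      suspendCount += 1
      children.foreach(_.suspendRecursively())
    }

    // What happens when this node's message handler throws.
    def fail(cause: Throwable): Unit = {
      suspendCount += 1                        // suspend own mailbox
      children.foreach(_.suspendRecursively()) // suspend all descendants
      parent.foreach(_.systemQueue.enqueue(Failed(name, cause))) // tell supervisor
    }
  }

  def main(args: Array[String]): Unit = {
    val parent = new Node("parent", None)
    val child = new Node("child", Some(parent))
    val grandchild = new Node("grandchild", Some(child))
    child.fail(new RuntimeException("boom"))
    println(s"child=${child.suspendCount} grandchild=${grandchild.suspendCount}")
    println(s"parent inbox: ${parent.systemQueue.map(_.child).mkString(", ")}")
  }
}
```

Note that the parent itself is not suspended; it only finds a Failed message in its system queue and decides what to do next.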

    final protected def handleFailure(f: Failed): Unit = {
        currentMessage = Envelope(f, f.child, system)
        getChildByRef(f.child) match {
          /*
           * only act upon the failure, if it comes from a currently known child;
           * the UID protects against reception of a Failed from a child which was
           * killed in preRestart and re-created in postRestart
           */
          case Some(stats) if stats.uid == f.uid ⇒
            if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
          case Some(stats) ⇒
            publish(Debug(self.path.toString, clazz(actor),
              "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
          case None ⇒
            publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
        }
      }
    

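       When the parent actor asynchronously receives the Failed message, it calls handleFailure. This method does two main things: it sets currentMessage, and it calls handleFailure on the supervisorStrategy of the current actor (the parent of the actor that threw). If the strategy's handleFailure does not handle the exception (it returns false), the exception is rethrown with throw, which takes the parent into its own handleInvokeFailure logic.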

    /**
       * This is the main entry point: in case of a child’s failure, this method
       * must try to handle the failure by resuming, restarting or stopping the
       * child (and returning `true`), or it returns `false` to escalate the
       * failure, which will lead to this actor re-throwing the exception which
       * caused the failure. The exception will not be wrapped.
       *
       * This method calls [[akka.actor.SupervisorStrategy#logFailure]], which will
       * log the failure unless it is escalated. You can customize the logging by
       * setting [[akka.actor.SupervisorStrategy#loggingEnabled]] to `false` and
       * do the logging inside the `decider` or override the `logFailure` method.
       *
       * @param children is a lazy collection (a view)
       */
      def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
        val directive = decider.applyOrElse(cause, escalateDefault)
        directive match {
          case Resume ⇒
            logFailure(context, child, cause, directive)
            resumeChild(child, cause)
            true
          case Restart ⇒
            logFailure(context, child, cause, directive)
            processFailure(context, true, child, cause, stats, children)
            true
          case Stop ⇒
            logFailure(context, child, cause, directive)
            processFailure(context, false, child, cause, stats, children)
            true
          case Escalate ⇒
            logFailure(context, child, cause, directive)
            false
        }
      }
    

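       handleFailure uses the supervision strategy of the current actor (the parent) to decide on a directive. Resume calls resumeChild, Restart and Stop call processFailure, and Escalate returns false. Note that the first argument of processFailure is the context of the parent actor and the third is the ActorRef of the failed child.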
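The way a decider is consulted, with unmatched throwables escalated, can be shown with a plain PartialFunction. This is a sketch only: the `Directive` objects below are hypothetical stand-ins for the ones in akka.actor.SupervisorStrategy, and `handleFailure` is reduced to the decision and its Boolean result.

```scala
object DeciderSketch {
  // Hypothetical stand-ins for Akka's supervision directives.
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // Anything the decider does not cover is escalated.
  private val escalateDefault: Throwable => Directive = _ => Escalate

  // Returns true when the failure was handled here, and false when the
  // caller has to rethrow the cause so that its own supervisor sees it.
  def handleFailure(decider: Decider, cause: Throwable, log: Directive => Unit): Boolean =
    decider.applyOrElse(cause, escalateDefault) match {
      case Escalate => log(Escalate); false
      case directive => log(directive); true
    }

  val exampleDecider: Decider = {
    case _: ArithmeticException      => Resume
    case _: IllegalArgumentException => Stop
    case _: Exception                => Restart
  }

  def main(args: Array[String]): Unit = {
    val causes = List(new ArithmeticException("/ by zero"), new IllegalStateException("bad"), new OutOfMemoryError())
    causes.foreach { cause =>
      val handled = handleFailure(exampleDecider, cause, d => println(s"${cause.getClass.getSimpleName} -> $d"))
      println(s"handled=$handled")
    }
  }
}
```

The case order matters, as in any pattern match: the more specific exceptions are listed before the general Exception case.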

    /**
     * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
     * to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies
     * it to all children.
     *
     * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit,
     *   if the limit is exceeded the child actor is stopped
     * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
     * @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
     *   [[scala.collection.immutable.Seq]] of Throwables which maps the given Throwables to restarts, otherwise escalates.
     * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled
     */
    case class OneForOneStrategy(
      maxNrOfRetries:              Int      = -1,
      withinTimeRange:             Duration = Duration.Inf,
      override val loggingEnabled: Boolean  = true)(val decider: SupervisorStrategy.Decider)
      extends SupervisorStrategy 
    

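       By default, every actor's supervision strategy is a OneForOneStrategy. Its default decider stops the child on ActorInitializationException, ActorKilledException and DeathPactException, and restarts it on any other Exception, so an ordinary exception takes the Restart branch.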

    def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
        if (restart && stats.requestRestartPermission(retriesWindow))
          restartChild(child, cause, suspendFirst = false)
        else
          context.stop(child) //TODO optimization to drop child here already?
      }
    

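       In other words, restartChild is called. A retry limit can also be configured to prevent endless restarts (by default there is no limit, since maxNrOfRetries is -1); once requestRestartPermission refuses, the child is stopped instead.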
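The idea behind a retry limit within a time window can be sketched as follows. This is a hypothetical helper loosely modelled on what requestRestartPermission has to decide, not the ChildRestartStats implementation: `RestartBudget` and its parameters are made up, and the current time is passed in explicitly so the behaviour is deterministic.

```scala
object RestartBudgetSketch {
  // Hypothetical helper: allows at most `maxRetries` restarts per window of
  // `windowMillis`. A negative `maxRetries` means "no limit".
  final class RestartBudget(maxRetries: Int, windowMillis: Long) {
    private var count = 0
    private var windowStart = -1L

    def requestRestartPermission(nowMillis: Long): Boolean =
      if (maxRetries < 0) true
      else if (maxRetries == 0) false
      else {
        if (windowStart < 0) windowStart = nowMillis
        val insideWindow = nowMillis - windowStart <= windowMillis
        if (insideWindow) {
          count += 1
          count <= maxRetries
        } else {
          // The window has expired: open a new one and count this restart as the first.
          count = 1
          windowStart = nowMillis
          true
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val budget = new RestartBudget(maxRetries = 2, windowMillis = 1000L)
    List(0L, 100L, 200L, 1500L).foreach { t =>
      println(s"t=$t allowed=${budget.requestRestartPermission(t)}")
    }
  }
}
```

In the processFailure code shown above, a refused permission leads to context.stop(child) rather than a restart.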

    /**
       * Restart the given child, possibly suspending it first.
       *
       * <b>IMPORTANT:</b>
       *
       * If the child is the currently failing one, it will already have been
       * suspended, hence `suspendFirst` must be false. If the child is not the
       * currently failing one, then it did not request this treatment and is
       * therefore not prepared to be resumed without prior suspend.
       */
      final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
        val c = child.asInstanceOf[InternalActorRef]
        if (suspendFirst) c.suspend()
        c.restart(cause)
      }
    

       restartChild is simple: it calls restart on the child. Where is restart implemented? Recall the concrete type behind this ActorRef: it is RepointableActorRef.

    def restart(cause: Throwable): Unit = underlying.restart(cause)
    

       The line above is the definition of restart in RepointableActorRef; it simply calls restart on the ActorCell.

    final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException
    

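       Above is the implementation of ActorCell's restart (it lives in akka.actor.dungeon.Dispatch): a Recreate message is dispatched to the cell's own mailbox. Readers may wonder how the actor can still receive anything when its mailbox was suspended earlier. Note that Recreate is a system message, and system messages are still delivered and processed while the mailbox is suspended.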
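Why a suspended mailbox still reacts to Recreate can be illustrated with a two-queue model. This is a hypothetical sketch, not the Akka Mailbox: messages are plain strings, `suspended` is a simple flag, and `run` stands for one processing pass.

```scala
import scala.collection.mutable

object MailboxSketch {
  final class Mailbox {
    private val systemQueue = mutable.Queue.empty[String]
    private val userQueue = mutable.Queue.empty[String]
    var suspended = false

    def enqueueSystem(msg: String): Unit = systemQueue.enqueue(msg)
    def enqueueUser(msg: String): Unit = userQueue.enqueue(msg)

    // One processing pass: system messages always run, and run first;
    // user messages are drained only while the mailbox is not suspended.
    def run(handleSystem: String => Unit, handleUser: String => Unit): Unit = {
      while (systemQueue.nonEmpty) handleSystem(systemQueue.dequeue())
      while (!suspended && userQueue.nonEmpty) handleUser(userQueue.dequeue())
    }
  }

  def main(args: Array[String]): Unit = {
    val mbox = new Mailbox
    mbox.suspended = true
    mbox.enqueueUser("work")
    mbox.enqueueSystem("Recreate")
    mbox.run(
      handleSystem = m => { println(s"system: $m"); mbox.suspended = false }, // restart finished, resume
      handleUser = m => println(s"user: $m")
    )
  }
}
```

In this model the user message waits in the queue while the mailbox is suspended and is processed only after the system message has led to a resume.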

    /**
       * Do re-create the actor in response to a failure.
       */
      protected def faultRecreate(cause: Throwable): Unit =
        if (actor == null) {
          system.eventStream.publish(Error(self.path.toString, clazz(actor),
            "changing Recreate into Create after " + cause))
          faultCreate()
        } else if (isNormal) {
          val failedActor = actor
          if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
          if (failedActor ne null) {
            val optionalMessage = if (currentMessage ne null) Some(currentMessage.message) else None
            try {
              // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort
              if (failedActor.context ne null) failedActor.aroundPreRestart(cause, optionalMessage)
            } catch handleNonFatalOrInterruptedException { e ⇒
              val ex = PreRestartException(self, e, cause, optionalMessage)
              publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
            } finally {
              clearActorFields(failedActor, recreate = true)
            }
          }
          assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus)
          if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
        } else {
          // need to keep that suspend counter balanced
          faultResume(causedByFailure = null)
        }
    

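       Above is the function called when Recreate is received. In short, it calls aroundPreRestart, clearActorFields and finishRecreate. It also shows that aroundPreRestart and preRestart are the last methods executed on the failed actor instance; at that point the actor's state is still what it was when the exception occurred.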

    final protected def clearActorFields(actorInstance: Actor, recreate: Boolean): Unit = {
        setActorFields(actorInstance, context = null, self = if (recreate) self else system.deadLetters)
        currentMessage = null
        behaviorStack = emptyBehaviorStack
      }
    

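       clearActorFields resets the context, self, currentMessage and behaviorStack fields to default or empty values. setChildrenTerminationReason is not analyzed in detail here. As far as I can tell from the source, it returns true only when children are still terminating (for example because preRestart stopped them), in which case the recreation reason is recorded and finishRecreate is deferred until they have all stopped; otherwise it returns false. That is why the call is negated, even though the naming reads a little backwards.

      Either way, finishRecreate ends up being executed.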

    private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
        // need to keep a snapshot of the surviving children before the new actor instance creates new ones
        val survivors = children
    
        try {
          try resumeNonRecursive()
          finally clearFailed() // must happen in any case, so that failure is propagated
    
          val freshActor = newActor()
          actor = freshActor // this must happen before postRestart has a chance to fail
          if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
    
          freshActor.aroundPostRestart(cause)
          if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
    
          // only after parent is up and running again do restart the children which were not stopped
          survivors foreach (child ⇒
            try child.asInstanceOf[InternalActorRef].restart(cause)
            catch handleNonFatalOrInterruptedException { e ⇒
              publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
            })
        } catch handleNonFatalOrInterruptedException { e ⇒
          clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again
          handleInvokeFailure(survivors, PostRestartException(self, e, cause))
        }
      }
    

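       finishRecreate first calls resumeNonRecursive, which restores the mailbox status so that the actor can receive messages normally again. Then comes the key step: newActor. It creates a new actor instance from Props, and in general the new instance is a different object from the old one. Readers may think this could be optimized with an option that lets the developer decide whether to reuse the previous instance. That is exactly what SupervisorStrategy.Resume is for: it simply ignores the exception, and the actor instance is left unchanged.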

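      After re-creation, aroundPostRestart and postRestart are called on the new actor instance, so postRestart is the first method executed once the new instance has been created. After that, the survivors foreach block recursively restarts the child actors. It is not analyzed further here.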

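      To summarize again: when the parent actor receives the failure message from the child, it uses its strategy to decide whether to resume, restart, stop or escalate; usually the answer is restart. For a restart it calls restart on the child, which sends a Recreate system message to the failed child. On receiving it, the child performs a series of restart steps and finally creates a new actor instance, completing the restart. During this process all surviving children of that child are restarted as well, so the whole thing is recursive.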

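      Put simply, when a child actor fails it suspends its own mailbox and, recursively, its children's mailboxes, then notifies its parent. The parent uses its strategy to decide whether a restart is allowed; if it is, the child completes the restart and takes care of recursively restarting its own children. There are other strategies as well, and the handling differs from one to another.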

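      The figure above shows the actor lifecycle fairly clearly, and readers can work through it on their own. Note, however, that Instance 1 and Instance 2 are two "different" actor instances: "different" means that the user state inside the actor (the fields defined by the developer) is lost entirely and rebuilt. The two instances share the same UID, though, which means the same ActorRef can be used to send messages to the re-created Instance 2.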
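The difference between the instance and the reference can be made concrete with a small model. It is a sketch with hypothetical names: `Counter` plays the actor instance with user state, `Cell` plays the stable cell behind an ActorRef, and the hook names merely echo preRestart and postRestart.

```scala
import scala.collection.mutable

object RestartSketch {
  // Hypothetical stand-in for an actor instance that carries user state.
  final class Counter(log: mutable.Buffer[String]) {
    var count = 0
    def preRestart(cause: Throwable): Unit = { log += s"preRestart(count=$count)"; () }
    def postRestart(cause: Throwable): Unit = { log += s"postRestart(count=$count)"; () }
  }

  // Hypothetical stand-in for the cell behind a stable ActorRef: the uid and
  // the cell survive a restart, only the instance is swapped.
  final class Cell(val uid: Int, factory: () => Counter) {
    private var instance: Counter = factory()
    def current: Counter = instance

    def restart(cause: Throwable): Unit = {
      val failed = instance
      failed.preRestart(cause)    // last hook on the old instance, state still intact
      instance = factory()        // fresh instance, user state starts from scratch
      instance.postRestart(cause) // first hook on the new instance
    }
  }

  def main(args: Array[String]): Unit = {
    val log = mutable.ListBuffer.empty[String]
    val cell = new Cell(uid = 42, factory = () => new Counter(log))
    cell.current.count = 3
    cell.restart(new RuntimeException("boom"))
    println(log.mkString(", "))
    println(s"uid=${cell.uid} count=${cell.current.count}")
  }
}
```

Callers holding the cell keep talking to the same uid, while the counter they reach has been reset by the restart.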

    Akka in action: actor model

  • Original article: https://www.cnblogs.com/gabry/p/9402456.html