  • Akka Source Code Analysis: Fault Recovery

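      Actor fault recovery is a very important part of Akka. Earlier posts touched on it, but always mixed in with the details of other topics, so this post covers fault recovery on its own. To keep the analysis simple, it only follows the recovery steps for user actors; system actors work in much the same way. To keep the post short, unimportant source code is not pasted in full; interested readers can download the source and study it themselves.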

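       The figure above is the one the official documentation uses to describe the actor tree. It is very important, and readers should keep this hierarchy in mind: every actor belongs to some parent actor, and actor fault recovery is implemented through this hierarchy. If you understand the figure well, you already have a rough idea of how fault recovery works.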

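      Curious readers may ask: the / actor has no parent, so how is it recovered if it fails? That is a fair question. This actor is the foundation of Akka's design; if Akka itself needed fault recovery all the time, nobody could use it. One way to look at it is that the root actor does very little, so it rarely fails. Admittedly, that explanation is a bit of a stretch.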

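      Recall the actor scheduling model: an actor's receive method is only called when a message arrives, and only then is the actor executed by some thread, i.e. it is "alive". An actor with no messages is just data sitting in memory, i.e. it is "dead". An actor can only fail while it is "alive", so we start from message processing to see how fault recovery is implemented.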

    //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
      final def invoke(messageHandle: Envelope): Unit = {
        val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
        try {
          currentMessage = messageHandle
          if (influenceReceiveTimeout)
            cancelReceiveTimeout()
          messageHandle.message match {
            case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
            case msg                      ⇒ receiveMessage(msg)
          }
          currentMessage = null // reset current message after successful invocation
        } catch handleNonFatalOrInterruptedException { e ⇒
          handleInvokeFailure(Nil, e)
        } finally {
          if (influenceReceiveTimeout)
            checkReceiveTimeout // Reschedule receive timeout
        }
      }
    

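       Above is the function in ActorCell that handles user messages. Its whole body is wrapped in a try-catch; let us assume that receiveMessage throws while processing a message.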

    final protected def handleNonFatalOrInterruptedException(thunk: (Throwable) ⇒ Unit): Catcher[Unit] = {
        case e: InterruptedException ⇒
          thunk(e)
          Thread.currentThread().interrupt()
        case NonFatal(e) ⇒
          thunk(e)
      }
    /**
        * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
        */
       def apply(t: Throwable): Boolean = t match {
         // VirtualMachineError includes OutOfMemoryError and other fatal errors
         case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable => false
         case _ => true
       }
    

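       When an exception is thrown, the catch block runs, which means the partial function returned by handleNonFatalOrInterruptedException is applied. It matches an InterruptedException or any throwable accepted by NonFatal (the second snippet above is the apply method of scala.util.control.NonFatal), and in either case it calls thunk, i.e. handleInvokeFailure. In short: if receive throws while processing a message, handleInvokeFailure is executed.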
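The catch-with-a-partial-function pattern described above can be tried in isolation. The following is a minimal sketch using only the Scala standard library; `CatcherSketch`, `handleNonFatalOrInterrupted` and `invoke` are hypothetical names for this illustration, not Akka code.

```scala
import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher

object CatcherSketch {
  // Hypothetical stand-in for handleNonFatalOrInterruptedException:
  // builds a Catcher that forwards recoverable throwables to `thunk`.
  def handleNonFatalOrInterrupted(thunk: Throwable => Unit): Catcher[Unit] = {
    case e: InterruptedException =>
      thunk(e)
      Thread.currentThread().interrupt() // restore the interrupt flag
    case NonFatal(e) =>
      thunk(e)
  }

  // Runs `body` and returns the throwable handed to the failure handler, if any.
  // Fatal errors (e.g. VirtualMachineError) are not matched and propagate.
  def invoke(body: => Unit): Option[Throwable] = {
    var failure: Option[Throwable] = None
    try body
    catch handleNonFatalOrInterrupted(e => failure = Some(e))
    failure
  }
}
```

Calling `CatcherSketch.invoke(throw new IllegalStateException("boom"))` hands the exception to the handler instead of letting it escape, which is the role handleInvokeFailure plays in ActorCell.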

    final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
        // prevent any further messages to be processed until the actor has been restarted
        if (!isFailed) try {
          suspendNonRecursive()
          // suspend children
          val skip: Set[ActorRef] = currentMessage match {
            case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
            case _                                ⇒ { setFailed(self); Set.empty }
          }
          suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
          t match {
            // tell supervisor
            case _: InterruptedException ⇒
              // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
              parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
            case _ ⇒
              // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
              parent.sendSystemMessage(Failed(self, t, uid))
          }
        } catch handleNonFatalOrInterruptedException { e ⇒
          publish(Error(e, self.path.toString, clazz(actor),
            "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
          try children foreach stop
          finally finishTerminate()
        }
      }
    

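       handleInvokeFailure runs a try block whose first statement is suspendNonRecursive. As the name suggests, it suspends the current actor non-recursively: it suspends the mailbox's processing of messages, which really just means updating the mailbox status.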

    private def suspendNonRecursive(): Unit = dispatcher suspend this
    
    /**
       * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
       */
      protected[akka] def suspend(actor: ActorCell): Unit = {
        val mbox = actor.mailbox
        if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
          mbox.suspend()
      }
    
    /**
       * Increment the suspend count by one. Caller does not need to worry about whether
       * status was Scheduled or not.
       *
       * @return true if the previous suspend count was zero
       */
      @tailrec
      final def suspend(): Boolean = currentStatus match {
        case Closed ⇒
          setStatus(Closed); false
        case s ⇒
          if (updateStatus(s, s + suspendUnit)) s < suspendUnit
          else suspend()
      }
    

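       After message processing in the failed actor's mailbox has been suspended, suspendChildren suspends the child actors in the same way (again by updating each child's mailbox status). Finally a Failed message is sent to the parent actor, and that is all.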
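The suspend counter that mailbox.suspend() manipulates can be modeled with a single atomic integer. This is a simplified sketch, not Akka's Mailbox: the bit layout (two low flag bits, counter above them), the constant values and the `resume` method are assumptions chosen to mirror the `suspend` listing above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

object MailboxStatusSketch {
  // Assumed layout for this sketch: the two low bits hold flags,
  // everything above them is the suspend counter.
  val Open = 0
  val Closed = 1
  val SuspendUnit = 4
  val SuspendMask = ~3

  final class Status {
    private val status = new AtomicInteger(Open)

    def current: Int = status.get
    def isSuspended: Boolean = (current & SuspendMask) != 0

    // Increment the suspend count; returns true if the count was zero before.
    @tailrec
    final def suspend(): Boolean = {
      val s = current
      if (s == Closed) false
      else if (status.compareAndSet(s, s + SuspendUnit)) s < SuspendUnit
      else suspend() // lost a race with another thread, retry
    }

    // Decrement the suspend count; returns true if the count is zero afterwards.
    @tailrec
    final def resume(): Boolean = {
      val s = current
      if (s == Closed) false
      else {
        val next = if (s < SuspendUnit) s else s - SuspendUnit
        if (status.compareAndSet(s, next)) next < SuspendUnit
        else resume()
      }
    }
  }
}
```

Because the counter nests, an actor suspended twice (for example once for its own failure and once because its parent failed) only processes user messages again after two matching resumes.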

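      To summarize the logic so far: if an actor throws while processing a message, it suspends its own mailbox, recursively suspends the mailboxes of all its children, and sends a Failed message to its parent. Simple, isn't it? Now let us see how the parent handles this message.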

    final protected def handleFailure(f: Failed): Unit = {
        currentMessage = Envelope(f, f.child, system)
        getChildByRef(f.child) match {
          /*
           * only act upon the failure, if it comes from a currently known child;
           * the UID protects against reception of a Failed from a child which was
           * killed in preRestart and re-created in postRestart
           */
          case Some(stats) if stats.uid == f.uid ⇒
            if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
          case Some(stats) ⇒
            publish(Debug(self.path.toString, clazz(actor),
              "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
          case None ⇒
            publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
        }
      }
    

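       The parent receives the Failed message asynchronously and calls handleFailure. This method does two main things: it sets currentMessage, and it calls handleFailure on the supervisorStrategy of the current actor, i.e. the parent of the actor that threw. If the strategy's handleFailure does not handle the exception (it returns false), the exception is rethrown with throw, which takes the parent into its own handleInvokeFailure logic.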
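The three-way match in handleFailure (known child with matching UID, known child with stale UID, unknown child) can be sketched with plain data. All types here are hypothetical stand-ins; in particular, children are keyed by a name string rather than an ActorRef.

```scala
object FailureRoutingSketch {
  // Hypothetical, simplified stand-ins; these are not Akka's types.
  final case class Failed(child: String, cause: Throwable, uid: Int)

  sealed trait Outcome
  case object Handled extends Outcome
  case object DroppedOldChild extends Outcome
  case object DroppedUnknownChild extends Outcome

  // `children` maps a child's name to the uid of its current incarnation.
  // `supervise` plays the role of supervisorStrategy.handleFailure.
  def handleFailure(children: Map[String, Int], f: Failed)(supervise: Throwable => Boolean): Outcome =
    children.get(f.child) match {
      case Some(uid) if uid == f.uid =>
        // false means "escalate": rethrow so that this actor fails in turn
        if (!supervise(f.cause)) throw f.cause
        Handled
      case Some(_) => DroppedOldChild     // Failed from an earlier incarnation
      case None    => DroppedUnknownChild // not one of our children
    }
}
```

The rethrow in the first branch is what turns an escalated failure into a failure of the parent itself.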

    /**
       * This is the main entry point: in case of a child’s failure, this method
       * must try to handle the failure by resuming, restarting or stopping the
       * child (and returning `true`), or it returns `false` to escalate the
       * failure, which will lead to this actor re-throwing the exception which
       * caused the failure. The exception will not be wrapped.
       *
       * This method calls [[akka.actor.SupervisorStrategy#logFailure]], which will
       * log the failure unless it is escalated. You can customize the logging by
       * setting [[akka.actor.SupervisorStrategy#loggingEnabled]] to `false` and
       * do the logging inside the `decider` or override the `logFailure` method.
       *
       * @param children is a lazy collection (a view)
       */
      def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
        val directive = decider.applyOrElse(cause, escalateDefault)
        directive match {
          case Resume ⇒
            logFailure(context, child, cause, directive)
            resumeChild(child, cause)
            true
          case Restart ⇒
            logFailure(context, child, cause, directive)
            processFailure(context, true, child, cause, stats, children)
            true
          case Stop ⇒
            logFailure(context, child, cause, directive)
            processFailure(context, false, child, cause, stats, children)
            true
          case Escalate ⇒
            logFailure(context, child, cause, directive)
            false
        }
      }
    

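       handleFailure asks the supervision strategy of the current actor (the parent) for a directive. Resume calls resumeChild, Restart and Stop call processFailure, and Escalate returns false. Note that the first argument of processFailure is the parent's context, and the third is the ActorRef of the failed child.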
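The decider lookup is an ordinary `PartialFunction.applyOrElse` call, which a few lines of standalone Scala can show. The directive objects below are local stand-ins for Akka's, and the exception-to-directive mapping is purely illustrative.

```scala
object DeciderSketch {
  // Local stand-ins for the four supervision directives.
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // Fallback used when the decider is not defined for the cause.
  private val escalateDefault: Throwable => Directive = _ => Escalate

  // Example decider; this mapping is an assumption made for the sketch.
  val decider: Decider = {
    case _: ArithmeticException      => Resume
    case _: IllegalArgumentException => Stop
    case _: Exception                => Restart
  }

  // Returns the chosen directive and whether the failure was handled here
  // (false means the parent must rethrow, i.e. escalate).
  def handleFailure(cause: Throwable): (Directive, Boolean) = {
    val directive = decider.applyOrElse(cause, escalateDefault)
    (directive, directive != Escalate)
  }
}
```

A throwable the decider does not cover, such as a `java.lang.Error`, falls through to the default and is escalated.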

    /**
     * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
     * to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies
     * it to all children.
     *
     * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit,
     *   if the limit is exceeded the child actor is stopped
     * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
     * @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
     *   [[scala.collection.immutable.Seq]] of Throwables which maps the given Throwables to restarts, otherwise escalates.
     * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled
     */
    case class OneForOneStrategy(
      maxNrOfRetries:              Int      = -1,
      withinTimeRange:             Duration = Duration.Inf,
      override val loggingEnabled: Boolean  = true)(val decider: SupervisorStrategy.Decider)
      extends SupervisorStrategy 
    

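       By default, each actor's supervision strategy is a OneForOneStrategy. With the default decider an ordinary Exception maps to Restart, so the Restart branch is the one that runs.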

    def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
        if (restart && stats.requestRestartPermission(retriesWindow))
          restartChild(child, cause, suspendFirst = false)
        else
          context.stop(child) //TODO optimization to drop child here already?
      }
    

       So in effect restartChild is executed. We can of course configure a retry limit to prevent endless restarts (by default there is no limit).
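To make the retry limit concrete, here is a simplified model of a restart budget: at most `maxRetries` restarts within a time window, otherwise the child is stopped. It is not the algorithm in Akka's ChildRestartStats; the class, its fields and the explicit `now` parameter are assumptions made to keep the sketch deterministic.

```scala
object RestartBudgetSketch {
  // Hypothetical restart budget: at most `maxRetries` restarts per `windowMillis`.
  final class RestartStats {
    private var count = 0
    private var windowStart = 0L

    // `now` is passed in explicitly so the behavior does not depend on the clock.
    def requestRestartPermission(maxRetries: Int, windowMillis: Long, now: Long): Boolean =
      if (maxRetries < 0) true // a negative limit means "no limit"
      else {
        if (count == 0 || now - windowStart > windowMillis) {
          // first failure, or the previous window has expired: start a fresh window
          count = 0
          windowStart = now
        }
        count += 1
        count <= maxRetries
      }
  }

  // Mirrors the shape of processFailure: restart if permitted, otherwise stop.
  def processFailure(restart: Boolean, stats: RestartStats, maxRetries: Int, windowMillis: Long, now: Long): String =
    if (restart && stats.requestRestartPermission(maxRetries, windowMillis, now)) "restart"
    else "stop"
}
```

With `maxRetries = 2` and a one-second window, the third failure inside the window yields "stop", while a failure after the window has passed is allowed to restart again.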

    /**
       * Restart the given child, possibly suspending it first.
       *
       * <b>IMPORTANT:</b>
       *
       * If the child is the currently failing one, it will already have been
       * suspended, hence `suspendFirst` must be false. If the child is not the
       * currently failing one, then it did not request this treatment and is
       * therefore not prepared to be resumed without prior suspend.
       */
      final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
        val c = child.asInstanceOf[InternalActorRef]
        if (suspendFirst) c.suspend()
        c.restart(cause)
      }
    

       restartChild is simple: it calls restart on the child. So where is restart implemented? Remember the actual type of this ActorRef? That's right, it is RepointableActorRef.

    def restart(cause: Throwable): Unit = underlying.restart(cause)
    

       Above is RepointableActorRef's definition of restart, which simply calls restart on the ActorCell.

    final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException
    

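       Above is the implementation of ActorCell's restart method (in akka.actor.dungeon.Dispatch): it sends a Recreate message to itself. Readers may wonder: wasn't the mailbox suspended earlier, so how can it still receive messages? Note that Recreate is a system message, and system messages are still processed while the mailbox is suspended.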
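Why a suspended actor still reacts to Recreate can be illustrated with a toy mailbox that keeps system and user messages in separate queues. This is a sketch under that assumption; the `Mailbox` class below and its string messages are hypothetical, not Akka's implementation.

```scala
import scala.collection.mutable

object SuspendedMailboxSketch {
  // Hypothetical mailbox with separate queues for system and user messages.
  final class Mailbox {
    private val systemQueue = mutable.Queue.empty[String]
    private val userQueue = mutable.Queue.empty[String]
    var suspended = false

    def sendSystem(msg: String): Unit = systemQueue.enqueue(msg)
    def sendUser(msg: String): Unit = userQueue.enqueue(msg)

    // One processing pass: system messages always run,
    // user messages only while the mailbox is not suspended.
    def run(): List[String] = {
      val processed = mutable.ListBuffer.empty[String]
      while (systemQueue.nonEmpty) processed += systemQueue.dequeue()
      while (!suspended && userQueue.nonEmpty) processed += userQueue.dequeue()
      processed.toList
    }
  }
}
```

While `suspended` is true, a pass over the mailbox delivers "Recreate" but leaves queued user messages untouched; they are delivered on a later pass once the flag is cleared.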

    /**
       * Do re-create the actor in response to a failure.
       */
      protected def faultRecreate(cause: Throwable): Unit =
        if (actor == null) {
          system.eventStream.publish(Error(self.path.toString, clazz(actor),
            "changing Recreate into Create after " + cause))
          faultCreate()
        } else if (isNormal) {
          val failedActor = actor
          if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
          if (failedActor ne null) {
            val optionalMessage = if (currentMessage ne null) Some(currentMessage.message) else None
            try {
              // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort
              if (failedActor.context ne null) failedActor.aroundPreRestart(cause, optionalMessage)
            } catch handleNonFatalOrInterruptedException { e ⇒
              val ex = PreRestartException(self, e, cause, optionalMessage)
              publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
            } finally {
              clearActorFields(failedActor, recreate = true)
            }
          }
          assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus)
          if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
        } else {
          // need to keep that suspend counter balanced
          faultResume(causedByFailure = null)
        }
    

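       Above is the function called when Recreate is received. In short, it calls aroundPreRestart, clearActorFields and finishRecreate. This also shows that aroundPreRestart and preRestart are the last methods executed on the failed actor instance, and at that point the actor's state is still what it was when the exception occurred.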

    final protected def clearActorFields(actorInstance: Actor, recreate: Boolean): Unit = {
        setActorFields(actorInstance, context = null, self = if (recreate) self else system.deadLetters)
        currentMessage = null
        behaviorStack = emptyBehaviorStack
      }
    

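       clearActorFields resets context, self, currentMessage and behaviorStack to default or empty values. setChildrenTerminationReason is not analyzed in detail here. In short, it records the recreation cause on the children container and returns true only if some children are still terminating (for example, children stopped in preRestart), in which case finishRecreate is deferred until they are gone. The return value reads a little backwards: false is the case that lets the restart proceed immediately.

      When no children are still terminating, setChildrenTerminationReason returns false and finishRecreate is executed right away.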

    private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
        // need to keep a snapshot of the surviving children before the new actor instance creates new ones
        val survivors = children
    
        try {
          try resumeNonRecursive()
          finally clearFailed() // must happen in any case, so that failure is propagated
    
          val freshActor = newActor()
          actor = freshActor // this must happen before postRestart has a chance to fail
          if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
    
          freshActor.aroundPostRestart(cause)
          if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
    
          // only after parent is up and running again do restart the children which were not stopped
          survivors foreach (child ⇒
            try child.asInstanceOf[InternalActorRef].restart(cause)
            catch handleNonFatalOrInterruptedException { e ⇒
              publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
            })
        } catch handleNonFatalOrInterruptedException { e ⇒
          clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again
          handleInvokeFailure(survivors, PostRestartException(self, e, cause))
        }
      }
    

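       First resumeNonRecursive is called, which restores the mailbox status so that the actor can process messages normally again. Then comes the key step: newActor. It creates a new actor instance from the Props, and in general the new instance is a different object from the old one. You might think this could be optimized by letting the developer decide whether to keep the old instance. Indeed, that is what SupervisorStrategy.Resume is for: it simply ignores the failure, and the actor instance does not change at all.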

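      After re-creation, aroundPostRestart and postRestart are called on the new instance, so postRestart is the first method executed once the new instance has been created successfully. After that, the survivors foreach block restarts the child actors that were not stopped, recursively. This is not analyzed further here.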
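The order of the two hooks around re-creation can be reproduced with a toy cell that swaps instances. `Instance` and `Cell` are hypothetical classes for this sketch; the factory function stands in for Props, and the real faultRecreate/finishRecreate do considerably more.

```scala
import scala.collection.mutable.ListBuffer

object RestartHooksSketch {
  // Hypothetical minimal actor: only the two restart hooks and some user state.
  final class Instance(val id: Int, log: ListBuffer[String]) {
    var counter = 0
    def preRestart(cause: Throwable): Unit = log += s"preRestart on instance $id (counter=$counter)"
    def postRestart(cause: Throwable): Unit = log += s"postRestart on instance $id (counter=$counter)"
  }

  // Hypothetical cell: holds the current instance and swaps it on recreate.
  final class Cell(factory: Int => Instance) {
    private var generation = 1
    var actor: Instance = factory(generation)

    def recreate(cause: Throwable): Unit = {
      actor.preRestart(cause)     // last call on the failed instance, state still intact
      generation += 1
      actor = factory(generation) // fresh instance from the same factory
      actor.postRestart(cause)    // first hook on the new instance
    }
  }
}
```

Setting `counter` on the first instance and then calling `recreate` logs preRestart with the old value on instance 1, followed by postRestart with a fresh counter on instance 2.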

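      To summarize again: when the parent receives the failure message from a child, it decides, according to its current strategy, whether to resume, restart, stop or escalate; with the default strategy it is usually a restart. A restart calls the child's restart function, which sends the Recreate system message to the failed child. On receiving it, the child performs a series of restart steps and finally creates a new actor instance, which completes the restart. During this process the child's surviving children are restarted as well, so the whole thing is recursive.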

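      Put simply: when a child actor fails, it suspends its own mailbox and, recursively, the mailboxes of its children, then notifies its parent. The parent decides, according to its current strategy, whether a restart is allowed; if so, the child completes the restart and is responsible for recursively restarting its own children. There are other strategies too, and the handling differs between them.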

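      The figure above shows the actor life cycle clearly enough, and readers can work through it on their own. Note, however, that Instance 1 and Instance 2 are two "different" actor instances: all of the actor's internal user state (the fields defined by the developer) is lost and rebuilt. The two instances share the same UID, though, which means the same ActorRef can be used to send messages to the re-created Instance 2.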
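The combination of a stable reference and lost user state can be shown with a toy reference type. This sketch collapses the whole supervision round trip into a local try/catch, so it only illustrates the outcome of a restart; `Accumulator` and `Ref` are hypothetical names.

```scala
import scala.util.control.NonFatal

object StableRefSketch {
  // Hypothetical user actor: `total` is user state that lives in the instance.
  final class Accumulator {
    var total = 0
    def receive(n: Int): Unit = {
      require(n >= 0, "negative input") // throws IllegalArgumentException
      total += n
    }
  }

  // Hypothetical reference: its identity (path, uid) stays fixed
  // while the instance behind it may be replaced.
  final class Ref(val path: String, val uid: Int) {
    private var instance = new Accumulator
    def total: Int = instance.total

    // Delivers a message; on failure the instance is replaced, the reference is not.
    def tell(n: Int): Unit =
      try instance.receive(n)
      catch { case NonFatal(_) => instance = new Accumulator }
  }
}
```

After a failing message the accumulated total is gone, yet the same `Ref` with the same `uid` keeps accepting messages, now served by the new instance.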

    Akka in action: actor model

  • Original article: https://www.cnblogs.com/gabry/p/9402456.html