有些爱钻研的读者可能会问:还有一个路径为 `/` 的 actor(即 root actor),它没有父 actor,如果它失败了,该怎么恢复?这确实算是一个好问题。root actor 是 akka 设计的根基,如果 akka 本身动不动就需要故障恢复,别人还怎么用?你可以这样理解:akka 设计得比较好,或者说 root actor 的功能比较少,轻易不会出现故障。虽然这样解释有点勉强,哈哈。
/**
 * Processes a single user-level envelope.
 *
 * Memory consistency is handled by the Mailbox (reading mailbox status, then
 * processing messages, then writing mailbox status).
 *
 * Steps, in order:
 *  - remember the envelope in `currentMessage`;
 *  - cancel the receive timeout unless the message is a `NotInfluenceReceiveTimeout`;
 *  - route `AutoReceivedMessage`s to `autoReceiveMessage`, everything else to `receiveMessage`;
 *  - clear `currentMessage` on the success path only (it stays set when the handler throws);
 *  - hand any exception accepted by `handleNonFatalOrInterruptedException` to
 *    `handleInvokeFailure` with an empty "do not suspend" list;
 *  - finally, re-check the receive timeout if this message influences it.
 *
 * @param messageHandle the envelope to process
 */
final def invoke(messageHandle: Envelope): Unit = {
  val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
  try {
    currentMessage = messageHandle
    if (influenceReceiveTimeout)
      cancelReceiveTimeout()
    messageHandle.message match {
      case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
      case msg                      ⇒ receiveMessage(msg)
    }
    currentMessage = null // reset current message after successful invocation
  } catch handleNonFatalOrInterruptedException { e ⇒
    handleInvokeFailure(Nil, e)
  } finally {
    if (influenceReceiveTimeout)
      checkReceiveTimeout // Reschedule receive timeout
  }
}
/**
 * Builds a `Catcher` that runs `thunk` for an `InterruptedException` or for
 * any throwable matched by `NonFatal`.
 *
 * For an `InterruptedException` the current thread's interrupt flag is set
 * again after `thunk` has run, so the interruption is not lost. Throwables
 * that are neither an `InterruptedException` nor matched by `NonFatal` are
 * not handled by the returned catcher.
 *
 * @param thunk callback invoked with the caught throwable
 * @return a partial function suitable for use after `catch`
 */
final protected def handleNonFatalOrInterruptedException(thunk: (Throwable) ⇒ Unit): Catcher[Unit] = {
  case e: InterruptedException ⇒
    thunk(e)
    Thread.currentThread().interrupt()
  case NonFatal(e) ⇒
    thunk(e)
}

/**
 * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
 */
def apply(t: Throwable): Boolean = t match {
  // VirtualMachineError includes OutOfMemoryError and other fatal errors
  case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable => false
  case _ => true
}
/**
 * Reacts to a failure raised while this actor was processing a message.
 *
 * Nothing happens if the actor is already marked as failed. Otherwise:
 *  1. this actor is suspended (`suspendNonRecursive`);
 *  2. the failure is attributed: if the current message is a `Failed` envelope
 *     the sending child is recorded via `setFailed` and excluded from
 *     suspension, otherwise `self` is recorded and nothing is excluded;
 *  3. the children are suspended, except the excluded ones and
 *     `childrenNotToSuspend`;
 *  4. the parent is notified with a freshly allocated `Failed` system message
 *     (an `InterruptedException` is wrapped in `ActorInterruptedException`).
 *
 * If the failure handling itself throws, an `Error` event is published, all
 * children are stopped and `finishTerminate()` is called ("emergency stop").
 *
 * @param childrenNotToSuspend children that must be left out of the suspension
 * @param t the throwable that caused the failure
 */
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
  // prevent any further messages to be processed until the actor has been restarted
  if (!isFailed) try {
    suspendNonRecursive()
    // suspend children
    val skip: Set[ActorRef] = currentMessage match {
      case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
      case _                                ⇒ { setFailed(self); Set.empty }
    }
    suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
    t match {
      // tell supervisor
      case _: InterruptedException ⇒
        // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
        parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
      case _ ⇒
        // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
        parent.sendSystemMessage(Failed(self, t, uid))
    }
  } catch handleNonFatalOrInterruptedException { e ⇒
    publish(Error(e, self.path.toString, clazz(actor),
      "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
    try children foreach stop
    finally finishTerminate()
  }
}
/** Asks the dispatcher to suspend this cell only; no children are touched by this call. */
private def suspendNonRecursive(): Unit = {
  dispatcher.suspend(this)
}
/**
 * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
 *
 * The mailbox is suspended only if it still belongs to the given cell and is
 * still served by this dispatcher; otherwise the call is a no-op.
 *
 * @param actor the cell whose mailbox should be suspended
 */
protected[akka] def suspend(actor: ActorCell): Unit = {
  val mbox = actor.mailbox
  // Reference-equality guards: both the mailbox's owner and its dispatcher must match.
  if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
    mbox.suspend()
}
/**
 * Adds one `suspendUnit` to the mailbox status. The caller does not have to
 * care whether the status was Scheduled or not.
 *
 * A closed mailbox stays closed. Otherwise the update is retried until
 * `updateStatus` succeeds.
 *
 * @return true if the previous suspend count was zero
 */
@tailrec
final def suspend(): Boolean = {
  val observed = currentStatus
  if (observed == Closed) {
    setStatus(Closed)
    false
  } else if (updateStatus(observed, observed + suspendUnit)) {
    observed < suspendUnit
  } else {
    suspend()
  }
}
/**
 * Handles a `Failed` notification sent by a child.
 *
 * The notification is first stored as the current message. It is then acted
 * upon only if the child is currently known and its recorded UID equals the
 * UID carried by the notification. In that case the actor's supervisor
 * strategy is consulted, and the original cause is re-thrown if the strategy
 * returns `false`. Notifications from an old incarnation or from an unknown
 * child are dropped with a `Debug` event.
 *
 * @param f the failure notification received from the child
 */
final protected def handleFailure(f: Failed): Unit = {
  currentMessage = Envelope(f, f.child, system)
  getChildByRef(f.child) match {
    /*
     * only act upon the failure, if it comes from a currently known child;
     * the UID protects against reception of a Failed from a child which was
     * killed in preRestart and re-created in postRestart
     */
    case Some(stats) if stats.uid == f.uid ⇒
      // a `false` result from the strategy means the cause is re-thrown from this actor
      if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
    case Some(stats) ⇒
      publish(Debug(self.path.toString, clazz(actor),
        "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
    case None ⇒
      publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
  }
}
/**
 * This is the main entry point: in case of a child’s failure, this method
 * must try to handle the failure by resuming, restarting or stopping the
 * child (and returning `true`), or it returns `false` to escalate the
 * failure, which will lead to this actor re-throwing the exception which
 * caused the failure. The exception will not be wrapped.
 *
 * This method calls [[SupervisorStrategy#logFailure]], which will
 * log the failure unless it is escalated. You can customize the logging by
 * setting [[SupervisorStrategy#loggingEnabled]] to `false` and
 * do the logging inside the `decider` or override the `logFailure` method.
 *
 * The directive is obtained from `decider`, falling back to `escalateDefault`
 * for causes the decider is not defined for.
 *
 * @param context the context of the supervising actor
 * @param child the child that failed
 * @param cause the throwable the child failed with
 * @param stats restart statistics of the failed child
 * @param children is a lazy collection (a view)
 * @return `true` if the failure was handled (Resume, Restart, Stop), `false` to escalate
 */
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
  val directive = decider.applyOrElse(cause, escalateDefault)
  directive match {
    case Resume ⇒
      logFailure(context, child, cause, directive)
      resumeChild(child, cause)
      true
    case Restart ⇒
      logFailure(context, child, cause, directive)
      processFailure(context, true, child, cause, stats, children)
      true
    case Stop ⇒
      logFailure(context, child, cause, directive)
      processFailure(context, false, child, cause, stats, children)
      true
    case Escalate ⇒
      logFailure(context, child, cause, directive)
      false
  }
}
/**
 * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
 * to the child actor that failed, as opposed to [[AllForOneStrategy]] that applies
 * it to all children.
 *
 * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit,
 *   if the limit is exceeded the child actor is stopped
 * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
 * @param decider mapping from Throwable to [[SupervisorStrategy.Directive]], you can also use a
 *   [[scala.collection.immutable.Seq]] of Throwables which maps the given Throwables to restarts, otherwise escalates.
 * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled
 */
case class OneForOneStrategy(
  maxNrOfRetries:              Int      = -1,
  withinTimeRange:             Duration = Duration.Inf,
  override val loggingEnabled: Boolean  = true)(val decider: SupervisorStrategy.Decider)
  extends SupervisorStrategy
/**
 * Carries out a Restart or Stop decision for the failed child.
 *
 * The child is restarted (without suspending it first) only when a restart was
 * requested and `stats` grants restart permission for `retriesWindow`;
 * in every other case the child is stopped through the context.
 */
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
  // `&&` short-circuits: permission is only requested when a restart was asked for
  val restartGranted = restart && stats.requestRestartPermission(retriesWindow)
  if (restartGranted) {
    restartChild(child, cause, suspendFirst = false)
  } else {
    context.stop(child) //TODO optimization to drop child here already?
  }
}
/**
 * Restarts the given child, optionally suspending it beforehand.
 *
 * <b>IMPORTANT:</b>
 *
 * The currently failing child has already been suspended, so `suspendFirst`
 * must be false for it. Any other child did not ask for this treatment and is
 * therefore not prepared to be resumed without a prior suspend.
 */
final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
  val target = child.asInstanceOf[InternalActorRef]
  if (suspendFirst) {
    target.suspend()
  }
  target.restart(cause)
}
/** Forwards the restart request, together with its cause, to `underlying`. */
def restart(cause: Throwable): Unit = {
  underlying.restart(cause)
}
/**
 * Requests a restart by dispatching a `Recreate(cause)` system message to this
 * cell through the dispatcher; exceptions are routed to `handleException`.
 */
final def restart(cause: Throwable): Unit = {
  try {
    dispatcher.systemDispatch(this, Recreate(cause))
  } catch handleException
}
/**
 * Do re-create the actor in response to a failure.
 *
 * Three situations are distinguished:
 *  - no actor instance exists: an `Error` event is published and the request
 *    is turned into a plain create via `faultCreate()`;
 *  - the cell is in its normal state: `aroundPreRestart` is invoked on the
 *    failed instance (best-effort; an exception there is only logged), its
 *    fields are cleared, and the restart is completed immediately by
 *    `finishRecreate` unless `setChildrenTerminationReason` returns `true`;
 *  - otherwise: only `faultResume` is called so the suspend counter stays balanced.
 *
 * @param cause the throwable that triggered the restart
 */
protected def faultRecreate(cause: Throwable): Unit =
  if (actor == null) {
    system.eventStream.publish(Error(self.path.toString, clazz(actor),
      "changing Recreate into Create after " + cause))
    faultCreate()
  } else if (isNormal) {
    val failedActor = actor
    if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
    if (failedActor ne null) {
      val optionalMessage = if (currentMessage ne null) Some(currentMessage.message) else None
      try {
        // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort
        if (failedActor.context ne null) failedActor.aroundPreRestart(cause, optionalMessage)
      } catch handleNonFatalOrInterruptedException { e ⇒
        val ex = PreRestartException(self, e, cause, optionalMessage)
        publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
      } finally {
        clearActorFields(failedActor, recreate = true)
      }
    }
    assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus)
    if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
  } else {
    // need to keep that suspend counter balanced
    faultResume(causedByFailure = null)
  }
/**
 * Detaches an actor instance from this cell.
 *
 * The instance's context is set to null, and its self reference is set to this
 * cell's `self` when recreating or to `system.deadLetters` otherwise.
 * Afterwards the current message is cleared and the behavior stack is reset
 * to `emptyBehaviorStack`.
 *
 * @param actorInstance the instance whose fields are overwritten
 * @param recreate whether the instance is being cleared as part of a restart
 */
final protected def clearActorFields(actorInstance: Actor, recreate: Boolean): Unit = {
  val replacementSelf = if (recreate) self else system.deadLetters
  setActorFields(actorInstance, context = null, self = replacementSelf)
  currentMessage = null
  behaviorStack = emptyBehaviorStack
}
/**
 * Completes a restart by installing a fresh actor instance.
 *
 * Order of operations:
 *  1. snapshot the current children ("survivors") before the new instance can create more;
 *  2. resume this cell and, in any case, clear the failed marker;
 *  3. create the new instance via `newActor()` and install it as `actor`;
 *  4. if the creator handed back the very same instance, restore its context/self fields;
 *  5. call `aroundPostRestart(cause)` on the new instance;
 *  6. restart every surviving child with the same cause; a failure to restart
 *     one child is published as an `Error` and does not stop the loop.
 *
 * If any of the steps 2-5 throws, the actor's fields are cleared with
 * `recreate = false` and the problem is reported through `handleInvokeFailure`
 * as a `PostRestartException`, leaving the survivors unsuspended.
 *
 * @param cause the throwable that triggered the restart
 * @param failedActor the instance that is being replaced
 */
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
  // need to keep a snapshot of the surviving children before the new actor instance creates new ones
  val survivors = children

  try {
    try resumeNonRecursive()
    finally clearFailed() // must happen in any case, so that failure is propagated

    val freshActor = newActor()
    actor = freshActor // this must happen before postRestart has a chance to fail
    if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.

    freshActor.aroundPostRestart(cause)
    if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

    // only after parent is up and running again do restart the children which were not stopped
    survivors foreach (child ⇒
      try child.asInstanceOf[InternalActorRef].restart(cause)
      catch handleNonFatalOrInterruptedException { e ⇒
        publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
      })
  } catch handleNonFatalOrInterruptedException { e ⇒
    clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again
    handleInvokeFailure(survivors, PostRestartException(self, e, cause))
  }
}
重建之后,会调用新 actor 实例的 aroundPostRestart、postRestart,也就是说 postRestart 是新实例创建成功后执行的第一个方法。新的 actor 实例创建成功后,后面的 survivors foreach 代码块就是在递归地重启子 actor 了。这里不再具体分析。
关于 Actor 的生命周期,上面的图画得比较清楚,读者可以自行理解。但需要注意,Instance 1 和 Instance 2 是两个“不同”的 Actor 实例。这里的“不同”,是指 Actor 内部的用户状态(即开发者自定义的字段)会全部丢失,并被重新创建。但这两个实例的 UID 相同,这就意味着可以用同一个 ActorRef 给后面重新建立的 Instance 2 发消息。