“A path in an actor system represents a “place” which might be occupied by a living actor. Initially (apart from system initialized actors) a path is empty. When actorOf()
is called it assigns an incarnation of the actor described by the passed Props
to the given path. An actor incarnation is identified by the path and a UID.”
“A restart only swaps the Actor
instance defined by the Props
but the incarnation and hence the UID remains the same. As long as the incarnation is same, you can keep using the same ActorRef
. Restart is handled by the Supervision Strategy of actor’s parent actor, and there is more discussion about what restart means.
The lifecycle of an incarnation ends when the actor is stopped. At that point the appropriate lifecycle events are called and watching actors are notified of the termination. After the incarnation is stopped, the path can be reused again by creating an actor with actorOf()
. In this case the name of the new incarnation will be the same as the previous one but the UIDs will differ. An actor can be stopped by the actor itself, another actor or the ActorSystem
“An ActorRef
always represents an incarnation (path and UID) not just a given path. Therefore if an actor is stopped and a new one with the same name is created an ActorRef
of the old incarnation will not point to the new one.”
@tailrec final def newUid(): Int = { // Note that this uid is also used as hashCode in ActorRef, so be careful // to not break hashing if you change the way uid is generated val uid = ThreadLocalRandom.current.nextInt() if (uid == undefinedUid) newUid else uid }
final override def hashCode: Int = { if (path.uid == ActorCell.undefinedUid) path.hashCode else path.uid } /** * Equals takes path and the unique id of the actor cell into account. */ final override def equals(that: Any): Boolean = that match { case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path case _ ⇒ false }
我们来看一下abstract class ActorRef对hasCode和equals的定义就大概明白,UID的具体作用了,跟我们分析的是一致的。那我们来看看ActorPath的equals是如何定义的。
override def equals(other: Any): Boolean = { @tailrec def rec(left: ActorPath, right: ActorPath): Boolean = if (left eq right) true else if (left.isInstanceOf[RootActorPath]) left equals right else if (right.isInstanceOf[RootActorPath]) right equals left else left.name == right.name && rec(left.parent, right.parent) other match { case p: ActorPath ⇒ rec(this, p) case _ ⇒ false } }
final def invoke(messageHandle: Envelope): Unit = { val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout] try { currentMessage = messageHandle if (influenceReceiveTimeout) cancelReceiveTimeout() messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) case msg ⇒ receiveMessage(msg) } currentMessage = null // reset current message after successful invocation } catch handleNonFatalOrInterruptedException { e ⇒ handleInvokeFailure(Nil, e) } finally { if (influenceReceiveTimeout) checkReceiveTimeout // Reschedule receive timeout } }
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = { // prevent any further messages to be processed until the actor has been restarted if (!isFailed) try { suspendNonRecursive() // suspend children val skip: Set[ActorRef] = currentMessage match { case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) } case _ ⇒ { setFailed(self); Set.empty } } suspendChildren(exceptFor = skip ++ childrenNotToSuspend) t match { // tell supervisor case _: InterruptedException ⇒ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid)) case _ ⇒ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(Failed(self, t, uid)) } } catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) try children foreach stop finally finishTerminate() } }
final protected def handleFailure(f: Failed): Unit = { currentMessage = Envelope(f, f.child, system) getChildByRef(f.child) match { /* * only act upon the failure, if it comes from a currently known child; * the UID protects against reception of a Failed from a child which was * killed in preRestart and re-created in postRestart */ case Some(stats) if stats.uid == f.uid ⇒ if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause case Some(stats) ⇒ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")")) case None ⇒ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child)) } }
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val directive = decider.applyOrElse(cause, escalateDefault) directive match { case Resume ⇒ logFailure(context, child, cause, directive) resumeChild(child, cause) true case Restart ⇒ logFailure(context, child, cause, directive) processFailure(context, true, child, cause, stats, children) true case Stop ⇒ logFailure(context, child, cause, directive) processFailure(context, false, child, cause, stats, children) true case Escalate ⇒ logFailure(context, child, cause, directive) false } }
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) restartChild(child, cause, suspendFirst = false) else context.stop(child) //TODO optimization to drop child here already? }
final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = { val c = child.asInstanceOf[InternalActorRef] if (suspendFirst) c.suspend() c.restart(cause) }
def restart(cause: Throwable): Unit = underlying.restart(cause)
final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = { // need to keep a snapshot of the surviving children before the new actor instance creates new ones val survivors = children try { try resumeNonRecursive() finally clearFailed() // must happen in any case, so that failure is propagated val freshActor = newActor() actor = freshActor // this must happen before postRestart has a chance to fail if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. freshActor.aroundPostRestart(cause) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) // only after parent is up and running again do restart the children which were not stopped survivors foreach (child ⇒ try child.asInstanceOf[InternalActorRef].restart(cause) catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) }) } catch handleNonFatalOrInterruptedException { e ⇒ clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again handleInvokeFailure(survivors, PostRestartException(self, e, cause)) } }
其实我们可以这样简单的理解,ActorRef = “ActorPathString” + UID。开发者自定义的Actor类是一个静态的概念,当类通过actorOf创建的时候,就会产生一个Actor实例,如果该Actor由于某种原因失败,被系统restart,系统会新生成一个Actor实例,但该实例的UID不变,所以ActorRef指向相同路径下的actor实例。ActorPath标志Actor的树形路径,通过它可以找到这个路径下的实例,但实例的UID是不是相同则不关心。