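In the previous post we looked at how messages are sent to a remote actor in remote mode; whatever the route, they all end up being sent through a RemoteActorRef. The official documentation also states that an ActorRef is independent of network location, which implies two things: (1) an ActorRef can be serialized and sent across the network; (2) after deserialization, the receiving side can tell whether it refers to a local or a remote actor. Location transparency therefore hinges on two points: (1) how an ActorRef is serialized; (2) how an ActorRef is resolved. Let's look at each in turn.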
In local mode messages are sent through an InternalActorRef; in remote mode they are sent through a RemoteActorRef. What is the difference between the two?
    /**
     * INTERNAL API
     * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
     * This reference is network-aware (remembers its origin) and immutable.
     */
    private[akka] class RemoteActorRef private[akka] (
      remote: RemoteTransport,
      val localAddressToUse: Address,
      val path: ActorPath,
      val getParent: InternalActorRef,
      props: Option[Props],
      deploy: Option[Deploy])
      extends InternalActorRef with RemoteRef
The source shows that RemoteActorRef extends InternalActorRef and also mixes in the RemoteRef trait.
    private[akka] trait RemoteRef extends ActorRefScope {
      final def isLocal = false
    }
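RemoteRef is simple: it just defines isLocal as false. So a RemoteActorRef does not differ much from any other InternalActorRef. An ActorRef passed around locally does not need to be serialized by default, so where does serialization come in? Let's look at the serialization path first.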
Recall from the earlier posts that in remote mode messages are sent by EndpointWriter.writeSend.
    def writeSend(s: Send): Boolean = try {
      handle match {
        case Some(h) ⇒
          if (provider.remoteSettings.LogSend && log.isDebugEnabled) {
            def msgLog = s"RemoteMessage: [${s.message}] to [${s.recipient}]<+[${s.recipient.path}] from [${s.senderOption.getOrElse(extendedSystem.deadLetters)}]"
            log.debug("sending message {}", msgLog)
          }
          val pdu = codec.constructMessage(
            s.recipient.localAddressToUse,
            s.recipient,
            serializeMessage(s.message),
            s.senderOption,
            seqOption = s.seqOpt,
            ackOption = lastAck)
          val pduSize = pdu.size
          remoteMetrics.logPayloadBytes(s.message, pduSize)
          if (pduSize > transport.maximumPayloadBytes) {
            val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes.")
            log.error(reason, "Transient association error (association remains live)")
            true
          } else {
            val ok = h.write(pdu)
            if (ok) {
              ackDeadline = newAckDeadline
              lastAck = None
            }
            ok
          }
        case None ⇒
          throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
      }
    } catch {
      case e: NotSerializableException ⇒
        log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
        true
      case e: IllegalArgumentException ⇒
        log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
        true
      case e: MessageSerializer.SerializationException ⇒
        log.error(e, "{} Transient association error (association remains live)", e.getMessage)
        true
      case e: EndpointException ⇒
        publishAndThrow(e, Logging.ErrorLevel)
      case NonFatal(e) ⇒
        publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel)
    }
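As shown, the call to codec.constructMessage invokes serializeMessage to serialize the outgoing message, so if a user message contains an ActorRef it will be handled there. Which messages contain an ActorRef? ActorIdentity is one example. And of course, if a user-defined message contains an ActorRef, that reference gets serialized too.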
    private def serializeMessage(msg: Any): SerializedMessage = handle match {
      case Some(h) ⇒
        Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, extendedSystem)) {
          MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])
        }
      case None ⇒
        throw new EndpointException("Internal error: No handle was present during serialization of outbound message.")
    }
EndpointWriter.serializeMessage in turn calls MessageSerializer.serialize to do the actual serialization.
    /**
     * Serialization information needed for serializing local actor refs,
     * or if serializer library e.g. custom serializer/deserializer in Jackson need
     * access to the current `ActorSystem`.
     */
    final case class Information(address: Address, system: ActorSystem)
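The Serialization.Information case class is simple and its doc comment is clear, so I won't analyze it further. In short, it carries the basic values the serialization process needs: the address and the current ActorSystem.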
    /**
     * INTERNAL API: This holds a reference to the current transport serialization information used for
     * serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in
     * Jackson need access to the current `ActorSystem`.
     */
    @InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
So what is Serialization.currentTransportInformation? It is an instance of Scala's DynamicVariable:
    /** `DynamicVariables` provide a binding mechanism where the current
     *  value is found through dynamic scope, but where access to the
     *  variable itself is resolved through static scope.
     *
     *  The current value can be retrieved with the value method. New values
     *  should be pushed using the `withValue` method. Values pushed via
     *  `withValue` only stay valid while the `withValue`'s second argument, a
     *  parameterless closure, executes. When the second argument finishes,
     *  the variable reverts to the previous value.
     *
     *  {{{
     *  someDynamicVariable.withValue(newValue) {
     *    // ... code called in here that calls value ...
     *    // ... will be given back the newValue ...
     *  }
     *  }}}
     *
     *  Each thread gets its own stack of bindings. When a
     *  new thread is created, the `DynamicVariable` gets a copy
     *  of the stack of bindings from the parent thread, and
     *  from then on the bindings for the new thread
     *  are independent of those for the original thread.
     *
     *  @author Lex Spoon
     *  @version 1.1, 2007-5-21
     */
    class DynamicVariable[T](init: T)
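currentTransportInformation is a dynamic variable. The Scala documentation describes its behavior and usage clearly; you can think of it as a ThreadLocal whose value is inherited from the parent thread.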
    /** Set the value of the variable while executing the specified
     *  thunk.
     *
     *  @param newval The value to which to set the variable
     *  @param thunk The code to evaluate under the new setting
     */
    def withValue[S](newval: T)(thunk: => S): S = {
      val oldval = value
      tl set newval
      try thunk
      finally tl set oldval
    }
withValue simply runs thunk with the variable bound to newval on the current thread, and restores the previous value afterwards.
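To make the scoping behavior concrete, here is a minimal sketch that uses only scala.util.DynamicVariable from the standard library. The names TransportInfoDemo, Info, current, describe and during are made up for illustration; Info is a hypothetical stand-in for Serialization.Information, not the Akka class.

```scala
import scala.util.DynamicVariable

object TransportInfoDemo {
  // Hypothetical stand-in for Serialization.Information (address only).
  final case class Info(address: String)

  // Starts out as null, like currentTransportInformation in the excerpt above.
  val current = new DynamicVariable[Info](null)

  // Reads whatever binding is active on the calling thread.
  def describe(): String = current.value match {
    case null       => "no transport info"
    case Info(addr) => s"serializing for $addr"
  }

  // Binds `info` only while the thunk runs; the old value is restored afterwards.
  def during(info: Info): String = current.withValue(info) { describe() }
}
```

Code deep inside the thunk can read the binding without it being passed as a parameter, which is how a serializer can find the transport address while it is serializing a nested ActorRef.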
Putting this together, when MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]) runs, the value it reads from currentTransportInformation is Serialization.Information(h.localAddress, extendedSystem). Now let's see what serialize does.
    /**
     * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol
     * Throws `NotSerializableException` if serializer was not configured for the message type.
     * Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the
     * serializer.
     */
    def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
      val s = SerializationExtension(system)
      val serializer = s.findSerializerFor(message)
      val builder = SerializedMessage.newBuilder
      val oldInfo = Serialization.currentTransportInformation.value
      try {
        if (oldInfo eq null)
          Serialization.currentTransportInformation.value = system.provider.serializationInformation
        builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
        builder.setSerializerId(serializer.identifier)
        val ms = Serializers.manifestFor(serializer, message)
        if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
        builder.build
      } catch {
        case NonFatal(e) ⇒
          throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " +
            s"using serializer [${serializer.getClass}].", e)
      } finally Serialization.currentTransportInformation.value = oldInfo
    }
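In short, serialize uses SerializationExtension to find a serializer for the message, uses that serializer to turn the message into bytes, then sets a few more fields (the serializer id and, if present, the manifest) through SerializedMessage.Builder and returns the resulting SerializedMessage. How SerializationExtension picks a suitable serializer is therefore important.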
    /**
     * Returns the configured Serializer for the given Class. The configured Serializer
     * is used if the configured class `isAssignableFrom` from the `clazz`, i.e.
     * the configured class is a super class or implemented interface. In case of
     * ambiguity it is primarily using the most specific configured class,
     * and secondly the entry configured first.
     *
     * Throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class.
     */
    @throws(classOf[NotSerializableException])
    def serializerFor(clazz: Class[_]): Serializer =
      serializerMap.get(clazz) match {
        case null ⇒ // bindings are ordered from most specific to least specific
          def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
            possibilities.size == 1 ||
              (possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
              (possibilities forall (_._2 == possibilities(0)._2))
          val ser = {
            bindings.filter {
              case (c, _) ⇒ c isAssignableFrom clazz
            } match {
              case immutable.Seq() ⇒
                throw new NotSerializableException(s"No configured serialization-bindings for class [${clazz.getName}]")
              case possibilities ⇒
                if (unique(possibilities))
                  possibilities.head._2
                else {
                  // give JavaSerializer lower priority if multiple serializers found
                  val possibilitiesWithoutJavaSerializer = possibilities.filter {
                    case (_, _: JavaSerializer) ⇒ false
                    case (_, _: DisabledJavaSerializer) ⇒ false
                    case _ ⇒ true
                  }
                  if (possibilitiesWithoutJavaSerializer.isEmpty) {
                    // shouldn't happen
                    throw new NotSerializableException(s"More than one JavaSerializer configured for class [${clazz.getName}]")
                  }
                  if (!unique(possibilitiesWithoutJavaSerializer)) {
                    _log.warning(LogMarker.Security, "Multiple serializers found for [{}], choosing first of: [{}]",
                      clazz.getName,
                      possibilitiesWithoutJavaSerializer.map { case (_, s) ⇒ s.getClass.getName }.mkString(", "))
                  }
                  possibilitiesWithoutJavaSerializer.head._2
                }
            }
          }
          serializerMap.putIfAbsent(clazz, ser) match {
            case null ⇒
              if (shouldWarnAboutJavaSerializer(clazz, ser)) {
                _log.warning(LogMarker.Security, "Using the default Java serializer for class [{}] which is not recommended because of " +
                  "performance implications. Use another serializer or disable this warning using the setting " +
                  "'akka.actor.warn-about-java-serializer-usage'", clazz.getName)
              }
              log.debug("Using serializer [{}] for message [{}]", ser.getClass.getName, clazz.getName)
              ser
            case some ⇒ some
          }
        case ser ⇒ ser
      }
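findSerializerFor ends up calling serializerFor. In short, serializerFor first checks the serializerMap cache; on a miss it searches bindings for the configured classes that satisfy isAssignableFrom. If the matches are unambiguous it uses the first one; if several different serializers match, it prefers those other than JavaSerializer. With the default configuration, any class that implements java.io.Serializable will at least match JavaSerializer. I won't go into how serializers are loaded; it is enough to know that they come from configuration. So what does the default configuration look like? Below is an excerpt from reference.conf in the akka-remote module.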
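The selection rule can be sketched in a few lines of plain Scala. This is a deliberately simplified model, not Akka's implementation: it skips the serializerMap cache and the unique check, serializers are represented by their names only, and the binding list (including the custom-number entry) is hypothetical.

```scala
object BindingLookupDemo {
  // Hypothetical bindings: (configured class, serializer name), ordered from
  // most specific to least specific, as the comment in serializerFor assumes.
  val bindings: List[(Class[_], String)] = List(
    classOf[java.lang.Integer]    -> "primitive-int",
    classOf[java.lang.Number]     -> "custom-number",
    classOf[java.io.Serializable] -> "java"
  )

  def serializerNameFor(clazz: Class[_]): String = {
    // Keep every binding whose configured class is a supertype of clazz.
    val candidates = bindings.filter { case (c, _) => c.isAssignableFrom(clazz) }
    if (candidates.isEmpty)
      throw new java.io.NotSerializableException(s"No binding for [${clazz.getName}]")
    // Give the Java serializer lower priority when other bindings also match.
    val nonJava = candidates.filterNot { case (_, name) => name == "java" }
    (if (nonJava.nonEmpty) nonJava else candidates).head._2
  }
}
```

In this model, serializerNameFor(classOf[java.lang.Double]) picks custom-number through the Number binding, while a class that is only Serializable, such as String, falls back to java.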
    serializers {
      akka-containers = "akka.remote.serialization.MessageContainerSerializer"
      akka-misc = "akka.remote.serialization.MiscMessageSerializer"
      artery = "akka.remote.serialization.ArteryMessageSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
      daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
      primitive-long = "akka.remote.serialization.LongSerializer"
      primitive-int = "akka.remote.serialization.IntSerializer"
      primitive-string = "akka.remote.serialization.StringSerializer"
      primitive-bytestring = "akka.remote.serialization.ByteStringSerializer"
      akka-system-msg = "akka.remote.serialization.SystemMessageSerializer"
    }
    serialization-bindings {
      "akka.actor.ActorSelectionMessage" = akka-containers
      "akka.remote.DaemonMsgCreate" = daemon-create
      "akka.remote.artery.ArteryMessage" = artery
      # Since akka.protobuf.Message does not extend Serializable but
      # GeneratedMessage does, need to use the more specific one here in order
      # to avoid ambiguity.
      "akka.protobuf.GeneratedMessage" = proto
      # Since com.google.protobuf.Message does not extend Serializable but
      # GeneratedMessage does, need to use the more specific one here in order
      # to avoid ambiguity.
      # This com.google.protobuf serialization binding is only used if the class can be loaded,
      # i.e. com.google.protobuf dependency has been added in the application project.
      "com.google.protobuf.GeneratedMessage" = proto
      "java.util.Optional" = akka-misc
      # The following are handled by the MiscMessageSerializer, but they are not enabled for
      # compatibility reasons (it was added in Akka 2.5.[8,9,12]). Enable them by adding:
      # akka.actor.serialization-bindings {
      #   "akka.Done" = akka-misc
      #   "akka.NotUsed" = akka-misc
      #   "akka.actor.Address" = akka-misc
      #   "akka.remote.UniqueAddress" = akka-misc
      # }
    }
    # Additional serialization-bindings that are replacing Java serialization are
    # defined in this section for backwards compatibility reasons. They are included
    # by default but can be excluded for backwards compatibility with Akka 2.4.x.
    # They can be disabled with enable-additional-serialization-bindings=off.
    additional-serialization-bindings {
      "akka.actor.Identify" = akka-misc
      "akka.actor.ActorIdentity" = akka-misc
      "scala.Some" = akka-misc
      "scala.None$" = akka-misc
      "akka.actor.Status$Success" = akka-misc
      "akka.actor.Status$Failure" = akka-misc
      "akka.actor.ActorRef" = akka-misc
      "akka.actor.PoisonPill$" = akka-misc
      "akka.actor.Kill$" = akka-misc
      "akka.remote.RemoteWatcher$Heartbeat$" = akka-misc
      "akka.remote.RemoteWatcher$HeartbeatRsp" = akka-misc
      "akka.actor.ActorInitializationException" = akka-misc
      "akka.dispatch.sysmsg.SystemMessage" = akka-system-msg
      "java.lang.String" = primitive-string
      "akka.util.ByteString$ByteString1C" = primitive-bytestring
      "akka.util.ByteString$ByteString1" = primitive-bytestring
      "akka.util.ByteString$ByteStrings" = primitive-bytestring
      "java.lang.Long" = primitive-long
      "scala.Long" = primitive-long
      "java.lang.Integer" = primitive-int
      "scala.Int" = primitive-int
      # Java Serializer is by default used for exceptions.
      # It's recommended that you implement custom serializer for exceptions that are
      # sent remotely, e.g. in akka.actor.Status.Failure for ask replies. You can add
      # binding to akka-misc (MiscMessageSerializerSpec) for the exceptions that have
      # a constructor with single message String or constructor with message String as
      # first parameter and cause Throwable as second parameter. Note that it's not
      # safe to add this binding for general exceptions such as IllegalArgumentException
      # because it may have a subclass without required constructor.
      "java.lang.Throwable" = java
      "akka.actor.IllegalActorStateException" = akka-misc
      "akka.actor.ActorKilledException" = akka-misc
      "akka.actor.InvalidActorNameException" = akka-misc
      "akka.actor.InvalidMessageException" = akka-misc
      "akka.actor.LocalScope$" = akka-misc
      "akka.remote.RemoteScope" = akka-misc
      "com.typesafe.config.impl.SimpleConfig" = akka-misc
      "com.typesafe.config.Config" = akka-misc
      "akka.routing.FromConfig" = akka-misc
      "akka.routing.DefaultResizer" = akka-misc
      "akka.routing.BalancingPool" = akka-misc
      "akka.routing.BroadcastGroup" = akka-misc
      "akka.routing.BroadcastPool" = akka-misc
      "akka.routing.RandomGroup" = akka-misc
      "akka.routing.RandomPool" = akka-misc
      "akka.routing.RoundRobinGroup" = akka-misc
      "akka.routing.RoundRobinPool" = akka-misc
      "akka.routing.ScatterGatherFirstCompletedGroup" = akka-misc
      "akka.routing.ScatterGatherFirstCompletedPool" = akka-misc
      "akka.routing.SmallestMailboxPool" = akka-misc
      "akka.routing.TailChoppingGroup" = akka-misc
      "akka.routing.TailChoppingPool" = akka-misc
      "akka.remote.routing.RemoteRouterConfig" = akka-misc
    }
From this configuration we can see that ActorRef is bound to akka-misc, i.e. it is serialized by akka.remote.serialization.MiscMessageSerializer.
For an ActorRef, MiscMessageSerializer.toBinary calls serializeActorRef.
private def serializeActorRef(ref: ActorRef): Array[Byte] = actorRefBuilder(ref).build().toByteArray
    private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder =
      ContainerFormats.ActorRef.newBuilder()
        .setPath(Serialization.serializedActorPath(actorRef))
    /**
     * The serialized path of an actorRef, based on the current transport serialization information.
     * If there is no external address available in the given `ActorRef` then the systems default
     * address will be used and that is retrieved from the ThreadLocal `Serialization.Information`
     * that was set with [[Serialization#withTransportInformation]].
     */
    def serializedActorPath(actorRef: ActorRef): String = {
      val path = actorRef.path
      val originalSystem: ExtendedActorSystem = actorRef match {
        case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem]
        case _ ⇒ null
      }
      Serialization.currentTransportInformation.value match {
        case null ⇒ originalSystem match {
          case null ⇒ path.toSerializationFormat
          case system ⇒
            try path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
            catch { case NonFatal(_) ⇒ path.toSerializationFormat }
        }
        case Information(address, system) ⇒
          if (originalSystem == null || originalSystem == system)
            path.toSerializationFormatWithAddress(address)
          else {
            val provider = originalSystem.provider
            path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
          }
      }
    }
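First, Serialization.currentTransportInformation.value cannot be null here because it was set earlier, so execution reaches the Information(address, system) case. Whichever branch of the if is taken, the ActorRef is ultimately serialized by calling toSerializationFormatWithAddress.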
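So serializing an ActorRef means converting its path to the serialization format, which is just the String form of the ActorPath. In remote mode that string includes the protocol (e.g. akka.tcp) and the host:port. At this point you can probably guess how deserialization works: parse the ActorPath string and turn it into the corresponding RemoteActorRef. We covered how an ActorRef is resolved from an ActorPath in an earlier post, but let's walk through it again.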
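As a rough illustration of what "serialize with address" means, the sketch below models an address and a path with two small case classes. Addr and Path are hypothetical, simplified stand-ins for akka.actor.Address and ActorPath, and the actor's uid suffix that the real format carries is left out.

```scala
object PathFormatDemo {
  // Hypothetical, simplified stand-in for akka.actor.Address.
  final case class Addr(protocol: String, system: String, host: Option[String], port: Option[Int]) {
    def render: String = (host, port) match {
      case (Some(h), Some(p)) => s"$protocol://$system@$h:$p"
      case _                  => s"$protocol://$system"
    }
  }

  // Hypothetical, simplified stand-in for ActorPath.
  final case class Path(address: Addr, elements: List[String]) {
    // A local path carries no host, so the transport address is substituted;
    // a path that already has a host keeps its own address.
    def toSerializationFormatWithAddress(transport: Addr): String = {
      val effective = if (address.host.isDefined) address else transport
      effective.render + elements.mkString("/", "/", "")
    }
  }
}
```

With a transport address of akka.tcp://sys@10.0.0.1:2552, the local path akka://sys/user/worker would be written to the wire as akka.tcp://sys@10.0.0.1:2552/user/worker in this model, which is enough for the receiver to route replies back.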
    override def receive: Receive = {
      case Disassociated(info) ⇒ handleDisassociated(info)
      case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒
        val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
        for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack
        msgOption match {
          case Some(msg) ⇒
            if (msg.reliableDeliveryEnabled) {
              ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
              deliverAndAck()
            } else try
              msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
            catch {
              case e: NotSerializableException ⇒ logTransientSerializationError(msg, e)
              case e: IllegalArgumentException ⇒ logTransientSerializationError(msg, e)
            }
          case None ⇒
        }
      case InboundPayload(oversized) ⇒
        log.error(
          new OversizedPayloadException(s"Discarding oversized payload received: " +
            s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."),
          "Transient error while reading from association (association remains live)")
      case StopReading(writer, replyTo) ⇒
        saveState()
        context.become(notReading)
        replyTo ! StoppedReading(writer)
    }
After receiving an InboundPayload, EndpointReader.receive first decodes it into a Message and then hands it off through msgDispatch.dispatch; msgDispatch is a DefaultMessageDispatcher instance.
    override def dispatch(
      recipient: InternalActorRef,
      recipientAddress: Address,
      serializedMessage: SerializedMessage,
      senderOption: OptionVal[ActorRef]): Unit = {
      import provider.remoteSettings._
      lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
      def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
      val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
      val originalReceiver = recipient.path
      def logMessageReceived(messageType: String): Unit = {
        if (LogReceive && log.isDebugEnabled)
          log.debug(s"received $messageType RemoteMessage: [{}] to [{}]<+[{}] from [{}]", payload, recipient, originalReceiver, sender)
      }
      recipient match {
        case `remoteDaemon` ⇒
          if (UntrustedMode) log.debug(LogMarker.Security, "dropping daemon message in untrusted mode")
          else {
            logMessageReceived("daemon message")
            remoteDaemon ! payload
          }
        case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
          logMessageReceived("local message")
          payload match {
            case sel: ActorSelectionMessage ⇒
              if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
                sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
                log.debug(
                  LogMarker.Security,
                  "operating in UntrustedMode, dropping inbound actor selection to [{}], " +
                    "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
                  sel.elements.mkString("/", "/", ""))
              else
                // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
                ActorSelection.deliverSelection(l, sender, sel)
            case msg: PossiblyHarmful if UntrustedMode ⇒
              log.debug(LogMarker.Security, "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
            case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
            case msg ⇒ l.!(msg)(sender)
          }
        case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
          logMessageReceived("remote-destined message")
          if (provider.transport.addresses(recipientAddress))
            // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
            r.!(payload)(sender)
          else
            log.error(
              "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
              payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
        case r ⇒
          log.error(
            "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
            payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
      }
    }
dispatch declares payload as a lazy val, so MessageSerializer.deserialize(system, serializedMessage) runs the first time the payload is actually used.
    /**
     * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
     */
    def deserialize(system: ExtendedActorSystem, messageProtocol: SerializedMessage): AnyRef = {
      SerializationExtension(system).deserialize(
        messageProtocol.getMessage.toByteArray,
        messageProtocol.getSerializerId,
        if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get
    }
    /**
     * Deserializes the given array of bytes using the specified serializer id,
     * using the optional type hint to the Serializer.
     * Returns either the resulting object or an Exception if one was thrown.
     */
    def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
      Try {
        val serializer = try getSerializerById(serializerId) catch {
          case _: NoSuchElementException ⇒ throw new NotSerializableException(
            s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
              "akka.actor.serializers is not in synch between the two systems.")
        }
        deserializeByteArray(bytes, serializer, manifest)
      }
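The code above shows that deserialization simply looks up the concrete serializer by serializerId and then calls deserializeByteArray. What is the serializerId when the message is an ActorRef? It is the numeric identifier of the serializer registered as akka-misc, so deserialization is done by akka.remote.serialization.MiscMessageSerializer. There is one more important field: manifest. What is it? The ActorRef serialization path gives us a clue.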
MessageSerializer.serialize contains a line that sets the manifest: val ms = Serializers.manifestFor(serializer, message).
    def manifestFor(s: Serializer, message: AnyRef): String = s match {
      case s2: SerializerWithStringManifest ⇒ s2.manifest(message)
      case _ ⇒ if (s.includeManifest) message.getClass.getName else ""
    }
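This checks whether the serializer is a SerializerWithStringManifest. If it is, its manifest method is called; if not, the message's class name is returned when includeManifest is true, otherwise the empty string. Let's look at how MiscMessageSerializer is defined.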
class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer
private val ActorRefManifest = "G"
It clearly extends SerializerWithStringManifest, and for an ActorRef the manifest is the string G.
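The reason a short string such as G is enough is that the serializer owns both directions: it chooses the manifest when writing and dispatches on it when reading. The sketch below shows that pattern in plain Scala. It is not the Akka API: Msg, Ref and Stop are hypothetical message types, the byte format is just UTF-8, and only the G manifest is taken from the source; S is made up.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ManifestDispatchDemo {
  // Hypothetical message types used only for this illustration.
  sealed trait Msg
  final case class Ref(path: String) extends Msg
  case object Stop extends Msg

  private val RefManifest = "G"  // same letter the source uses for ActorRef
  private val StopManifest = "S" // made up for this sketch

  // Writing side: pick the manifest and the bytes for a message.
  def manifest(m: Msg): String = m match {
    case _: Ref => RefManifest
    case Stop   => StopManifest
  }
  def toBinary(m: Msg): Array[Byte] = m match {
    case Ref(path) => path.getBytes(UTF_8)
    case Stop      => Array.emptyByteArray
  }

  // Reading side: one deserializer function per manifest.
  private val fromBinaryMap: Map[String, Array[Byte] => Msg] = Map(
    RefManifest  -> ((bytes: Array[Byte]) => Ref(new String(bytes, UTF_8))),
    StopManifest -> ((_: Array[Byte]) => Stop)
  )

  def fromBinary(bytes: Array[Byte], manifest: String): Msg =
    fromBinaryMap.get(manifest) match {
      case Some(deserializer) => deserializer(bytes)
      case None =>
        throw new java.io.NotSerializableException(s"Unknown manifest [$manifest]")
    }
}
```

Compared with sending a full class name, a one-letter manifest keeps the envelope small and does not tie the wire format to class names.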
    private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = {
      @tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = {
        manifestCache.compareAndSet(cache, cache.updated(key, value)) ||
          updateCache(manifestCache.get, key, value) // recursive, try again
      }
      withTransportInformation { () ⇒
        serializer match {
          case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
          case s1 ⇒
            if (manifest == "")
              s1.fromBinary(bytes, None)
            else {
              val cache = manifestCache.get
              cache.get(manifest) match {
                case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest)
                case None ⇒
                  system.dynamicAccess.getClassFor[AnyRef](manifest) match {
                    case Success(classManifest) ⇒
                      val classManifestOption: Option[Class[_]] = Some(classManifest)
                      updateCache(cache, manifest, classManifestOption)
                      s1.fromBinary(bytes, classManifestOption)
                    case Failure(e) ⇒
                      throw new NotSerializableException(
                        s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
                  }
              }
            }
        }
      }
    }
In the deserializeByteArray source above, the first check is whether the serializer is a SerializerWithStringManifest. For ActorRef it is, so let's see how fromBinary is implemented.
    override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
      fromBinaryMap.get(manifest) match {
        case Some(deserializer) ⇒ deserializer(bytes)
        case None ⇒ throw new NotSerializableException(
          s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
      }
    private val fromBinaryMap = Map[String, Array[Byte] ⇒ AnyRef](
      IdentifyManifest → deserializeIdentify,
      ActorIdentityManifest → deserializeActorIdentity,
      StatusSuccessManifest → deserializeStatusSuccess,
      StatusFailureManifest → deserializeStatusFailure,
      ThrowableManifest → throwableSupport.deserializeThrowable,
      ActorRefManifest → deserializeActorRefBytes,
      OptionManifest → deserializeOption,
      OptionalManifest → deserializeOptional,
      PoisonPillManifest → ((_) ⇒ PoisonPill),
      KillManifest → ((_) ⇒ Kill),
      RemoteWatcherHBManifest → ((_) ⇒ RemoteWatcher.Heartbeat),
      DoneManifest → ((_) ⇒ Done),
      NotUsedManifest → ((_) ⇒ NotUsed),
      AddressManifest → deserializeAddressData,
      UniqueAddressManifest → deserializeUniqueAddress,
      RemoteWatcherHBRespManifest → deserializeHeartbeatRsp,
      ActorInitializationExceptionManifest → deserializeActorInitializationException,
      LocalScopeManifest → ((_) ⇒ LocalScope),
      RemoteScopeManifest → deserializeRemoteScope,
      ConfigManifest → deserializeConfig,
      FromConfigManifest → deserializeFromConfig,
      DefaultResizerManifest → deserializeDefaultResizer,
      BalancingPoolManifest → deserializeBalancingPool,
      BroadcastPoolManifest → deserializeBroadcastPool,
      RandomPoolManifest → deserializeRandomPool,
      RoundRobinPoolManifest → deserializeRoundRobinPool,
      ScatterGatherPoolManifest → deserializeScatterGatherPool,
      TailChoppingPoolManifest → deserializeTailChoppingPool,
      RemoteRouterConfigManifest → deserializeRemoteRouterConfig
    )
private def deserializeActorRefBytes(bytes: Array[Byte]): ActorRef = deserializeActorRef(ContainerFormats.ActorRef.parseFrom(bytes))
private def deserializeActorRef(actorRef: ContainerFormats.ActorRef): ActorRef = serialization.system.provider.resolveActorRef(actorRef.getPath)
So ContainerFormats.ActorRef.parseFrom first turns the Array[Byte] into a ContainerFormats.ActorRef (not analyzed here), and then serialization.system.provider.resolveActorRef turns the ActorPath string into an ActorRef. From context, serialization.system.provider should be the RemoteActorRefProvider.
    def resolveActorRef(path: String): ActorRef = {
      // using thread local LRU cache, which will call internalRresolveActorRef
      // if the value is not cached
      actorRefResolveThreadLocalCache match {
        case null ⇒ internalResolveActorRef(path) // not initalized yet
        case c ⇒ c.threadLocalCache(this).getOrCompute(path)
      }
    }
resolveActorRef first checks whether actorRefResolveThreadLocalCache has been initialized. It has: as analyzed before, it is created during RemoteActorRefProvider.init.
    /**
     * INTERNAL API
     */
    private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider)
      extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) {
      override protected def compute(k: String): ActorRef =
        provider.internalResolveActorRef(k)
      override protected def hash(k: String): Int = Unsafe.fastHash(k)
      override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
    }
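actorRefResolveThreadLocalCache is an ActorSystem Extension that hands each thread its own ActorRefResolveCache instance. Does that class look familiar? It is an LruBoundedCache with a capacity of 1024 and an evictAgeThreshold of 600 (as far as I can tell this age is counted by the cache's internal access counter, not in seconds). On the first getOrCompute for a path, compute is called, which in turn calls provider.internalResolveActorRef; the result is then cached, unless it is an EmptyLocalActorRef. I won't go into the caching internals. Next, internalResolveActorRef.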
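The getOrCompute contract can be illustrated with a small bounded cache. This is only a sketch of the idea: it uses java.util.LinkedHashMap in access order, which is not how Akka's LruBoundedCache is implemented, it is not thread-safe (Akka sidesteps that by keeping one cache per thread), and the computeCalls counter exists only so the behavior can be observed.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

object ResolveCacheDemo {
  // A minimal LRU cache: at most `capacity` entries, least recently used evicted first.
  final class LruCache[K, V](capacity: Int)(compute: K => V) {
    // Counts cache misses, for demonstration only.
    var computeCalls = 0

    // accessOrder = true makes get() move an entry to the "most recent" end.
    private val store = new JLinkedHashMap[K, V](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
        size() > capacity
    }

    def getOrCompute(key: K): V =
      if (store.containsKey(key)) store.get(key)
      else {
        computeCalls += 1
        val value = compute(key)
        store.put(key, value)
        value
      }
  }
}
```

For example, new ResolveCacheDemo.LruCache[String, Int](2)(_.length) computes the length of a given string once and serves repeated lookups from the cache until the entry is evicted.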
    /**
     * INTERNAL API: This is used by the `ActorRefResolveCache` via the
     * public `resolveActorRef(path: String)`.
     */
    private[akka] def internalResolveActorRef(path: String): ActorRef = path match {
      case ActorPathExtractor(address, elems) ⇒
        if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
        else {
          val rootPath = RootActorPath(address) / elems
          try {
            new RemoteActorRef(transport, transport.localAddressForRemote(address),
              rootPath, Nobody, props = None, deploy = None)
          } catch {
            case NonFatal(e) ⇒
              log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
              new EmptyLocalActorRef(this, rootPath, eventStream)
          }
        }
      case _ ⇒
        log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
        deadLetters
    }
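Remember internalResolveActorRef? It came up when we analyzed actorSelection. It first checks whether the address in the path belongs to the local system. If so, it looks the actor up through the LocalActorRefProvider; otherwise it creates a RemoteActorRef. Because the reference points at an actor on another node, a RemoteActorRef is created as a proxy for that remote actor. That completes the deserialization of a remote ActorRef.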
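The local-versus-remote decision can be sketched with java.net.URI alone. Everything here is a simplified model under stated assumptions: Ref, LocalRef, RemoteRef and DeadLetters are hypothetical types defined in the sketch, localAddresses plays the role of hasAddress, and details such as the uid fragment and error logging are ignored.

```scala
import java.net.{URI, URISyntaxException}

object ResolveDemo {
  // Hypothetical result types standing in for the different ActorRef kinds.
  sealed trait Ref
  final case class LocalRef(elements: List[String]) extends Ref
  final case class RemoteRef(address: String, elements: List[String]) extends Ref
  case object DeadLetters extends Ref

  // Parses a serialized path and decides whether it points at this system.
  def resolve(path: String, localAddresses: Set[String]): Ref =
    try {
      val uri = new URI(path)
      if (uri.getScheme == null || uri.getHost == null || uri.getRawPath == null)
        DeadLetters // not a full protocol://system@host:port/... path
      else {
        val address = s"${uri.getScheme}://${uri.getUserInfo}@${uri.getHost}:${uri.getPort}"
        val elems = uri.getRawPath.split("/").filter(_.nonEmpty).toList
        if (localAddresses(address)) LocalRef(elems) // look up locally
        else RemoteRef(address, elems)               // create a proxy instead
      }
    } catch {
      case _: URISyntaxException => DeadLetters // unparseable path
    }
}
```

The same string therefore resolves to a local reference on the node that owns the address and to a remote proxy everywhere else, which is the essence of the mechanism described above.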
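That completes the analysis of ActorRef serialization and deserialization. Akka's location transparency is built on ActorPath: an ActorRef crosses the network as the String form of its ActorPath (including host and port), and the receiving host uses that string after deserialization to create a local proxy for the remote actor, a RemoteActorRef. All subsequent communication goes through that RemoteActorRef. Seen this way, location transparency is fairly simple.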