  • Akka Source Code Analysis: Remote, Location Transparency

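      In the previous post we looked at how a message is sent to a remote actor in remote mode. However it gets there, the send always ends up going through a RemoteActorRef. The official documentation also states explicitly that an ActorRef is independent of network location, which implies two things: 1. an ActorRef can be serialized and sent across the network; 2. after deserialization, the receiving side can correctly tell whether the ActorRef is local or remote. So location transparency hinges on two points: 1. how an ActorRef is serialized; 2. how an ActorRef is resolved. Let's study them one by one.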

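      In local mode messages are sent through a local InternalActorRef (typically a LocalActorRef); in remote mode they are sent through a RemoteActorRef. What is the difference between the two?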

    /**
     * INTERNAL API
     * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
     * This reference is network-aware (remembers its origin) and immutable.
     */
    private[akka] class RemoteActorRef private[akka] (
      remote:                RemoteTransport,
      val localAddressToUse: Address,
      val path:              ActorPath,
      val getParent:         InternalActorRef,
      props:                 Option[Props],
      deploy:                Option[Deploy])
      extends InternalActorRef with RemoteRef 
    

      The source shows that RemoteActorRef extends InternalActorRef and also mixes in the RemoteRef trait.

    private[akka] trait RemoteRef extends ActorRefScope {
      final def isLocal = false
    }
    

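      RemoteRef is simple: it just defines isLocal as false. Seen this way, a RemoteActorRef is not very different from any other InternalActorRef. When an ActorRef is passed around locally it does not need to be serialized by default, so where does serialization come in? Let's look at the serialization path first.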

      Remember the earlier posts? In remote mode messages are sent by EndpointWriter.writeSend.

     def writeSend(s: Send): Boolean = try {
        handle match {
          case Some(h) ⇒
            if (provider.remoteSettings.LogSend && log.isDebugEnabled) {
              def msgLog = s"RemoteMessage: [${s.message}] to [${s.recipient}]<+[${s.recipient.path}] from [${s.senderOption.getOrElse(extendedSystem.deadLetters)}]"
              log.debug("sending message {}", msgLog)
            }
    
            val pdu = codec.constructMessage(
              s.recipient.localAddressToUse,
              s.recipient,
              serializeMessage(s.message),
              s.senderOption,
              seqOption = s.seqOpt,
              ackOption = lastAck)
    
            val pduSize = pdu.size
            remoteMetrics.logPayloadBytes(s.message, pduSize)
    
            if (pduSize > transport.maximumPayloadBytes) {
              val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes.")
              log.error(reason, "Transient association error (association remains live)")
              true
            } else {
              val ok = h.write(pdu)
              if (ok) {
                ackDeadline = newAckDeadline
                lastAck = None
              }
              ok
            }
    
          case None ⇒
            throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
        }
      } catch {
        case e: NotSerializableException ⇒
          log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
          true
        case e: IllegalArgumentException ⇒
          log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
          true
        case e: MessageSerializer.SerializationException ⇒
          log.error(e, "{} Transient association error (association remains live)", e.getMessage)
          true
        case e: EndpointException ⇒
          publishAndThrow(e, Logging.ErrorLevel)
        case NonFatal(e) ⇒
          publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel)
      }
    

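      We can see that serializeMessage is called to serialize the outgoing message, and its result is passed as an argument to codec.constructMessage. So if the user's message contains an ActorRef, it is bound to be handled in this function. Which messages contain an ActorRef? Remember ActorIdentity? It carries one. And of course, if a user-defined message contains an ActorRef, that reference gets serialized too.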

    private def serializeMessage(msg: Any): SerializedMessage = handle match {
        case Some(h) ⇒
          Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, extendedSystem)) {
            MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])
          }
        case None ⇒
          throw new EndpointException("Internal error: No handle was present during serialization of outbound message.")
      }
    

      Clearly EndpointWriter.serializeMessage in turn calls MessageSerializer.serialize to do the actual serialization.

      /**
       * Serialization information needed for serializing local actor refs,
       * or if serializer library e.g. custom serializer/deserializer in Jackson need
       * access to the current `ActorSystem`.
       */
      final case class Information(address: Address, system: ActorSystem)
    

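      The Serialization.Information case class is simple and its doc comment is clear, so we won't analyze it in detail. In short, it supplies the basic values that serialization needs: the address and the current ActorSystem.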

     /**
       * INTERNAL API: This holds a reference to the current transport serialization information used for
       * serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in
       * Jackson need access to the current `ActorSystem`.
       */
      @InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
    

      So what exactly is Serialization.currentTransportInformation?

    /** `DynamicVariables` provide a binding mechanism where the current
     *  value is found through dynamic scope, but where access to the
     *  variable itself is resolved through static scope.
     *
     *  The current value can be retrieved with the value method. New values
     *  should be pushed using the `withValue` method. Values pushed via
     *  `withValue` only stay valid while the `withValue`'s second argument, a
     *  parameterless closure, executes. When the second argument finishes,
     *  the variable reverts to the previous value.
     *
     *  {{{
     *  someDynamicVariable.withValue(newValue) {
     *    // ... code called in here that calls value ...
     *    // ... will be given back the newValue ...
     *  }
     *  }}}
     *
     *  Each thread gets its own stack of bindings.  When a
     *  new thread is created, the `DynamicVariable` gets a copy
     *  of the stack of bindings from the parent thread, and
     *  from then on the bindings for the new thread
     *  are independent of those for the original thread.
     *
     *  @author  Lex Spoon
     *  @version 1.1, 2007-5-21
     */
    class DynamicVariable[T](init: T) 
    

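      currentTransportInformation is a dynamic variable. Its purpose and usage are explained clearly in the Scaladoc above; you can think of it as a ThreadLocal whose value is inherited by child threads (DynamicVariable is backed by an InheritableThreadLocal).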

    /** Set the value of the variable while executing the specified
        * thunk.
        *
        * @param newval The value to which to set the variable
        * @param thunk The code to evaluate under the new setting
        */
      def withValue[S](newval: T)(thunk: => S): S = {
        val oldval = value
        tl set newval
    
        try thunk
        finally tl set oldval
      }
    

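      withValue simply gives thunk a thread-confined binding to run under: the new value is visible only on the current thread while thunk executes, and the previous value is restored afterwards.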
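      The scoping behaviour of withValue can be tried out with nothing but the Scala standard library. The sketch below is only an illustration: Info is a hypothetical stand-in for Serialization.Information, and current plays the role of currentTransportInformation.

```scala
import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Hypothetical stand-in for Serialization.Information; only the address matters here.
  final case class Info(address: String)

  // Like currentTransportInformation, the variable starts out as null.
  val current = new DynamicVariable[Info](null)

  // Reads whatever binding is visible on the calling thread.
  def describe(): String =
    Option(current.value).map(_.address).getOrElse("<unset>")

  // Returns the value seen before, inside and after a withValue block.
  def run(): (String, String, String) = {
    val before = describe()
    val inside = current.withValue(Info("akka.tcp://sys@host-a:2552")) { describe() }
    val after  = describe()
    (before, inside, after)
  }
}
```

      Calling DynamicVariableDemo.run() on a thread with no binding yields "<unset>" before and after the block and the address inside it, which is the same pattern serializeMessage relies on.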

      To sum up, when MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]) runs, the value it reads from currentTransportInformation is Serialization.Information(h.localAddress, extendedSystem). Now let's see what serialize does.

    /**
       * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol
       * Throws `NotSerializableException` if serializer was not configured for the message type.
       * Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the
       * serializer.
       */
      def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
        val s = SerializationExtension(system)
        val serializer = s.findSerializerFor(message)
        val builder = SerializedMessage.newBuilder
    
        val oldInfo = Serialization.currentTransportInformation.value
        try {
          if (oldInfo eq null)
            Serialization.currentTransportInformation.value = system.provider.serializationInformation
    
          builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
          builder.setSerializerId(serializer.identifier)
    
          val ms = Serializers.manifestFor(serializer, message)
          if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
    
          builder.build
        } catch {
          case NonFatal(e) ⇒
            throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " +
              s"using serializer [${serializer.getClass}].", e)
        } finally Serialization.currentTransportInformation.value = oldInfo
      }
    

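      Put simply, serialize uses SerializationExtension to find a serializer for the message and uses it to turn the message into bytes, that is, to serialize it. It then sets a few more fields (the serializer id and the manifest) through SerializedMessage.Builder and returns the resulting SerializedMessage. So how SerializationExtension picks a suitable serializer matters a lot.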

    /**
       * Returns the configured Serializer for the given Class. The configured Serializer
       * is used if the configured class `isAssignableFrom` from the `clazz`, i.e.
       * the configured class is a super class or implemented interface. In case of
       * ambiguity it is primarily using the most specific configured class,
       * and secondly the entry configured first.
       *
       * Throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class.
       */
      @throws(classOf[NotSerializableException])
      def serializerFor(clazz: Class[_]): Serializer =
        serializerMap.get(clazz) match {
          case null ⇒ // bindings are ordered from most specific to least specific
            def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
              possibilities.size == 1 ||
                (possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
                (possibilities forall (_._2 == possibilities(0)._2))
    
            val ser = {
              bindings.filter {
                case (c, _) ⇒ c isAssignableFrom clazz
              } match {
                case immutable.Seq() ⇒
                  throw new NotSerializableException(s"No configured serialization-bindings for class [${clazz.getName}]")
                case possibilities ⇒
                  if (unique(possibilities))
                    possibilities.head._2
                  else {
                    // give JavaSerializer lower priority if multiple serializers found
                    val possibilitiesWithoutJavaSerializer = possibilities.filter {
                      case (_, _: JavaSerializer)         ⇒ false
                      case (_, _: DisabledJavaSerializer) ⇒ false
                      case _                              ⇒ true
                    }
                    if (possibilitiesWithoutJavaSerializer.isEmpty) {
                      // shouldn't happen
                      throw new NotSerializableException(s"More than one JavaSerializer configured for class [${clazz.getName}]")
                    }
    
                    if (!unique(possibilitiesWithoutJavaSerializer)) {
                      _log.warning(LogMarker.Security, "Multiple serializers found for [{}], choosing first of: [{}]",
                        clazz.getName,
                        possibilitiesWithoutJavaSerializer.map { case (_, s) ⇒ s.getClass.getName }.mkString(", "))
                    }
                    possibilitiesWithoutJavaSerializer.head._2
    
                  }
    
              }
            }
    
            serializerMap.putIfAbsent(clazz, ser) match {
              case null ⇒
                if (shouldWarnAboutJavaSerializer(clazz, ser)) {
                  _log.warning(LogMarker.Security, "Using the default Java serializer for class [{}] which is not recommended because of " +
                    "performance implications. Use another serializer or disable this warning using the setting " +
                    "'akka.actor.warn-about-java-serializer-usage'", clazz.getName)
                }
                log.debug("Using serializer [{}] for message [{}]", ser.getClass.getName, clazz.getName)
                ser
              case some ⇒ some
            }
          case ser ⇒ ser
        }
    

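      findSerializerFor eventually calls serializerFor. In short, serializerFor first looks in the serializerMap cache; on a miss it searches bindings for entries whose class satisfies isAssignableFrom. If the candidates are unambiguous (a single entry, one most specific class, or all of them mapping to the same serializer) it uses the first one; if several different serializers are found it prefers those other than JavaSerializer. The chosen serializer is then cached in serializerMap. With the default configuration, a class that implements java.io.Serializable can always fall back to JavaSerializer. We won't go through how serializers are loaded; it is enough to know that they come from the configuration file. So what does the default configuration look like? Below is an excerpt from reference.conf in the akka-remote package.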
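      The selection rule can be modelled in a few lines of plain Scala. This is a deliberately simplified sketch, not Akka's implementation: Ser, JavaSer and CustomSer are hypothetical types, the cache and the warning logs are left out, and the uniqueness check is reduced to "prefer anything that is not the Java serializer".

```scala
object SerializerLookupSketch {
  // Hypothetical, simplified stand-ins for Akka's serializer classes.
  sealed trait Ser { def name: String }
  case object JavaSer extends Ser { val name = "java" }
  final case class CustomSer(name: String) extends Ser

  // Bindings are assumed to be ordered from most specific to least specific class.
  val bindings: List[(Class[_], Ser)] = List(
    classOf[java.lang.String]     -> CustomSer("primitive-string"),
    classOf[java.lang.Throwable]  -> JavaSer,
    classOf[java.io.Serializable] -> JavaSer
  )

  def serializerFor(clazz: Class[_]): Ser = {
    // Keep every binding whose class is the same as, or a supertype of, clazz.
    val possibilities = bindings.filter { case (c, _) => c.isAssignableFrom(clazz) }
    possibilities match {
      case Nil =>
        throw new java.io.NotSerializableException(
          s"No configured serialization-bindings for class [${clazz.getName}]")
      case (_, only) :: Nil => only
      case many =>
        // Give the Java serializer lower priority when something else also matches.
        val nonJava = many.filter { case (_, s) => s != JavaSer }
        (if (nonJava.nonEmpty) nonJava else many).head._2
    }
  }
}
```

      With these bindings, String matches both the String entry and the Serializable entry, and the non-Java serializer wins; an exception type only matches Java-serializer entries and therefore falls back to JavaSer.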

    serializers {
          akka-containers = "akka.remote.serialization.MessageContainerSerializer"
          akka-misc = "akka.remote.serialization.MiscMessageSerializer"
          artery = "akka.remote.serialization.ArteryMessageSerializer"
          proto = "akka.remote.serialization.ProtobufSerializer"
          daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
          primitive-long = "akka.remote.serialization.LongSerializer"
          primitive-int = "akka.remote.serialization.IntSerializer"
          primitive-string = "akka.remote.serialization.StringSerializer"
          primitive-bytestring = "akka.remote.serialization.ByteStringSerializer"
          akka-system-msg = "akka.remote.serialization.SystemMessageSerializer"
        }
    
        serialization-bindings {
          "akka.actor.ActorSelectionMessage" = akka-containers
    
          "akka.remote.DaemonMsgCreate" = daemon-create
    
          "akka.remote.artery.ArteryMessage" = artery
    
          # Since akka.protobuf.Message does not extend Serializable but
          # GeneratedMessage does, need to use the more specific one here in order
          # to avoid ambiguity.
          "akka.protobuf.GeneratedMessage" = proto
    
          # Since com.google.protobuf.Message does not extend Serializable but
          # GeneratedMessage does, need to use the more specific one here in order
          # to avoid ambiguity.
          # This com.google.protobuf serialization binding is only used if the class can be loaded,
          # i.e. com.google.protobuf dependency has been added in the application project.
          "com.google.protobuf.GeneratedMessage" = proto
    
          "java.util.Optional" = akka-misc
    
    
          # The following are handled by the MiscMessageSerializer, but they are not enabled for
          # compatibility reasons (it was added in Akka 2.5.[8,9,12]). Enable them by adding:
          # akka.actor.serialization-bindings {
          #   "akka.Done"                 = akka-misc
          #   "akka.NotUsed"              = akka-misc
          #   "akka.actor.Address"        = akka-misc
          #   "akka.remote.UniqueAddress" = akka-misc
          # }
        }
    
        # Additional serialization-bindings that are replacing Java serialization are
        # defined in this section for backwards compatibility reasons. They are included
        # by default but can be excluded for backwards compatibility with Akka 2.4.x.
        # They can be disabled with enable-additional-serialization-bindings=off.
        additional-serialization-bindings {
          "akka.actor.Identify" = akka-misc
          "akka.actor.ActorIdentity" = akka-misc
          "scala.Some" = akka-misc
          "scala.None$" = akka-misc
          "akka.actor.Status$Success" = akka-misc
          "akka.actor.Status$Failure" = akka-misc
          "akka.actor.ActorRef" = akka-misc
          "akka.actor.PoisonPill$" = akka-misc
          "akka.actor.Kill$" = akka-misc
          "akka.remote.RemoteWatcher$Heartbeat$" = akka-misc
          "akka.remote.RemoteWatcher$HeartbeatRsp" = akka-misc
          "akka.actor.ActorInitializationException" = akka-misc
    
          "akka.dispatch.sysmsg.SystemMessage" = akka-system-msg
    
          "java.lang.String" = primitive-string
          "akka.util.ByteString$ByteString1C" = primitive-bytestring
          "akka.util.ByteString$ByteString1" = primitive-bytestring
          "akka.util.ByteString$ByteStrings" = primitive-bytestring
          "java.lang.Long" = primitive-long
          "scala.Long" = primitive-long
          "java.lang.Integer" = primitive-int
          "scala.Int" = primitive-int
    
          # Java Serializer is by default used for exceptions.
          # It's recommended that you implement custom serializer for exceptions that are
          # sent remotely, e.g. in akka.actor.Status.Failure for ask replies. You can add
          # binding to akka-misc (MiscMessageSerializerSpec) for the exceptions that have
          # a constructor with single message String or constructor with message String as
          # first parameter and cause Throwable as second parameter. Note that it's not
          # safe to add this binding for general exceptions such as IllegalArgumentException
          # because it may have a subclass without required constructor.
          "java.lang.Throwable" = java
          "akka.actor.IllegalActorStateException" = akka-misc
          "akka.actor.ActorKilledException" = akka-misc
          "akka.actor.InvalidActorNameException" = akka-misc
          "akka.actor.InvalidMessageException" = akka-misc
    
          "akka.actor.LocalScope$" = akka-misc
          "akka.remote.RemoteScope" = akka-misc
    
          "com.typesafe.config.impl.SimpleConfig" = akka-misc
          "com.typesafe.config.Config" = akka-misc
    
          "akka.routing.FromConfig" = akka-misc
          "akka.routing.DefaultResizer" = akka-misc
          "akka.routing.BalancingPool" = akka-misc
          "akka.routing.BroadcastGroup" = akka-misc
          "akka.routing.BroadcastPool" = akka-misc
          "akka.routing.RandomGroup" = akka-misc
          "akka.routing.RandomPool" = akka-misc
          "akka.routing.RoundRobinGroup" = akka-misc
          "akka.routing.RoundRobinPool" = akka-misc
          "akka.routing.ScatterGatherFirstCompletedGroup" = akka-misc
          "akka.routing.ScatterGatherFirstCompletedPool" = akka-misc
          "akka.routing.SmallestMailboxPool" = akka-misc
          "akka.routing.TailChoppingGroup" = akka-misc
          "akka.routing.TailChoppingPool" = akka-misc
          "akka.remote.routing.RemoteRouterConfig" = akka-misc
        }
    

      From this configuration we know that ActorRef is serialized by akka-misc, that is, akka.remote.serialization.MiscMessageSerializer.

      MiscMessageSerializer.toBinary calls serializeActorRef to serialize an ActorRef.

    private def serializeActorRef(ref: ActorRef): Array[Byte] =
        actorRefBuilder(ref).build().toByteArray
    
      private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder =
        ContainerFormats.ActorRef.newBuilder()
          .setPath(Serialization.serializedActorPath(actorRef))
    
    /**
       * The serialized path of an actorRef, based on the current transport serialization information.
       * If there is no external address available in the given `ActorRef` then the systems default
       * address will be used and that is retrieved from the ThreadLocal `Serialization.Information`
       * that was set with [[Serialization#withTransportInformation]].
       */
      def serializedActorPath(actorRef: ActorRef): String = {
        val path = actorRef.path
        val originalSystem: ExtendedActorSystem = actorRef match {
          case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem]
          case _                   ⇒ null
        }
        Serialization.currentTransportInformation.value match {
          case null ⇒ originalSystem match {
            case null ⇒ path.toSerializationFormat
            case system ⇒
              try path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
              catch { case NonFatal(_) ⇒ path.toSerializationFormat }
          }
          case Information(address, system) ⇒
            if (originalSystem == null || originalSystem == system)
              path.toSerializationFormatWithAddress(address)
            else {
              val provider = originalSystem.provider
              path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
            }
        }
      }
    

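      First, Serialization.currentTransportInformation.value is guaranteed to be non-null here because it was set earlier, so we always reach the Information(address, system) case. Whichever branch of the if is taken, the ActorRef ends up being serialized by calling toSerializationFormatWithAddress.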

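      So serializing an ActorRef means converting its path to the serialization format, which is really just the String form of the ActorPath. In remote mode that string also carries the protocol (for example akka.tcp) and host:port. Having come this far, you can probably guess how deserialization works: parse the ActorPath string and turn it into the corresponding RemoteActorRef. We covered resolving a reference from an ActorPath in an earlier post, but let's walk through it once more.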
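      To make the string format concrete, here is a small model of what ends up on the wire. Addr and Path are hypothetical miniatures of Address and ActorPath written for this post; only the shape of the resulting string is meant to match Akka.

```scala
object PathFormatSketch {
  // Hypothetical miniature of akka.actor.Address.
  final case class Addr(protocol: String, system: String, host: Option[String], port: Option[Int]) {
    def render: String = (host, port) match {
      case (Some(h), Some(p)) => s"$protocol://$system@$h:$p"
      case _                  => s"$protocol://$system"
    }
  }

  // Hypothetical miniature of an ActorPath: own address, path elements and actor uid.
  final case class Path(address: Addr, elements: List[String], uid: Int) {
    // Mirrors the idea of toSerializationFormatWithAddress: a path whose own
    // address has no host is rendered with the supplied transport address.
    def toSerializationFormatWithAddress(transport: Addr): String = {
      val base = if (address.host.isDefined) address else transport
      s"${base.render}/${elements.mkString("/")}#$uid"
    }
  }
}
```

      For example, a local path Path(Addr("akka", "sys", None, None), List("user", "worker"), 42) rendered with the transport address Addr("akka.tcp", "sys", Some("10.0.0.1"), Some(2552)) gives akka.tcp://sys@10.0.0.1:2552/user/worker#42, which is all the receiving side needs to find its way back.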

    override def receive: Receive = {
        case Disassociated(info) ⇒ handleDisassociated(info)
    
        case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒
          val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
    
          for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack
    
          msgOption match {
            case Some(msg) ⇒
              if (msg.reliableDeliveryEnabled) {
                ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
                deliverAndAck()
              } else try
                msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
              catch {
                case e: NotSerializableException ⇒ logTransientSerializationError(msg, e)
                case e: IllegalArgumentException ⇒ logTransientSerializationError(msg, e)
              }
    
            case None ⇒
          }
    
        case InboundPayload(oversized) ⇒
          log.error(
            new OversizedPayloadException(s"Discarding oversized payload received: " +
              s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."),
            "Transient error while reading from association (association remains live)")
    
        case StopReading(writer, replyTo) ⇒
          saveState()
          context.become(notReading)
          replyTo ! StoppedReading(writer)
    
      }
    

      When EndpointReader.receive gets an InboundPayload, it first decodes it into a Message and then hands the message to msgDispatch.dispatch; msgDispatch is a DefaultMessageDispatcher instance.

    override def dispatch(
        recipient:         InternalActorRef,
        recipientAddress:  Address,
        serializedMessage: SerializedMessage,
        senderOption:      OptionVal[ActorRef]): Unit = {
    
        import provider.remoteSettings._
    
        lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
        def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
        val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
        val originalReceiver = recipient.path
    
        def logMessageReceived(messageType: String): Unit = {
          if (LogReceive && log.isDebugEnabled)
            log.debug(s"received $messageType RemoteMessage: [{}] to [{}]<+[{}] from [{}]", payload, recipient, originalReceiver, sender)
        }
    
        recipient match {
    
          case `remoteDaemon` ⇒
            if (UntrustedMode) log.debug(LogMarker.Security, "dropping daemon message in untrusted mode")
            else {
              logMessageReceived("daemon message")
              remoteDaemon ! payload
            }
    
          case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
            logMessageReceived("local message")
            payload match {
              case sel: ActorSelectionMessage ⇒
                if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
                  sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
                  log.debug(
                    LogMarker.Security,
                    "operating in UntrustedMode, dropping inbound actor selection to [{}], " +
                      "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
                    sel.elements.mkString("/", "/", ""))
                else
                  // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
                  ActorSelection.deliverSelection(l, sender, sel)
              case msg: PossiblyHarmful if UntrustedMode ⇒
                log.debug(LogMarker.Security, "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
              case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
              case msg                ⇒ l.!(msg)(sender)
            }
    
          case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
            logMessageReceived("remote-destined message")
            if (provider.transport.addresses(recipientAddress))
              // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
              r.!(payload)(sender)
            else
              log.error(
                "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
                payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
    
          case r ⇒ log.error(
            "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
            payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
    
        }
      }
    

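      dispatch deserializes the message with MessageSerializer.deserialize(system, serializedMessage). Note that payload is a lazy val, so the deserialization actually runs the first time the payload is used.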

      /**
       * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
       */
      def deserialize(system: ExtendedActorSystem, messageProtocol: SerializedMessage): AnyRef = {
        SerializationExtension(system).deserialize(
          messageProtocol.getMessage.toByteArray,
          messageProtocol.getSerializerId,
          if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get
      }
    
     /**
       * Deserializes the given array of bytes using the specified serializer id,
       * using the optional type hint to the Serializer.
       * Returns either the resulting object or an Exception if one was thrown.
       */
      def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
        Try {
          val serializer = try getSerializerById(serializerId) catch {
            case _: NoSuchElementException ⇒ throw new NotSerializableException(
              s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
                "akka.actor.serializers is not in synch between the two systems.")
          }
          deserializeByteArray(bytes, serializer, manifest)
        }
    

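      From the code above, deserialization simply looks up the concrete serializer by serializerId and then calls deserializeByteArray. Remember which serializer that id points to when the message is an ActorRef? Right, the one registered as akka-misc (serializerId itself is that serializer's numeric identifier). So deserialization is ultimately done by akka.remote.serialization.MiscMessageSerializer. There is one more important field, manifest. What is it? We can find clues in the ActorRef serialization path.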

      In MessageSerializer.serialize there is a line that computes this manifest: val ms = Serializers.manifestFor(serializer, message).

    def manifestFor(s: Serializer, message: AnyRef): String = s match {
        case s2: SerializerWithStringManifest ⇒ s2.manifest(message)
        case _                                ⇒ if (s.includeManifest) message.getClass.getName else ""
      }
    

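      It checks whether the current Serializer is a SerializerWithStringManifest. If so it calls manifest; if not it checks includeManifest and returns the message's class name when that is true, or an empty string otherwise. Let's look at the definition of MiscMessageSerializer.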

    class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer
    

    private val ActorRefManifest = "G"
    

      Clearly it extends SerializerWithStringManifest, and for an ActorRef the manifest value is the string G.
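      The three possible outcomes of manifestFor are easy to see side by side in a toy model. The traits below are hypothetical, trimmed-down versions of Serializer and SerializerWithStringManifest; only the decision logic is copied from the source above.

```scala
object ManifestSketch {
  // Hypothetical, trimmed-down versions of the two serializer flavours.
  trait Ser { def includeManifest: Boolean }
  trait StringManifestSer extends Ser {
    final def includeManifest: Boolean = true
    def manifest(o: AnyRef): String
  }

  // Same decision as Serializers.manifestFor.
  def manifestFor(s: Ser, message: AnyRef): String = s match {
    case s2: StringManifestSer => s2.manifest(message)
    case _                     => if (s.includeManifest) message.getClass.getName else ""
  }

  // Toy serializer that, like MiscMessageSerializer for ActorRef, answers with a short code.
  object MiscLike extends StringManifestSer {
    def manifest(o: AnyRef): String = "G" // every message maps to "G" in this toy
  }
  object ClassNameSer extends Ser { val includeManifest = true }
  object NoManifestSer extends Ser { val includeManifest = false }
}
```

      Passing the string "hello" through the three serializers gives "G", "java.lang.String" and "" respectively. A short string manifest such as G keeps the envelope small compared with a fully qualified class name.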

    private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = {
    
        @tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = {
          manifestCache.compareAndSet(cache, cache.updated(key, value)) ||
            updateCache(manifestCache.get, key, value) // recursive, try again
        }
    
        withTransportInformation { () ⇒
          serializer match {
            case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
            case s1 ⇒
              if (manifest == "")
                s1.fromBinary(bytes, None)
              else {
                val cache = manifestCache.get
                cache.get(manifest) match {
                  case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest)
                  case None ⇒
                    system.dynamicAccess.getClassFor[AnyRef](manifest) match {
                      case Success(classManifest) ⇒
                        val classManifestOption: Option[Class[_]] = Some(classManifest)
                        updateCache(cache, manifest, classManifestOption)
                        s1.fromBinary(bytes, classManifestOption)
                      case Failure(e) ⇒
                        throw new NotSerializableException(
                          s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
                    }
                }
              }
          }
        }
      }
    

      Looking at deserializeByteArray above, it first checks whether the serializer is a SerializerWithStringManifest. For an ActorRef it obviously is, so let's see how fromBinary is implemented.

      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
        fromBinaryMap.get(manifest) match {
          case Some(deserializer) ⇒ deserializer(bytes)
          case None ⇒ throw new NotSerializableException(
            s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
        }
    
     private val fromBinaryMap = Map[String, Array[Byte] ⇒ AnyRef](
        IdentifyManifest → deserializeIdentify,
        ActorIdentityManifest → deserializeActorIdentity,
        StatusSuccessManifest → deserializeStatusSuccess,
        StatusFailureManifest → deserializeStatusFailure,
        ThrowableManifest → throwableSupport.deserializeThrowable,
        ActorRefManifest → deserializeActorRefBytes,
        OptionManifest → deserializeOption,
        OptionalManifest → deserializeOptional,
        PoisonPillManifest → ((_) ⇒ PoisonPill),
        KillManifest → ((_) ⇒ Kill),
        RemoteWatcherHBManifest → ((_) ⇒ RemoteWatcher.Heartbeat),
        DoneManifest → ((_) ⇒ Done),
        NotUsedManifest → ((_) ⇒ NotUsed),
        AddressManifest → deserializeAddressData,
        UniqueAddressManifest → deserializeUniqueAddress,
        RemoteWatcherHBRespManifest → deserializeHeartbeatRsp,
        ActorInitializationExceptionManifest → deserializeActorInitializationException,
        LocalScopeManifest → ((_) ⇒ LocalScope),
        RemoteScopeManifest → deserializeRemoteScope,
        ConfigManifest → deserializeConfig,
        FromConfigManifest → deserializeFromConfig,
        DefaultResizerManifest → deserializeDefaultResizer,
        BalancingPoolManifest → deserializeBalancingPool,
        BroadcastPoolManifest → deserializeBroadcastPool,
        RandomPoolManifest → deserializeRandomPool,
        RoundRobinPoolManifest → deserializeRoundRobinPool,
        ScatterGatherPoolManifest → deserializeScatterGatherPool,
        TailChoppingPoolManifest → deserializeTailChoppingPool,
        RemoteRouterConfigManifest → deserializeRemoteRouterConfig
      )
    
      private def deserializeActorRefBytes(bytes: Array[Byte]): ActorRef =
        deserializeActorRef(ContainerFormats.ActorRef.parseFrom(bytes))
    
    private def deserializeActorRef(actorRef: ContainerFormats.ActorRef): ActorRef =
        serialization.system.provider.resolveActorRef(actorRef.getPath)
    

      So ContainerFormats.ActorRef.parseFrom is called first to turn the Array[Byte] into a ContainerFormats.ActorRef (we won't analyze that step), and then serialization.system.provider.resolveActorRef turns the ActorPath string into an ActorRef. From the context, serialization.system.provider should be the RemoteActorRefProvider.
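      The manifest-keyed dispatch in fromBinary can be reduced to the following sketch. It is not the real serializer: Ref is a hypothetical stand-in for a resolved ActorRef, the P manifest and Ping message are invented for contrast, and the path is encoded as plain UTF-8 bytes instead of the protobuf ContainerFormats.ActorRef message.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ManifestDispatchSketch {
  // Hypothetical stand-in for a resolved ActorRef: only the path string is kept.
  final case class Ref(path: String)
  case object Ping // hypothetical second message type

  val RefManifest  = "G"
  val PingManifest = "P"

  // Assumption: the path is written as raw UTF-8 bytes, not as protobuf.
  def toBinary(ref: Ref): Array[Byte] = ref.path.getBytes(UTF_8)

  // One deserializer function per manifest, as in fromBinaryMap.
  private val fromBinaryMap: Map[String, Array[Byte] => AnyRef] = Map(
    RefManifest  -> ((bytes: Array[Byte]) => Ref(new String(bytes, UTF_8))),
    PingManifest -> ((_: Array[Byte]) => Ping)
  )

  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    fromBinaryMap.get(manifest) match {
      case Some(deserializer) => deserializer(bytes)
      case None =>
        throw new java.io.NotSerializableException(
          s"Unimplemented deserialization of message with manifest [$manifest]")
    }
}
```

      A round trip through toBinary and fromBinary with manifest G returns an equal Ref, while an unknown manifest fails with NotSerializableException, just like the real map lookup.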

    def resolveActorRef(path: String): ActorRef = {
        // using thread local LRU cache, which will call internalRresolveActorRef
        // if the value is not cached
        actorRefResolveThreadLocalCache match {
          case null ⇒ internalResolveActorRef(path) // not initalized yet
          case c    ⇒ c.threadLocalCache(this).getOrCompute(path)
        }
      }
    

      resolveActorRef first checks whether the actorRefResolveThreadLocalCache has been initialized. It is created during RemoteActorRefProvider.init, as we saw in an earlier post, so normally the cached branch is taken.

    /**
     * INTERNAL API
     */
    private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider)
      extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) {
    
      override protected def compute(k: String): ActorRef =
        provider.internalResolveActorRef(k)
    
      override protected def hash(k: String): Int = Unsafe.fastHash(k)
    
      override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
    }
    

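      actorRefResolveThreadLocalCache is an ActorSystem Extension that ultimately hands out an ActorRefResolveCache instance per thread. Does this class look familiar? It is an LruBoundedCache with a capacity of 1024 and an evictAgeThreshold of 600. The first getOrCompute for a given path calls compute, which in turn calls provider.internalResolveActorRef; the result is then cached, unless it is an EmptyLocalActorRef (see isCacheable). We won't analyze the caching itself. Next, let's look at internalResolveActorRef.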
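      The getOrCompute contract (compute once per key, then serve from a bounded cache) can be illustrated with a much simpler structure than Akka uses. The sketch below swaps in a java.util.LinkedHashMap in access order; it is not LruBoundedCache, and LruCache and computeCalls are names made up for this example.

```scala
import java.util.{ LinkedHashMap => JLinkedHashMap, Map => JMap }

object ResolveCacheSketch {
  // A bounded LRU cache built on LinkedHashMap in access order.
  // This is NOT Akka's LruBoundedCache; it only illustrates getOrCompute semantics.
  final class LruCache[K, V](capacity: Int)(compute: K => V) {
    var computeCalls = 0 // exposed for the demonstration only

    private val entries: JLinkedHashMap[K, V] =
      new JLinkedHashMap[K, V](16, 0.75f, true) {
        // Drop the least recently used entry once the capacity is exceeded.
        override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
          size() > capacity
      }

    def getOrCompute(k: K): V =
      if (entries.containsKey(k)) entries.get(k)
      else {
        computeCalls += 1
        val v = compute(k)
        entries.put(k, v)
        v
      }
  }
}
```

      With a capacity of 2, resolving path "a" twice computes it once; resolving "b" and "c" afterwards evicts "a", so a later lookup of "a" has to compute it again. Like the thread-local cache in Akka, this sketch is not thread-safe and is meant to be owned by a single thread.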

     /**
       * INTERNAL API: This is used by the `ActorRefResolveCache` via the
       * public `resolveActorRef(path: String)`.
       */
      private[akka] def internalResolveActorRef(path: String): ActorRef = path match {
        case ActorPathExtractor(address, elems) ⇒
          if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
          else {
            val rootPath = RootActorPath(address) / elems
            try {
              new RemoteActorRef(transport, transport.localAddressForRemote(address),
                rootPath, Nobody, props = None, deploy = None)
            } catch {
              case NonFatal(e) ⇒
                log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
                new EmptyLocalActorRef(this, rootPath, eventStream)
            }
          }
        case _ ⇒
          log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
          deadLetters
      }
    

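      Remember internalResolveActorRef? It came up when we analyzed actorSelection. It first checks whether the address in the path belongs to the local system; if so it asks the LocalActorRefProvider to look the actor up, otherwise it creates a RemoteActorRef. Because the reference here points at an actor on another node, a RemoteActorRef is created as a proxy for the remote actor. That completes the deserialization of a remote ActorRef.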
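      The local-or-remote decision boils down to one address comparison, which the following self-contained sketch tries to capture. Everything in it is hypothetical: LocalRef, RemoteRef and DeadLetters are plain case classes, and PathPattern is a rough regular expression rather than Akka's ActorPathExtractor.

```scala
object ResolveSketch {
  sealed trait Ref { def path: String }
  final case class LocalRef(path: String) extends Ref
  final case class RemoteRef(path: String) extends Ref
  case object DeadLetters extends Ref { val path = "/deadLetters" }

  // protocol://system@host:port followed by the path elements.
  // A rough, hypothetical pattern; it is not ActorPathExtractor.
  private val PathPattern = """([a-z.]+://[^@/]+@[^:/]+:\d+)(/.*)""".r

  // localAddresses plays the role of hasAddress(address) in the provider.
  def resolve(localAddresses: Set[String])(path: String): Ref = path match {
    case PathPattern(address, _) =>
      if (localAddresses(address)) LocalRef(path) // look the actor up locally
      else RemoteRef(path)                        // build a proxy for the remote actor
    case _ => DeadLetters                         // unparsable path
  }
}
```

      A node whose own address is akka.tcp://sys@10.0.0.1:2552 resolves paths under that address to LocalRef, paths under any other address to RemoteRef, and anything unparsable to DeadLetters. The same string therefore yields a local reference on its home node and a remote proxy everywhere else.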

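      We have now analyzed ActorRef serialization and deserialization end to end. Akka's location transparency is built on ActorPath: an ActorRef crosses the network as the String form of its ActorPath (including host, port and so on), and the host that receives the serialized ActorRef uses that string to create a local proxy for the remote actor, a RemoteActorRef. All later communication goes through that RemoteActorRef. Seen this way, location transparency is fairly simple.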

  • Original article: https://www.cnblogs.com/gabry/p/9395765.html