zoukankan      html  css  js  c++  java
  • [Akka]发送一条消息的内部流程

    本想通过了解一下Akka-actor工程中主要的类的概念,来看下Akka内部运作的机制。无奈里边的类的确太多,注释中对每个类的功能也没有足够的解释。所以还是通过debug的方式,找个入手点,看一下互相之间调用的关系。

    最初的选择是看一下ActorSystem的实始化过程,但发现难度挺大,因为这个初始化工作是为以后的行为做准备,所以仅根据初始化的动作,难以了解其目的是什么。所以就试着从消息发送的过程了解起,发现这个逻辑更好理解。

    下面来看一下其执行过程吧。代码如下,使用默认的配置。

    object Main extends App{
      val system = ActorSystem("football")
      val actor = system.actorOf(Props[PrintActor])
      actor ! "hello"
    }

    想要了解的是actor ! "hello"的执行过程。


    首先,actor的类型是ActorRef,在默认的配置下,这个ActorRef的具体类型为RepointableActorRef。

    它的!方法的定义为

    def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender)

    underlying的定义为

    def underlying: Cell = Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell]

    Cell是ActorCell混入的一个特质,后者是Actor的实体部分(相对于ActorRef)。

    对于underlying.sendMessage的调用,会调用ActorCell的sendMessage方法,这个方法实现于Dispatch这个trait。

    def sendMessage(msg: Envelope): Unit =
        try {
          if (system.settings.SerializeAllMessages) {
            val unwrapped = (msg.message match {
              case DeadLetter(wrapped, _, _) ⇒ wrapped
              case other                     ⇒ other
            }).asInstanceOf[AnyRef]
            if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
              val s = SerializationExtension(system)
              s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
            }
          }
          dispatcher.dispatch(this, msg)
        } catch handleException

    虽然有些怪怪的,但是handleException是一个PartialFunction,所以语法是没问题的。

    在if (system.settings.SerializeAllMessages)里的部分是根据需要把消息序列化后又反序列化了一遍,可能是为了副作用,因为这个流程中没有改变msg。

    下面就走到了dispatcher.dispatch(this,msg)。

    默认的Dispathcer实现是akka.dispatch.Dispatcher。其dispatch方法的实现为:

      protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
        val mbox = receiver.mailbox
        mbox.enqueue(receiver.self, invocation)
        registerForExecution(mbox, true, false)
      }

    它把消息加入到receiver的mailbox中,然后调用registerForExecution。

      protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
        if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
          if (mbox.setAsScheduled()) {
            try {
              executorService execute mbox
              true
            } catch {
              case e: RejectedExecutionException ⇒
                try {
                  executorService execute mbox
                  true
                } catch { //Retry once
                  case e: RejectedExecutionException ⇒
                    mbox.setAsIdle()
                    eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
                    throw e
                }
            }
          } else false
        } else false
      }

    registerForExecution这个名字起得很到位,因为它的确只是"register",并不实际执行它。实际执行execution的是一个线程池,所以这个方法会调用executorService.execute(mbox)。

    为什么能够执行mailbox呢?因为mailbox实现了Runnable方法。

      override final def run(): Unit = {
        try {
          if (!isClosed) { //Volatile read, needed here
            processAllSystemMessages() //First, deal with any system messages
            processMailbox() //Then deal with messages
          }
        } finally {
          setAsIdle() //Volatile write, needed here
          dispatcher.registerForExecution(this, false, false)
        }
      }

    在上篇文章中,俺认为在设计Akka时,应该考虑提交给线程池的任务的粒度,在这个run方法中,实现上决定了这个粒度:ProcessAllSystemMessages以及processMailbox。

      @tailrec private final def processMailbox(
        left: Int = java.lang.Math.max(dispatcher.throughput, 1),
        deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
        if (shouldProcessMessage) {
          val next = dequeue()
          if (next ne null) {
            if (Mailbox.debug) println(actor.self + " processing message " + next)
            actor invoke next
            if (Thread.interrupted())
              throw new InterruptedException("Interrupted while processing actor messages")
            processAllSystemMessages()
            if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
              processMailbox(left - 1, deadlineNs)
          }
        }

    这个方法会调用actor.invoke(next)来处理消息。

      final def invoke(messageHandle: Envelope): Unit = try {
        currentMessage = messageHandle
        cancelReceiveTimeout() // FIXME: leave this here???
        messageHandle.message match {
          case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
          case msg                      ⇒ receiveMessage(msg)
        }
        currentMessage = null // reset current message after successful invocation
      } catch handleNonFatalOrInterruptedException { e ⇒
        handleInvokeFailure(Nil, e)
      } finally {
        checkReceiveTimeout // Reschedule receive timeout
      }

    对于AutoReceivedMessage会走autoRecieveMessage这条路,其它的走receiveMessage(msg)这条路。

    final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
    protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

    至此,可以看到我们在actor中定义的receive方法返回的Actor.Receive这个PartialFunction被应用于msg。也就是说,至此,我们定义的处理消息的逻辑被执行。


    可见,Akka处理消息的过程,实际上是就是把消息加入了actor的mailbox中,然后生成一个任务(即Mailbox的run方法中的动作)提交给线程池。

    当然这其中的很多细节决定了Akka的正确工作。比如每次调用processMailbox处理多少消息;比如equeue一个消息到mailbox的动作是否阻塞,阻塞多久;比如,在Mailbox的receiveMessage方法中,behaviorStack的行为。

  • 相关阅读:
    2个准则,解决人际、团队和客户问题
    系统思维
    如何看透他人行为背后的本质 | 思维模型02:行为分析模型
    提问比回答更有力量
    有了套路,为什么还是解决不了问题
    能够跨界成功的人
    正确的思考
    你的烦恼,全因为不会思考
    努力,到底是不是天赋
    我们是魔术师面前的观众吗
  • 原文地址:https://www.cnblogs.com/devos/p/4438402.html
Copyright © 2011-2022 走看看