zoukankan      html  css  js  c++  java
  • Akka源码分析-ask模式

      在我之前的博文中,已经介绍过要慎用Actor的ask。这里我们要分析一下ask的源码,看看它究竟是怎么实现的。

      开发时,如果要使用ask方法,必须要引入akka.pattern._,这样才能使用ask(或者?)方法,那么想必ask是在akka.pattern._对应的包里面实现的。

    /*
     * Implementation class of the “ask” pattern enrichment of ActorRef
     */
    final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
    
      /**
       * INTERNAL API: for binary compatibility
       */
      protected def ask(message: Any, timeout: Timeout): Future[Any] =
        internalAsk(message, timeout, ActorRef.noSender)
    
      def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
        internalAsk(message, timeout, sender)
    
      /**
       * INTERNAL API: for binary compatibility
       */
      protected def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
        internalAsk(message, timeout, ActorRef.noSender)
    
      def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
        internalAsk(message, timeout, sender)
    
      /**
       * INTERNAL API: for binary compatibility
       */
      private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef) = actorRef match {
        case ref: InternalActorRef if ref.isTerminated ⇒
          actorRef ! message
          Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
        case ref: InternalActorRef ⇒
          if (timeout.duration.length <= 0)
            Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
          else {
            val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
            actorRef.tell(message, a)
            a.result.future
          }
        case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
      }
    
    }
    

       上面是通过定位ask(或?)找到的实现源码,我们发现,这是一个隐式转换,在akka.pattern.AskSupport中我们找到了隐式转换对应的函数。

    implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
    

       通过AskableActorRef源码我们知道最终调用了internalAsk函数,该函数有三个参数:待发送的消息、超时时间、消息发送者。函数创建了一个PromiseActorRef,又把消息原样的通过原actor的tell函数发送给了原actor,然后用新创建的PromiseActorRef作为新的sender传递,再用PromiseActorRef的result.future作为最终的Future返回。下面我们来看一下PromiseActorRef的创建过程。

    def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String,
                sender: ActorRef = Actor.noSender, onTimeout: String ⇒ Throwable = defaultOnTimeout): PromiseActorRef = {
        val result = Promise[Any]()
        val scheduler = provider.guardian.underlying.system.scheduler
        val a = new PromiseActorRef(provider, result, messageClassName)
        implicit val ec = a.internalCallingThreadExecutionContext
        val f = scheduler.scheduleOnce(timeout.duration) {
          result tryComplete Failure(
            onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
        }
        result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
        a
      }
    

       很明显,PromiseActorRef持有了一个Promise[Any],但是上面的代码只显示了在超时的时候通过onTimeout赋了值,并没有不超时赋值的逻辑,且Promise[Any]一旦完成就调用PromiseActorRef的stop方法和cancel方法。那么成功时赋值的逻辑应该在哪里呢?

      如果你对ask的使用方式比较熟悉的话,一定会找出其中的端倪的。我们来梳理一下这其中的使用细节。其实,在ask的使用与tell,并没有太大的区别,至少对于server端的Actor来说没有任何区别,都是正常的接收消息,然后处理,最后通过sender把消息使用tell返回。在ask时,消息的sender是什么,就是PromiseActorRef啊,那PromiseActorRef的!方法具体怎么实现的呢?

    override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
        case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message
        case _ ⇒
          if (message == null) throw InvalidMessageException("Message is null")
          if (!(result.tryComplete(
            message match {
              case Status.Success(r) ⇒ Success(r)
              case Status.Failure(f) ⇒ Failure(f)
              case other             ⇒ Success(other)
            }))) provider.deadLetters ! message
      }
    

       很明显,接收消息的Actor会通过!返回对应的消息,消息的处理一般会命中 case other,这其实就是给result赋值,在超时之前的赋值。如果在!方法内部给result赋值的时候,刚好已经超时或已经赋过值,会把返回的消息发送给deadLetters。

      其实result,也就是Promise[Any]的赋值逻辑已经解释清楚。不过如果小伙伴对Promise不熟悉的话,此处还是有点难理解的。如果说Future是一个只读的,值还没计算的占位符。那么Promise就是一个可写的,单次指派的容器,也就是说Promise一旦赋值,就无法再次赋值,且与之关联的future也就计算完毕,返回的值就是固定的。当然了通过ask(也就是?)返回的还只是一个future,如果要取出future最终的值,还是需要Await.ready等语义来支持的,这里就不再详细解释了。

  • 相关阅读:
    页面 笔记
    快速统计一个数二进制中1的个数
    [JAVA] String 拼接效率
    [JAVA] String常用方法
    [letcode] 832 Flipping an Image
    [java]冒泡排序
    SharedPreferences
    归并排序
    安全退出调用多个Activity的Application
    Activity生命周期
  • 原文地址:https://www.cnblogs.com/gabry/p/9371776.html
Copyright © 2011-2022 走看看