zoukankan      html  css  js  c++  java
  • Akka源码分析-Remote-ActorSystem

      前面的文章都是基于local模式分析的,现在我们简要分析一下在remote模式下,ActorSystem的创建过程。

    final val ProviderClass: String =
          setup.get[BootstrapSetup]
            .flatMap(_.actorRefProvider).map(_.identifier)
            .getOrElse(getString("akka.actor.provider")) match {
              case "local"   ⇒ classOf[LocalActorRefProvider].getName
              // these two cannot be referenced by class as they may not be on the classpath
              case "remote"  ⇒ "akka.remote.RemoteActorRefProvider"
              case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider"
              case fqcn      ⇒ fqcn
            }
    

       之前我们分析过,在创建provider过程中,是通过ProviderClass来判断具体是哪种模式的。从ProviderClass源码来看,当我们配置akka.actor.provider为remote时,会创建akka.remote.RemoteActorRefProvider的实例。我们知道ActorSystem在start时候会去调用provider.init方法进行初始化。

    def init(system: ActorSystemImpl): Unit = {
        local.init(system)
    
        actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache(system)
    
        remotingTerminator = system.systemActorOf(
          remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)),
          "remoting-terminator")
    
        val internals = Internals(
          remoteDaemon = {
            val d = new RemoteSystemDaemon(
              system,
              local.rootPath / "remote",
              rootGuardian,
              remotingTerminator,
              _log,
              untrustedMode = remoteSettings.UntrustedMode)
            local.registerExtraNames(Map(("remote", d)))
            d
          },
          transport =
            if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match {
              case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this)
              case ArterySettings.Tcp      ⇒ new ArteryTcpTransport(system, this, tlsEnabled = false)
              case ArterySettings.TlsTcp   ⇒ new ArteryTcpTransport(system, this, tlsEnabled = true)
            }
            else new Remoting(system, this))
    
        _internals = internals
        remotingTerminator ! internals
    
        _log = Logging.withMarker(eventStream, getClass.getName)
    
        // this enables reception of remote requests
        transport.start()
    
        _remoteWatcher = createRemoteWatcher(system)
        remoteDeploymentWatcher = createRemoteDeploymentWatcher(system)
      }
    

       我们来结合RemoteActorRefProvider的构造函数和init函数来初步理解RemoteActorRefProvider的行为。首先在init方法的第一步就是调用local的init,通过local的类型我们发现这是一个LocalActorRefProvider,local的作用暂时不做分析,继续往下看。

      下面创建了ActorRefResolveThreadLocalCache对象,从ActorRefResolveThreadLocalCache的定义来看(这里就不再贴出相关代码),它是一个ThreadLocal变量,且是一个实现了Lru的缓存器,缓存的内容是ActorRef,具体作用也忽略。remotingTerminator的具体作用也不做深入分析。

    private final case class Internals(transport: RemoteTransport, remoteDaemon: InternalActorRef)
        extends NoSerializationVerificationNeeded
    

       Internals的定义还是值得一看的,它有两个变量,其中transport的值应该是new Remoting(system, this),remoteDaemon的值是RemoteSystemDaemon。然后调用了transport.start(),也就是Remoting的start。那么Remoting具体又是什呢?

    // Start assumes that it cannot be followed by another start() without having a shutdown() first
      override def start(): Unit = {
        endpointManager match {
          case None ⇒
            log.info("Starting remoting")
            val manager: ActorRef = system.systemActorOf(
              configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
              Remoting.EndpointManagerName)
            endpointManager = Some(manager)
    
            try {
              val addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]] = Promise()
              manager ! Listen(addressesPromise)
    
              val transports: Seq[(AkkaProtocolTransport, Address)] = Await.result(
                addressesPromise.future,
                StartupTimeout.duration)
              if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)
    
              transportMapping = transports.groupBy {
                case (transport, _) ⇒ transport.schemeIdentifier
              } map { case (k, v) ⇒ k → v.toSet }
    
              defaultAddress = transports.head._2
              addresses = transports.map { _._2 }.toSet
    
              log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]"))
    
              manager ! StartupFinished
              eventPublisher.notifyListeners(RemotingListenEvent(addresses))
    
            } catch {
              case e: TimeoutException ⇒
                notifyError("Startup timed out. This is usually related to actor system host setting or host name resolution misconfiguration.", e)
                throw e
              case NonFatal(e) ⇒
                notifyError("Startup failed", e)
                throw e
            }
    
          case Some(_) ⇒
            log.warning("Remoting was already started. Ignoring start attempt.")
        }
      }
    

       在Remoting.start过程中,首先创建了EndpointManager,然后发送了一条Listen消息,并使用Await.result等待它的返回,然后又给EndpointManager发送了StartUpFinished。上面代码中的log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]"))还是值得我们关注的,毕竟我们启动remote模式的ActorSystem会经常看到这个日志信息。我们来看看EndpointManager收到Listen消息后做了哪些操作。

      那么listens又是什么呢?

    private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
        /*
         * Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
         * like the following:
         *   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
         *
         * The transports variable contains only the heads of each chains (the AkkaProtocolTransport instances).
         */
        val transports: Seq[AkkaProtocolTransport] = for ((fqn, adapters, config) ← settings.Transports) yield {
    
          val args = Seq(classOf[ExtendedActorSystem] → context.system, classOf[Config] → config)
    
          // Loads the driver -- the bottom element of the chain.
          // The chain at this point:
          //   Driver
          val driver = extendedSystem.dynamicAccess
            .createInstanceFor[Transport](fqn, args).recover({
    
              case exception ⇒ throw new IllegalArgumentException(
                s"Cannot instantiate transport [$fqn]. " +
                  "Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
                  "[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters", exception)
    
            }).get
    
          // Iteratively decorates the bottom level driver with a list of adapters.
          // The chain at this point:
          //   Adapter <- ... <- Adapter <- Driver
          val wrappedTransport =
            adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider }.foldLeft(driver) {
              (t: Transport, provider: TransportAdapterProvider) ⇒
                // The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
                provider.create(t, context.system.asInstanceOf[ExtendedActorSystem])
            }
    
          // Apply AkkaProtocolTransport wrapper to the end of the chain
          // The chain at this point:
          //   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
          new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
        }
    
        // Collect all transports, listen addresses and listener promises in one future
        Future.sequence(transports.map { transport ⇒
          transport.listen map { case (address, listenerPromise) ⇒ (transport, address, listenerPromise) }
        })
      }
    

       很明显这是一个transports集合,每个transports应该是一个AkkaProtocolTransport对象,AkkaProtocolTransport创建完成之后,调用了listen方法,最终返回AkkaProtocolTransport的列表。其实分析到这里我们可以不必再继续深入AkkaProtocolTransport的具体功能,从上面的官方注释以及我们的猜测来看,这大概是在初始化网络相关的对象。比如它可以是一个socket或者netty封装后的socket,是用来listen某个端口号,接收和发送数据的。

      当然RemoteActorRefProvider.init的最后两行分别创建了RemoteWatcher、RemoteDeploymentWatcher,这两个Actor的作用后面再具体分析。

      至此,remote模式下的初始化基本就算结束了,其实就是用RemoteActorRefProvider替换了LocalActorRefProvider,并完成了provider相关的初始化。remote模式与local模式下,ActorSystem初始化过程区别并不大,这还得多谢Akka框架封装的好。下一篇博客我们会分析actor的创建过程,毕竟在remote模式下,actor的创建过程还是有点不同的。

  • 相关阅读:
    设计模式工厂模式
    设计模式原型模式
    Excel自定义格式千分符
    浏览器报:net::ERR_EMPTY_RESPONSE解决方案
    git branch a无法显示远程分支解决办法
    .Net启动程序报错:It was not possible to find any compatible framework version
    自动化测试框架pytest 安装和入门到精通实战
    2020非常全的接口测试面试题及参考答案软件测试工程师没有碰到算我输!
    Python+unittest+requests+excel实现接口自动化测试框架项目实战
    软件测试必学之python+unittest+requests+HTMLRunner编写接口自动化测试集
  • 原文地址:https://www.cnblogs.com/gabry/p/9373422.html
Copyright © 2011-2022 走看看