zoukankan      html  css  js  c++  java
  • Akka源码分析-Remote-网络链接

      上一篇博客中,我们分析了Akka remote模式下消息发送的过程,但细心的读者一定发现没有介绍网络相关初始化、创建链接、释放链接的过程,本文就介绍一下相关的内容。

      网络初始化就离不开ActorSystem的初始化,毕竟ActorSystem初始化之后就可以创建Actor并发送远程消息了。在ActorSystem初始化时,调用了RemoteActorRefProvider的init函数,init创建了Remoting这个RemoteTransport,并调用了start,而RemoteTransport作用是什么呢?

    /**
     * INTERNAL API
     *
     * The remote transport is responsible for sending and receiving messages.
     * Each transport has an address, which it should provide in
     * Serialization.currentTransportInformation (thread-local) while serializing
     * actor references (which might also be part of messages). This address must
     * be available (i.e. fully initialized) by the time the first message is
     * received or when the start() method returns, whatever happens first.
     */
    private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) 
    

       transport一般用来做网络传输层,负责收发消息。每个transport有一个本地address与之绑定,这个address一般就是我们配置的remote监听的本地地址(或者端口随机的本地地址)。

      Remoting.start我们有分析过,简单来说就是创建了EndpointManager对象,并发送Listen消息等待返回,listen结束后发送StartupFinished消息。那么网络初始化(或绑定)的代码应该就是在Listen消息的处理过程中了,处理Listen消息时,调用了listen函数,这个之前也分析到过。

    private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
        /*
         * Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
         * like the following:
         *   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
         *
         * The transports variable contains only the heads of each chains (the AkkaProtocolTransport instances).
         */
        val transports: Seq[AkkaProtocolTransport] = for ((fqn, adapters, config) ← settings.Transports) yield {
    
          val args = Seq(classOf[ExtendedActorSystem] → context.system, classOf[Config] → config)
    
          // Loads the driver -- the bottom element of the chain.
          // The chain at this point:
          //   Driver
          val driver = extendedSystem.dynamicAccess
            .createInstanceFor[Transport](fqn, args).recover({
    
              case exception ⇒ throw new IllegalArgumentException(
                s"Cannot instantiate transport [$fqn]. " +
                  "Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
                  "[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters", exception)
    
            }).get
    
          // Iteratively decorates the bottom level driver with a list of adapters.
          // The chain at this point:
          //   Adapter <- ... <- Adapter <- Driver
          val wrappedTransport =
            adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider }.foldLeft(driver) {
              (t: Transport, provider: TransportAdapterProvider) ⇒
                // The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
                provider.create(t, context.system.asInstanceOf[ExtendedActorSystem])
            }
    
          // Apply AkkaProtocolTransport wrapper to the end of the chain
          // The chain at this point:
          //   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
          new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
        }
    
        // Collect all transports, listen addresses and listener promises in one future
        Future.sequence(transports.map { transport ⇒
          transport.listen map { case (address, listenerPromise) ⇒ (transport, address, listenerPromise) }
        })
      }
    

       我们来着重分析下listen函数,它读取了settings.Transports并为之创建了对应的AkkaProtocolTransport对象,settings.Transports是什么呢?

    val Transports: immutable.Seq[(String, immutable.Seq[String], Config)] = transportNames.map { name ⇒
        val transportConfig = transportConfigFor(name)
        (
          transportConfig.getString("transport-class"),
          immutableSeq(transportConfig.getStringList("applied-adapters")).reverse,
          transportConfig)
      }
    
      val Adapters: Map[String, String] = configToMap(getConfig("akka.remote.adapters"))
    
      private def transportNames: immutable.Seq[String] = immutableSeq(getStringList("akka.remote.enabled-transports"))
    

       其实就是transport-class,applied-adapters,transportConfig三元组的seq。通过源码中的reference.conf,知道transport-class的值默认是akka.remote.transport.netty.NettyTransport,applied-adapters为空。

          # The class given here must implement the akka.remote.transport.Transport
          # interface and offer a public constructor which takes two arguments:
          #  1) akka.actor.ExtendedActorSystem
          #  2) com.typesafe.config.Config
          transport-class = "akka.remote.transport.netty.NettyTransport"
    
          # Transport drivers can be augmented with adapters by adding their
          # name to the applied-adapters list. The last adapter in the
          # list is the adapter immediately above the driver, while
          # the first one is the top of the stack below the standard
          # Akka protocol
          applied-adapters = []
    

       NettyTransport我们应该知道具体是做啥的了吧,就是用来具体创建链接、监听链接状态,收发消息的。另外adapters默认为空,所以这段for代码,就是加载了transport-class对应类的实例,然后把它作为参数传给了AkkaProtocolTransport。由于可以配置多个transport并监听不同的地址,所以这里是一个seq,不过默认只有一个。

      listen函数的最后一行调用了所有transport的listen函数,并返回transport/address/listenerPromise三元组seq。

    /**
     * Implementation of the Akka protocol as a Transport that wraps an underlying Transport instance.
     *
     * Features provided by this transport are:
     *  - Soft-state associations via the use of heartbeats and failure detectors
     *  - Secure-cookie handling
     *  - Transparent origin address handling
     *  - pluggable codecs to encode and decode Akka PDUs
     */
    private[remote] class AkkaProtocolTransport(
      wrappedTransport:     Transport,
      private val system:   ActorSystem,
      private val settings: AkkaProtocolSettings,
      private val codec:    AkkaPduCodec) extends ActorTransportAdapter(wrappedTransport, system)
    

       AkkaProtocolTransport其实是对底层协议的代理、封装,并提供其他一些特性,例如心跳和失败检测,编解码插件等。那AkkaProtocolTransport的listen在哪里实现呢?

    override def listen: Future[(Address, Promise[AssociationEventListener])] = {
        val upstreamListenerPromise: Promise[AssociationEventListener] = Promise()
    
        for {
          (listenAddress, listenerPromise) ← wrappedTransport.listen
          // Enforce ordering between the signalling of "listen ready" to upstream
          // and initialization happening in interceptListen
          _ ← listenerPromise.tryCompleteWith(interceptListen(listenAddress, upstreamListenerPromise.future)).future
        } yield (augmentScheme(listenAddress), upstreamListenerPromise)
      }
    

       通过它的继承关系我们找到了AbstractTransportAdapter,其实也就是调用wrappedTransport的listen,那不就是在调用NettyTransport的listen么?

    override def listen: Future[(Address, Promise[AssociationEventListener])] = {
        for {
          address ← addressToSocketAddress(Address("", "", settings.BindHostname, settings.BindPortSelector))
        } yield {
          try {
            val newServerChannel = inboundBootstrap match {
              case b: ServerBootstrap         ⇒ b.bind(address)
              case b: ConnectionlessBootstrap ⇒ b.bind(address)
            }
    
            // Block reads until a handler actor is registered
            newServerChannel.setReadable(false)
            channelGroup.add(newServerChannel)
    
            serverChannel = newServerChannel
    
            addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname),
              if (settings.PortSelector == 0) None else Some(settings.PortSelector)) match {
                case Some(address) ⇒
                  addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, None, None) match {
                    case Some(address) ⇒ boundTo = address
                    case None          ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
                  }
                  localAddress = address
                  associationListenerPromise.future.foreach { _ ⇒ newServerChannel.setReadable(true) }
                  (address, associationListenerPromise)
                case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
              }
          } catch {
            case NonFatal(e) ⇒ {
              log.error("failed to bind to {}, shutting down Netty transport", address)
              try { shutdown() } catch { case NonFatal(e) ⇒ } // ignore possible exception during shutdown
              throw e
            }
          }
        }
      }
    
    def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match {
        case Address(_, _, Some(host), Some(port)) ⇒ Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } }
        case _                                     ⇒ Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information."))
      }
    

       简单来说就是根据address配置去创建一个InetSocketAddress并进行绑定、创建chennel等其他网络初始化。

      至此EndpointManager对Listen消息处理完毕,就是初始化网络状态,进行监听,由于最后还收到了StartupFinished,所以EndpointManager进入了accepting状态。

        case StartupFinished ⇒
          context.become(accepting)
    

       EndpointManager.accepting之前只分析了对Send消息的处理,其实还有一个非常重要的消息处理过程:接收连接请求、并创建链接。也就是对InboundAssociation消息的处理。那InboundAssociation是如何产生的呢?这还需要回过头分析NettyTransport。其中有一个字段,在listen中也用到了:inboundBootstrap

    private val inboundBootstrap: Bootstrap = settings.TransportMode match {
        case Tcp ⇒ setupBootstrap(new ServerBootstrap(serverChannelFactory), serverPipelineFactory)
        case Udp ⇒ setupBootstrap(new ConnectionlessBootstrap(serverChannelFactory), serverPipelineFactory)
      }
    

       根据mode判断创建哪种类型的Bootstrap。

    val TransportMode: Mode = getString("transport-protocol") match {
        case "tcp"   ⇒ Tcp
        case "udp"   ⇒ Udp
        case unknown ⇒ throw new ConfigurationException(s"Unknown transport: [$unknown]")
      }
    

       很显然是创建ServerBootstrap。此过程中还传入了一个非常关键的对象:serverPipelineFactory。

    private val serverPipelineFactory: ChannelPipelineFactory = new ChannelPipelineFactory {
        override def getPipeline: ChannelPipeline = {
          val pipeline = newPipeline
          if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false))
          val handler = if (isDatagram) new UdpServerHandler(NettyTransport.this, associationListenerPromise.future)
          else new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log)
          pipeline.addLast("ServerHandler", handler)
          pipeline
        }
      }
    

       根据上下文以及netty基础概念得知,ChannelPipelineFactory,而getPipeline中addLast函数添加了一个非常重要的handler:TcpServerHandler。

    private[remote] class TcpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener], val log: LoggingAdapter)
      extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers {
    
      override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
        initInbound(e.getChannel, e.getChannel.getRemoteAddress, null)
    
    }
    

       TcpServerHandler在OnConnect时会调用initInbound。

    final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = {
        channel.setReadable(false)
        associationListenerFuture.foreach {
          listener ⇒
            val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier,
              transport.system.name, hostName = None, port = None).getOrElse(
                throw new NettyTransportException(s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]"))
            init(channel, remoteSocketAddress, remoteAddress, msg) { listener notify InboundAssociation(_) }
        }
      }
    

       initInbound会调用init函数,init源码如下:

    final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)(
        op: (AssociationHandle ⇒ Any)): Unit = {
        import transport._
        NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), None) match {
          case Some(localAddress) ⇒
            val handle = createHandle(channel, localAddress, remoteAddress)
            handle.readHandlerPromise.future.foreach {
              listener ⇒
                registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress])
                channel.setReadable(true)
            }
            op(handle)
    
          case _ ⇒ NettyTransport.gracefulClose(channel)
        }
      }
    

       init简单来说就是创建一个handle,并调用op(handle),联系上下文我们知道op应该就是“{ listener notify InboundAssociation(_) }”这段代码。就是用InboundAssociation封装handle,并调用listener的notify!!!我们好像找到了InboundAssociation消息的来源!!!那listener究竟是在哪里赋值的呢?

    这个就比较曲折了,具体过程不再展示,只说结果。就是在listens成功之后,给self发送了一个ListensResult消息,收到消息后,有一段代码是在赋值:promise.success(ActorAssociationEventListener(self))。这里的promise就是上面代码中的listener。

        case Listen(addressesPromise) ⇒
          listens map { ListensResult(addressesPromise, _) } recover {
            case NonFatal(e) ⇒ ListensFailure(addressesPromise, e)
          } pipeTo self
        case ListensResult(addressesPromise, results) ⇒
          transportMapping = results.groupBy {
            case (_, transportAddress, _) ⇒ transportAddress
          } map {
            case (a, t) if t.size > 1 ⇒
              throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null)
            case (a, t) ⇒ a → t.head._1
          }
          // Register to each transport as listener and collect mapping to addresses
          val transportsAndAddresses = results map {
            case (transport, address, promise) ⇒
              promise.success(ActorAssociationEventListener(self))
              transport → address
          }
          addressesPromise.success(transportsAndAddresses)
    

       ActorAssociationEventListener源码非常简单,就是把收到的消息发送给actor,而这里的actor就是上面代码中的self,其实就是EndpointManager。

      /**
       * Class to convert ordinary [[akka.actor.ActorRef]] instances to an AssociationEventListener. The adapter will
       * forward event objects as messages to the provided ActorRef.
       * @param actor
       */
      final case class ActorAssociationEventListener(actor: ActorRef) extends AssociationEventListener {
        override def notify(ev: AssociationEvent): Unit = actor ! ev
      }
    

       至此我们就回到了EndpointManager中对InboundAssociation消息的处理,handleInboundAssociation就不再详细分析,这应该就是建立连接的过程。至此我们就可以正常的发送消息了,毕竟本地服务的socket已经与远程ActorSystem对应的socket建立了链接。但远程ActorSystem网络对象收到消息之后如何分发给指定的Actor呢?我们下篇博客继续分析。

  • 相关阅读:
    mybatis与spring的整合(代码实现)
    使用maven构建一个web项目
    解决maven 找不到指定路径应该如何
    建一个maven项目
    spring xml的配置
    mybatis.xml和mapper.xml的配置
    mvc @helper 创建用户自定义html
    sqlserver CLR sqlserver使用C# dll
    索引碎片
    压缩数据库
  • 原文地址:https://www.cnblogs.com/gabry/p/9389558.html
Copyright © 2011-2022 走看看