zoukankan      html  css  js  c++  java
  • Spark RPC框架源码分析(二)RPC运行时序

    前情提要:

    一. Spark RPC概述

    上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Reactor模型以及一些常用的类。这一篇我们还是用上一篇的例子,从代码的角度讲述Spark RPC的运行时序,从而揭露Spark RPC框架的运行原理。我们主要将分成两部分来讲,分别从服务端的角度和客户端的角度深度解析。

    不过源码解析部分都是比较枯燥的,Spark RPC这里也是一样,其中很多东西都是绕来绕去,墙裂建议使用上一篇中介绍到的那个Spark RPC项目,下载下来并运行,通过断点的方式来一步一步看,结合本篇文章,你应该会有更大的收获。

    PS:所用spark版本:spark2.1.0

    二. Spark RPC服务端

    我们将以上一篇HelloworldServer为线索,深入到Spark RPC框架内部的源码中,来看看启动一个服务时都做了些什么。

    因为代码部分都是比较绕的,每个类也经常会搞不清楚,我在介绍一个方法的源码时,通常都会将类名也一并写出来,这样应该会更加清晰一些。

    HelloworldServer{
      ......
      def main(args: Array[String]): Unit = {
    	//val host = args(0)
        val host = "localhost"
        val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
        val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
        val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
        rpcEnv.setupEndpoint("hello-service", helloEndpoint)
        rpcEnv.awaitTermination()
      }
      ......
    }
    
    Spark RPC服务端运行主要时许

    这段代码中有两个主要流程,我们分别来说

    2.1 服务端NettyRpcEnvFactory.create(config)

    首先是下面这条代码的运行流程:

    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)

    其实就是通过 NettyRpcEnvFactory 创建出一个 RPC Environment ,其具体类是 NettyRpcEnv 。

    我们再来看看创建过程中会发生什么。

    object NettyRpcEnvFactory extends RpcEnvFactory {
    	......
    	def create(config: RpcEnvConfig): RpcEnv = {
    		val conf = config.conf
    	
    		// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    		// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    		val javaSerializerInstance =
    		new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
    		//根据配置以及地址,new 一个 NettyRpcEnv ,
    		val nettyEnv =
    		new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress)
    		//如果是服务端创建的,那么会启动服务。服务端和客户端都会通过这个方法创建一个 NettyRpcEnv ,但区别就在这里了。
    		if (!config.clientMode) {
    		val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
    			//启动服务的方法,下一步就是调用这个方法了
    			nettyEnv.startServer(config.bindAddress, actualPort)
    			(nettyEnv, nettyEnv.address.port)
    		}
    		try {
    			Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1
    		} catch {
    			case NonFatal(e) =>
    			nettyEnv.shutdown()
    			throw e
    		}
    		}
    		nettyEnv
    	}
    	......
    }
    

    还没完,如果是服务端调用这段代码,那么主要的功能是创建RPCEnv,即NettyRpcEnv(客户端在后面说)。以及通过下面这行代码,

    nettyEnv.startServer(config.bindAddress, actualPort)

    去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看。

    class NettyRpcEnv(
                       val conf: RpcConf,
                       javaSerializerInstance: JavaSerializerInstance,
                       host: String) extends RpcEnv(conf) {
      ......
      def startServer(bindAddress: String, port: Int): Unit = {
        // here disable security
        val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList()
        //TransportContext 属于 spark.network 中的部分,负责 RPC 消息在网络中的传输
        server = transportContext.createServer(bindAddress, port, bootstraps)
        //在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。
        dispatcher.registerRpcEndpoint(
          RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
      }
      ......
    }
    

    执行完毕之后这个create方法就结束。这个流程主要就是开启一些服务,然后返回一个新的NettyRpcEnv。

    2.2 服务端rpcEnv.setupEndpoint("hello-service",helloEndpoint)

    这条代码会去调用NettyRpcEnv中相应的方法

    class NettyRpcEnv(
                       val conf: RpcConf,
                       javaSerializerInstance: JavaSerializerInstance,
                       host: String) extends RpcEnv(conf) {
      ......
      override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }
      ......
    }
    

    我们看到,这个方法主要是调用dispatcher进行注册的。dispatcher的功能上一节已经说了,

    Dispatcher的主要作用是保存注册的RpcEndpoint、分发相应的Message到RpcEndPoint中进行处理。Dispatcher即是上图中ThreadPool的角色。它同时也维系一个threadpool,用来处理每次接受到的 InboxMessage。而这里处理InboxMessage是通过inbox实现的。

    这里我们就说一说dispatcher的流程。

    dispatcher

    dispatcher在NettyRpcEnv被创建的时候创建出来。

    class NettyRpcEnv(
                       val conf: RpcConf,
                       javaSerializerInstance: JavaSerializerInstance,
                       host: String) extends RpcEnv(conf) {
    	......
    	//初始化时创建 dispatcher
    	private val dispatcher: Dispatcher = new Dispatcher(this)
    	......
    }
    

    dispatcher类被创建的时候也有几个属性需要注意:

    private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) {
    	......
    	//每个 RpcEndpoint 其实都会被整合成一个 EndpointData 。并且每个 RpcEndpoint 都会有一个 inbox。
    	private class EndpointData(
    								val name: String,
    								val endpoint: RpcEndpoint,
    								val ref: NettyRpcEndpointRef) {
    		val inbox = new Inbox(ref, endpoint)
    	}
    	
    	//一个阻塞队列,当有 RpcEndpoint 相关请求(InboxMessage)的时候,就会将请求塞到这个队列中,然后被线程池处理。
    	private val receivers = new LinkedBlockingQueue[EndpointData]
    	
    	//初始化便创建出来的线程池,当上面的 receivers 队列中没内容时,会阻塞。当有 RpcEndpoint 相关请求(即 InboxMessage )的时候就会立刻执行。
    	//这里处理 InboxMessage 本质上是调用相应 RpcEndpoint 的 inbox 去处理。
    	private val threadpool: ThreadPoolExecutor = {
    		val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    		math.max(2, Runtime.getRuntime.availableProcessors()))
    		val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    		for (i <- 0 until numThreads) {
    			pool.execute(new MessageLoop)
    		}
    		pool
    	}
    	......
    }
    

    了解一些Dispatcher的逻辑流程后,我们来正式看看Dispatcher的registerRpcEndpoint方法。

    顾名思义,这个方法就是将RpcEndpoint注册到Dispatcher中去。当有Message到来的时候,便会分发Message到相应的RpcEndPoint中进行处理。

    private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) {
      ......
      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
    	//注册 RpcEndpoint 时需要的是 上面的 EndpointData ,其中就包含 endpointRef ,这个主要是供客户端使用的。
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    	//多线程环境下,注册一个 RpcEndpoint 需要判断现在是否处于 stop 状态。
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
    	  //新建 EndpointData 并存储到一个 ConcurrentMap 中。
          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
          val data = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
          //将 这个 EndpointData 加入到 receivers 队列中,此时 dispatcher 中的 threadpool 会去处理这个加进来的 EndpointData 
          //处理过程是调用它的 inbox 的 process()方法。然后 inbox 会等待消息到来。
          receivers.offer(data) // for the OnStart message
        }
        endpointRef
      }
      ......
    }
    

    Spark RPC服务端逻辑小结:我们说明了Spark RPC服务端启动的逻辑流程,分为两个部分,第一个是RPC env,即NettyRpcEnv的创建过程,第二个则是RpcEndpoint注册到dispatcher的流程。
    1. NettyRpcEnvFactory 创建 NettyRpcEnv

    • 根据地址创建NettyRpcEnv。
    • NettyRpcEnv开始启动服务,包括TransportContext根据地址开启监听服务,向Dispacther注册一个RpcEndpointVerifier等待。

    2. Dispatcher注册RpcEndpoint

    • Dispatcher初始化时便创建一个线程池并阻塞等待receivers队列中加入新的EndpointData
    • 一旦新加入EndpointData便会调用该EndpointData的inbox去处理消息。比如OnStart消息,或是RPCMessage等等。

    三.Spark RPC客户端

    依旧是以上一节 HelloWorld 的客户端为线索,我们来逐层深入在 RPC 中,客户端 HelloworldClient 的 asyncCall() 方法。

    object HelloworldClient {
      ......
      def asyncCall() = {
        val rpcConf = new RpcConf()
        val config = RpcEnvClientConfig(rpcConf, "hello-client")
        val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
        val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
        val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
        future.onComplete {
          case scala.util.Success(value) => println(s"Got the result = $value")
          case scala.util.Failure(e) => println(s"Got error: $e")
        }
        Await.result(future, Duration.apply("30s"))
        rpcEnv.shutdown()
      }
      ......
    }
    
    Spark RPC客户端时许

    创建Spark RPC客户端Env(即NettyRpcEnvFactory)部分和Spark RPC服务端是一样的,只是不会开启监听服务,这里就不详细展开。

    我们从这一句开始看,这也是Spark RPC客户端和服务端区别的地方所在。

    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    

    setupEndpointRef()

    上面的的setupEndpointRef最终会去调用下面setupEndpointRef()这个方法,这个方法中又进行一次跳转,跳转去setupEndpointRefByURI这个方法中。需要注意的是这两个方法都是RpcEnv里面的,而RpcEnv是抽象类,它里面只实现部分方法,而NettyRpcEnv继承了它,实现了全部方法。

    abstract class RpcEnv(conf: RpcConf) {
      ......
      def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
        //会跳转去调用下面的方法
        setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
      }
      
      def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
        //其中 asyncSetupEndpointRefByURI() 返回的是 Future[RpcEndpointRef]。 这里就是阻塞,等待返回一个 RpcEndpointRef。
        // defaultLookupTimeout.awaitResult 底层调用 Await.result 阻塞 直到结果返回或返回异常
        defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
      }
      ......
    }  
    

    这里最主要的代码其实就一句,

    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))

    这一段可以分为两部分,第一部分的defaultLookupTimeout.awaitResult其实底层是调用Await.result阻塞等待一个异步操作,直到结果返回。

    而asyncSetupEndpointRefByURI(uri)则是根据给定的uri去返回一个RpcEndpointRef,它是在NettyRpcEnv中实现的:

    class NettyRpcEnv(
                       val conf: RpcConf,
                       javaSerializerInstance: JavaSerializerInstance,
                       host: String) extends RpcEnv(conf) {
      ......
      def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
        //获取地址
        val addr = RpcEndpointAddress(uri)
        //根据地址等信息新建一个 NettyRpcEndpointRef 。
        val RpcendpointRef = new NettyRpcEndpointRef(conf, addr, this) 
        //每个新建的 RpcendpointRef 都有先有一个对应的verifier 去检查服务端存不存在对应的 Rpcendpoint 。  
        val verifier = new NettyRpcEndpointRef(
          conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
        //向服务端发送请求判断是否存在对应的 Rpcendpoint。
        verifier.ask[Boolean](RpcEndpointVerifier.createCheckExistence(endpointRef.name)).flatMap { find =>
          if (find) {
            Future.successful(endpointRef)
          } else {
            Future.failed(new RpcEndpointNotFoundException(uri))
          }
        }(ThreadUtils.sameThread)
      }
      ......
    }
      
    

    asyncSetupEndpointRefByURI()这个方法实现两个功能,第一个就是新建一个RpcEndpointRef。第二个是新建一个verifier,这个verifier的作用就是先给服务端发送一个请求判断是否存在RpcEndpointRef对应的RpcEndpoint。

    这段代码中最重要的就是verifiter.ask[Boolean](...)了。如果有找到之后就会调用Future.successful这个方法,反之则会通过Future.failed抛出一个异常。

    ask可以算是比较核心的一个方法,我们可以到ask方法中去看看。

    class NettyRpcEnv{
    	......
        private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
          val promise = Promise[Any]()
          val remoteAddr = message.receiver.address
          //
          def onFailure(e: Throwable): Unit = {
      //      println("555");
            if (!promise.tryFailure(e)) {
              log.warn(s"Ignored failure: $e")
            }
          }
      
          def onSuccess(reply: Any): Unit = reply match {
            case RpcFailure(e) => onFailure(e)
            case rpcReply =>
              println("666");
              if (!promise.trySuccess(rpcReply)) {
                log.warn(s"Ignored message: $reply")
              }
          }
      
          try {
            if (remoteAddr == address) {
              val p = Promise[Any]()
              p.future.onComplete {
                case Success(response) => onSuccess(response)
                case Failure(e) => onFailure(e)
              }(ThreadUtils.sameThread)
              dispatcher.postLocalMessage(message, p)
            } else {
              //跳转到这里执行
              //封装一个 RpcOutboxMessage ,同时 onSuccess 方法也是在这里注册的。
              val rpcMessage = RpcOutboxMessage(serialize(message),
                onFailure,
                (client, response) => onSuccess(deserialize[Any](client, response)))
              postToOutbox(message.receiver, rpcMessage)
              promise.future.onFailure {
                case _: TimeoutException =>  println("111");rpcMessage.onTimeout()
      //          case _ => println("222");
              }(ThreadUtils.sameThread)
            }
            
            val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
              override def run(): Unit = {
      //          println("333");
                onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
              }
            }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
            //promise 对应的 future onComplete时会去调用,但当 successful 的时候,上面的 run 并不会被调用。
            promise.future.onComplete { v =>
      //        println("4444");
              timeoutCancelable.cancel(true)
            }(ThreadUtils.sameThread)
      
          } catch {
            case NonFatal(e) =>
              onFailure(e)
          }
      
          promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
        }
    	......
    }
    

    这里涉及到使用一些scala多线程的高级用法,包括Promise和Future。如果想要对这些有更加深入的了解,可以参考这篇文章

    这个函数的作用从名字中就可以看得出,其实就是将要发送的消息封装成一个RpcOutboxMessage,然后交给OutBox去发送,OutBox和前面所说的InBox对应,对应Actor模型中的MailBox(信箱)。用于发送和接收消息。

    其中使用到了Future和Promise进行异步并发以及错误处理,比如当发送时间超时的时候Promise就会返回一个TimeoutException,而我们就可以设置自己的onFailure函数去处理这些异常。

    OK,注册完RpcEndpointRef后我们便可以用它来向服务端发送消息了,而其实RpcEndpointRef发送消息还是调用ask方法,就是上面的那个ask方法。上面也有介绍,本质上就是通过OutBox进行处理。

    我们来梳理一下RPC的客户端的发送流程。

    客户端逻辑小结:客户端和服务端比较类似,都是需要创建一个NettyRpcEnv。不同的是接下来客户端创建的是RpcEndpointRef,并用之向服务端对应的RpcEndpoint发送消息。

    1.NettyRpcEnvFactory创建NettyRpcEnv

    • 根据地址创建NettyRpcEnv。根据地址开启监听服务,向Dispacther注册一个RpcEndpointVerifier等待。

    2. 创建RpcEndpointRef

    • 创建一个新的RpcEndpointRef
    • 创建对应的verifier,使用verifier向服务端发送请求,判断对应的RpcEndpoint是否存在。若存在,返回该RpcEndpointRef,否则抛出异常。

    3. RpcEndpointRef使用同步或者异步的方式发送请求。

    OK,以上就是SparkRPC时序的源码分析。下一篇会将一个实际的例子,Spark的心跳机制和代码。喜欢的话就关注一波吧


    推荐阅读 :
    从分治算法到 MapReduce
    Actor并发编程模型浅析
    大数据存储的进化史 --从 RAID 到 Hadoop Hdfs
    一个故事告诉你什么才是好的程序员

  • 相关阅读:
    [Unity3D] 如何将飞飞游戏资源提取并加载到uinty3d中
    [Unity3D] 解决提示参数动作文件不存在的错误Parameter 'XXX' does not exist.
    [Unity3D] 字体垂直自动滚动&鼠标拖拽滑动字体滚动
    [Unity3D] 图集按照顺序显示(幻灯片效果)&指定间隔时间显示
    [Unity3D] 在UI界面上显示播放视频
    TypeScript
    Css 设置超过再两行显示省略号
    vue scss 样式穿透
    JavaScript 严格模式(strict mode)
    Webpack file-loader 和 url-loader
  • 原文地址:https://www.cnblogs.com/listenfwind/p/10434380.html
Copyright © 2011-2022 走看看