zoukankan      html  css  js  c++  java
  • 【Spark】编程实战之模拟SparkRPC原理实现自定义RPC

    1. 什么是RPC   

        RPC(Remote Procedure Call)远程过程调用。在Hadoop和Spark中都使用了PRC,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。简单来说,就是有A、B两台机器,A机器可以调用B机器上的程序。

    2. Spark 的RPC

        Master和Worker的启动流程:

        (1) 启动Master,会启动一个定时器,定时检查超时的Worker,并移除超时Worker信息。

        (2) 启动Worker,向Master发送注册信息。

        (3) Master收到Worker发来的注册信息后,保存到内存中,并返回一个响应信息,这个信息就是自己的masterUrl。

        (4) Worker接收到Master发来的响应信息(masterUrl)之后,保存到内存中,并开启一个定时器,定时向Master发送心跳信息。

        (5) Master 不断的接收Worker发来的心跳信息,并将每个Worker的最后一次心跳时间为当前接收到心跳信息的时间。

        流程如下图。

    3. 编程实战

    3.1 项目代码(Scala语言)

        WorkInfo.scala

    package com.nova.rpc
    /**
      * @author Supernova
      * @date 2018/06/15
      */
    class WorkerInfo(val id: String, val host: String, val port: Int,val memory: Int, val cores: Int) {
      // 记录最后一次心跳时间
      var lastHeartbeatTime: Long = _
    }

        RemoteMsg.scala 

    package com.nova.rpc
    
    /**
      * @author Supernova
      * @date 2018/06/15
      */
    trait RemoteMsg extends Serializable{
    
    }
    
    // Master 向自己发送检查超时Worker的信息
    case object CheckTimeOutWorker
    
    // Worker向Master发送的注册信息
    case class RegisterWorker(id: String, host: String,port: Int, memory: Int, cores: Int) extends RemoteMsg
    
    // Master向Worker发送的响应信息
    case class RegisteredWorker(masterUrl: String) extends RemoteMsg
    
    // Worker向Master发送的心跳信息
    case class Heartbeat(workerId: String) extends RemoteMsg
    
    // Worker向自己发送的要执行发送心跳信息的消息
    case object SendHeartbeat
    

      

    Master.scala    

    package com.nova.rpc
    
    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    import scala.collection.mutable
    import scala.concurrent.duration._
    
    /**
      * @author Supernova
      * @date 2018/06/15
      */
    class Master(val masterHost: String, val masterPort: Int) extends Actor{
      // 用来存储Worker的注册信息: <workerId, WorkerInfo>
      val idToWorker = new mutable.HashMap[String, WorkerInfo]()
    
      // 用来存储Worker的信息,必须使用可变的HashSet
      val workers = new mutable.HashSet[WorkerInfo]()
    
      // Worker的超时时间间隔
      val checkInterval: Long = 15000
    
      /**
        * 重写生命周期preStart方法
        * 作用:当Master启动时,开启定时器,定时检查超时Worker
        */
      override def preStart(): Unit = {
        // 启动定时器,定时检查超时的Worker
        import context.dispatcher
        context.system.scheduler.schedule(0 millis,checkInterval millis, self,CheckTimeOutWorker)
      }
    
      /**
        *  重写生命周期receive方法
        *  作用:
        *  1.接收Worker发来的注册信息
        *  2.不断接收Worker发来的心跳信息,并更新最后一次心跳时间
        *  3.过滤出超时的Worker并移除
        */
      override def receive = {
    
        // 接收Worker给Master发送过来的注册信息
        case RegisterWorker(id, host, port, memory, cores) => {
          //判断改Worker是否已经注册过,已注册的不执行任何操作,未注册的将进行注册
          if (!idToWorker.contains(id)) {
            val workerInfo = new WorkerInfo(id, host, port, memory, cores)
    
            idToWorker += (id -> workerInfo)
            workers += workerInfo
    
            println("一个新的Worker注册成功")
    
            //向Worker发送响应信息,将masterUrl返回
            sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
              s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
          }
        }
        //接收Worker发来的心跳信息
        case Heartbeat(workerId) => {
          // 通过传输过来的workerId获取对应的WorkerInfo
          val workerInfo = idToWorker(workerId)
          // 获取当前时间
          val currentTime = System.currentTimeMillis()
          // 更新最后一次心跳时间
          workerInfo.lastHeartbeatTime = currentTime
        }
        //检查超时Worker并移除
        case CheckTimeOutWorker => {
          val currentTime = System.currentTimeMillis()
          // 把超时的Worker过滤出来
          val toRemove: mutable.HashSet[WorkerInfo] =
            workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)
          // 将超时的Worker移除
          toRemove.foreach(deadWorker => {
            idToWorker -= deadWorker.id
            workers -= deadWorker
          })
        }
        println(s"当前Worker的数量: ${workers.size}")
      }
    }
    
    object Master{
      val MASTER_SYSTEM = "MasterSystem"
      val MASTER_ACTOR = "Master"
    
      def main(args: Array[String]): Unit = {
    
        val host = args(0) // 通过main方法参数制定master主机名
        val port = args(1).toInt //通过main方法参数指定Master的端口号
    
        //akka配置信息
        val configStr: String =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
    
        // 配置创建Actor需要的配置信息
        val config: Config = ConfigFactory.parseString(configStr)
    
        // 创建ActorSystem
        val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)
    
        // 用actorSystem实例创建Actor
        actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)
    
        actorSystem.awaitTermination()
      }
    }
    

      

    Worker.scala

    package com.nova.rpc
    
    import java.util.UUID
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    import scala.concurrent.duration._
    /**
      * @author Supernova
      * @date 2018/06/15
      */
    class Worker(val host: String, val port: Int, val masterHost: String,val masterPort: Int, val memory: Int, val cores: Int) extends Actor{
    
      // 生成一个Worker ID
      val workerId: String = UUID.randomUUID().toString
    
      // 用来存储MasterUrl
      var masterUrl: String = _
    
      // 心跳时间间隔
      val heartbeat_interval: Long = 10000
    
      // Master的Actor
      var master: ActorSelection = _
    
      /**
        * 生命周期preStart方法
        * 作用:当启动Worker时,向master发送注册信息
        */
      override def preStart(): Unit = {
        // 获取Master的Actor
        master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
        master ! RegisterWorker(workerId, host, port, memory, cores)
      }
    
      /**
        * 生命周期receive方法
        * 作用:
        * 定时向Master发送心跳信息
        */
      override def receive: Receive = {
        // Worker接收到Master发送过来的注册成功的信息(masterUrl)
        case RegisteredWorker(masterUrl) => {
          this.masterUrl = masterUrl
          // 启动一个定时器, 定时的给Master发送心跳
          import context.dispatcher
          context.system.scheduler.schedule(
            0 millis, heartbeat_interval millis, self, SendHeartbeat)
        }
        case SendHeartbeat => {
          // 向Master发送心跳信息
          master ! Heartbeat(workerId)
        }
      }
    
    }
    
    object Worker{
      val WORKER_SYSTEM = "WorkerSystem"
      val WORKER_ACTOR = "Worker"
    
      def main(args: Array[String]): Unit = {
        /**
          * 通过main方法参数指定相应的
          * worker主机名、端口号,master主机名、端口号,使用的内存和核数
          */
        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        val memory = args(4).toInt
        val cores = args(5).toInt
    
        //akka配置信息
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
    
        // 配置创建Actor需要的配置信息
        val config: Config = ConfigFactory.parseString(configStr)
    
        // 创建ActorSystem
        val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)
    
        // 用actorSystem实例创建Actor
        actorSystem.actorOf(Props(new Worker(
          host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)
    
        actorSystem.awaitTermination()
      }
    }
    

      

        3.2 测试运行

    由于Master 和Worker的运行都是使用main方法参数传入相应的主机名端口等参数,所以在运行前要在IDEA中的Edit Configurations 窗口中传入相应的参数。在本次测试中,我指定的参数如图:

    【Master端】

    【Worker端】


    【运行结果】

    1. 先运行Master,可以看到一旦运行Master,就启动了定时器检查超时Worker,因为还没有Worker进行注册,所以结果一直为0


    2. 启动Worker


    3. 启动Worker后,再看Master的窗口可以发现Worker注册成功,并且数量为1

    4. 关闭Worker,此时Worker已经宕掉了,可以发现Master窗口会收到一条警告信息,并且Master在定时检查超时Worker的时候移除了过期未收到心跳的Worker

  • 相关阅读:
    抓取登录后的数据
    Form认证的几点说明
    eclipse启动错误:java.lang.NoClassDefFoundError: org/eclipse/core/resources/IContainer
    mysql游标的使用 No data
    mysql insert 主键 重复问题
    tail 命令
    maven 打包可执行jar的方法
    maven中如何打包源代码
    工程师,请优化你的代码
    在服务器端判断request来自Ajax请求(异步)还是传统请求(同步)
  • 原文地址:https://www.cnblogs.com/snova/p/9195690.html
Copyright © 2011-2022 走看看