zoukankan      html  css  js  c++  java
  • Spark中master与worker的进程RPC通信实现

    1.构建master的actor

    package SparkRPC

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    import scala.collection.mutable

    /**
     * Master side of a minimal Spark-style master/worker RPC demo built on Akka actors.
     *
     * Created by hqs on 2018/1/24.
     *
     * Protocol between the master and its workers:
     *  1. The master and the workers are started.
     *  2. A started worker connects to the master and sends a `Register2Master` message.
     *  3. The master stores the registration and answers with `RegisSuccess`.
     *  4. The worker starts a timer that periodically sends a heartbeat to the master
     *     (the tick goes to the worker itself first, which then messages the master).
     *  5. The master refreshes the stored heartbeat timestamp on every `HeartBeat`.
     *  6. The master runs its own timer that checks the heartbeat timestamps and removes the
     *     registration of every worker whose timestamp is older than the timeout.
     */
    class Master extends Actor {

      // Registered workers, keyed by worker id.
      private val workerMp: mutable.HashMap[String, WorkerInfo] = new mutable.HashMap[String, WorkerInfo]()

      // A worker whose last recorded sign of life is older than this many milliseconds is dropped.
      private val heartbeatTimeoutMs: Long = 20 * 1000L

      // Handle of the periodic CheckWorker timer, kept so that postStop can cancel it.
      private var checkTask: Option[akka.actor.Cancellable] = None

      /** Starts the timer that sends `CheckWorker` to this actor every 15 seconds, beginning immediately. */
      override def preStart(): Unit = {
        import scala.concurrent.duration._
        import context.dispatcher
        checkTask = Some(context.system.scheduler.schedule(Duration.Zero, 15.seconds, self, CheckWorker))
      }

      /**
       * Cancels the liveness timer. Without this the timer outlives the actor instance: a restart
       * would stack a second timer on top of the first, and a stop would leave ticks going to
       * dead letters.
       */
      override def postStop(): Unit = {
        checkTask.foreach(_.cancel())
        checkTask = None
      }

      override def receive: Receive = {
        case "start" => println("master start...")

        // Store the registration and confirm it to the sender.
        case Register2Master(workerId, cores, memory) =>
          val info = new WorkerInfo(cores, memory)
          // Count the registration itself as a sign of life, so a liveness check that arrives
          // before the first heartbeat does not depend on WorkerInfo's initial timestamp
          // (WorkerInfo is defined outside this block).
          info.lastloginTime = System.currentTimeMillis()
          workerMp(workerId) = info
          println(s"add a worker,workerId = ${workerId}")
          println(s"now total workers = ${workerMp.size}")
          sender() ! RegisSuccess

        // Refresh the timestamp of a known worker; heartbeats from unknown ids are ignored.
        case HeartBeat(workerId) =>
          workerMp.get(workerId).foreach { info =>
            info.lastloginTime = System.currentTimeMillis()
          }

        // Drop every worker that has been silent for longer than the timeout.
        case CheckWorker =>
          // Read the clock once so that all workers are judged against the same instant.
          val now = System.currentTimeMillis()
          // Materialise the ids first; the map must not be modified while it is being traversed.
          val deadWorkerIds = workerMp.collect {
            case (id, info) if now - info.lastloginTime > heartbeatTimeoutMs => id
          }.toList
          workerMp --= deadWorkerIds
          println(s"now total workers = ${workerMp.size}")
      }
    }

    /** Entry point of the master process and the names workers use to locate the master actor. */
    object Master {

      // Name of the master's actor system.
      val MASTER_ACS_NAME = "master_acs_name"
      // Name of the master actor inside that actor system.
      val MASTER_AC_NAME = "master_ac_name"

      /**
       * Starts a remoting-enabled actor system, creates the master actor in it and sends it "start".
       *
       * @param args exactly two elements: the host name or IP and the port the master listens on.
       *             With any other number of arguments the usage line is printed and the process
       *             exits with status 1.
       */
      def main(args: Array[String]): Unit = {

        if (args.length != 2) {
          println("Master <masterIp,masterPort>")
          // A usage error is a failure; the parameterless sys.exit() would report success (status 0).
          sys.exit(1)
        }

        val Array(masterIp, masterPort) = args
        val str =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "${masterIp}"
             |akka.remote.netty.tcp.port = "${masterPort}"
             |""".stripMargin
        val conf = ConfigFactory.parseString(str)
        val acs: ActorSystem = ActorSystem.create(MASTER_ACS_NAME, conf)
        val masterRef = acs.actorOf(Props(new Master), MASTER_AC_NAME)

        masterRef ! "start"
      }
    }

    2.构建worker的actor

    package SparkRPC

    import java.util.UUID

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    /**
     * Worker side of the master/worker RPC demo: registers with the master on start-up and,
     * once the registration is confirmed, sends it a heartbeat every 10 seconds.
     *
     * Created by hqs on 2018/1/24.
     *
     * @param masterIp   host name or IP of the master's actor system
     * @param masterPort port of the master's actor system
     * @param cores      core count reported to the master on registration
     * @param memory     memory amount reported to the master on registration
     */
    class Worker(val masterIp:String,val masterPort:Int,val cores:Int,val memory:Int) extends Actor{
      val workerId = UUID.randomUUID().toString

      // Selection of the remote master actor. It is resolved at construction time, so it is
      // never null and never needs to be reassigned.
      val masSele: ActorSelection = context.actorSelection(
        s"akka.tcp://${Master.MASTER_ACS_NAME}@${masterIp}:${masterPort}/user/${Master.MASTER_AC_NAME}")

      // Handle of the periodic heartbeat timer; None until the master has confirmed registration.
      private var heartbeatTask: Option[akka.actor.Cancellable] = None

      /** Sends this worker's registration to the master. */
      override def preStart(): Unit = {
        masSele ! Register2Master(workerId,cores,memory)
      }

      /**
       * Cancels the heartbeat timer. Without this the timer outlives the actor instance, so every
       * restart would add another timer and multiply the heartbeats.
       */
      override def postStop(): Unit = {
        heartbeatTask.foreach(_.cancel())
        heartbeatTask = None
      }

      override def receive: Receive = {
        case "start" => println("worker starting")

        // Registration confirmed: start the heartbeat timer, at most once per actor instance.
        case RegisSuccess =>
          if (heartbeatTask.isEmpty) {
            println("success start scheduler")
            // schedule(initialDelay, interval, receiver, message): the tick is delivered to this
            // actor itself, which forwards a HeartBeat to the master.
            import scala.concurrent.duration._
            import context.dispatcher
            heartbeatTask = Some(
              context.system.scheduler.schedule(Duration.Zero, 10.seconds, self, SendHeartBeat))
          }

        // Timer tick: forward a heartbeat carrying this worker's id to the master.
        case SendHeartBeat =>
          masSele ! HeartBeat(workerId)
          println("worker 向 master 发送心跳信息...")
      }
    }
    /** Entry point of a worker process and the names of its actor system and actor. */
    object Worker{

      // Name of the worker's actor system.
      val WORKER_ACS_NAME = "worker_acs_name"
      // Name of the worker actor inside that actor system.
      val WORKER_AC_NAME = "worker_ac_name"

      /**
       * Starts a remoting-enabled actor system, creates the worker actor in it and sends it "start".
       *
       * @param args exactly six elements: masterIp, masterPort, workerIp, workerPort, cores, memory.
       *             masterPort, cores and memory must be integers. On a wrong argument count or a
       *             non-integer value the usage line is printed and the process exits with status 1.
       */
      def main(args: Array[String]): Unit = {
        val usage = "Worker <masterIp,masterPort,workerIp,workerPort,cores,memory>"

        if(args.length != 6){
          println(usage)
          // A usage error is a failure; the parameterless sys.exit() would report success (status 0).
          sys.exit(1)
        }
        val Array(masterIp,masterPort,workerIp,workerPort,cores,memory) = args

        // Convert the numeric arguments up front so that a malformed value produces the usage
        // line instead of an uncaught NumberFormatException.
        val (masterPortNumber, coreCount, memoryAmount) =
          scala.util.Try((masterPort.toInt, cores.toInt, memory.toInt)).toOption.getOrElse {
            println(usage)
            sys.exit(1)
          }

        val str =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "${workerIp}"
             |akka.remote.netty.tcp.port = "${workerPort}"
             |""".stripMargin
        val conf = ConfigFactory.parseString(str)
        val acs = ActorSystem.create(WORKER_ACS_NAME,conf)
        val scRef = acs.actorOf(
          Props(new Worker(masterIp, masterPortNumber, coreCount, memoryAmount)), WORKER_AC_NAME)

        scRef ! "start"
      }
    }

    3.master与worker的消息传递封装

    package SparkRPC

    /**
     * Empty placeholder class; it declares no members.
     *
     * Created by hqs on 2018/1/27.
     */
    class Message
    /** Registration message a worker sends to the master. */
    case class Register2Master(workerId: String, cores: Int, memory: Int)

    /** Reply the master sends back after a successful registration. */
    case object RegisSuccess

    /** Tick a worker sends to itself to trigger the next heartbeat. */
    case object SendHeartBeat

    /** Heartbeat a worker sends to the master. */
    case class HeartBeat(workerId: String)

    /** Tick that makes the master check which workers are still alive. */
    case object CheckWorker


    4.总结:master与worker基于akka的actor实现RPC通信。worker注册成功后通过定时任务向master发送心跳,master通过定时任务检查心跳时间,并移除超时的worker。

  • 相关阅读:
    P1903 [国家集训队]数颜色 / 维护队列 莫对算法
    P1016 旅行家的预算 模拟 贪心
    P3948 数据结构 差分数组
    乘法逆元 模板
    二分法 最大化平均值
    HDU5213 Lucky 莫队算法 容斥定理
    P1083 借教室 差分数组
    发布订阅、redis的配置文件、redis的主从、redis的持久化、
    nosql、redis、性能测试、命令相关、redis的数据类型string、list、hash、set、zset、
    nginx的日志、禁止访问、反向代理、权重、nginx location匹配规则、location分离、WSGI、
  • 原文地址:https://www.cnblogs.com/beiyi888/p/9724129.html
Copyright © 2011-2022 走看看