zoukankan      html  css  js  c++  java
  • 大数据学习——akka自定义RPC

    实现

    package cn.itcast.akka
    
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.actor.Actor.Receive
    import com.typesafe.config.ConfigFactory
    
    import scala.collection.mutable
    
    import scala.concurrent.duration._
    
    class Master(val host: String, val port: Int) extends Actor {
    
      //保存WorkerID 到 WorkerInfo的映射
      val idToWorker = new mutable.HashMap[String, WorkerInfo]()
      //保存所的WorkerInfo信息
      val workers = new mutable.HashSet[WorkerInfo]()
    
      val CHECK_INTERVAL = 15000
    
      override def preStart(): Unit = {
        //导入隐式转换
        import context.dispatcher
        context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)
      }
    
      override def receive: Receive = {
        //Worker发送个Mater的注册消息
        case RegisterWorker(workerId, cores, memory) => {
          if (!idToWorker.contains(workerId)) {
            //封装worker发送的信息
            val workerInfo = new WorkerInfo(workerId, cores, memory)
            //保存workerInfo
            idToWorker(workerId) = workerInfo
            workers += workerInfo
            //Master向Worker反馈注册成功的消息
            sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")
          }
        }
    
        //Worker发送给Master的心跳信息
        case Heartbeat(workerId) => {
          if (idToWorker.contains(workerId)) {
            val workerInfo = idToWorker(workerId)
            val currentTime = System.currentTimeMillis()
            //更新上一次心跳时间
            workerInfo.lastHeartbeatTime = currentTime
          }
        }
    
        //检测超时的Worker
        case CheckTimeOutWorker => {
          val currentTime = System.currentTimeMillis()
          val deadWorkers: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
          //      for(w <- deadWorkers) {
          //        idToWorker -= w.id
          //        workers -= w
          //      }
          deadWorkers.foreach(w => {
            idToWorker -= w.id
            workers -= w
          })
          println("alive worker size : " + workers.size)
        }
      }
    }
    
    object Master {
    
      val MASTER_SYSTEM = "MaterActorSystem"
      val MASTER_NAME = "Master"
    
      def main(args: Array[String]) {
    
        //    val host = args(0)
        //    val port = args(1).toInt
        val host = "127.0.0.1"
        val port = 8888
        val confStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val conf = ConfigFactory.parseString(confStr)
        //ActorSystem是单例的,用于创建Acotor并监控actor
        val actorSystem = ActorSystem(MASTER_SYSTEM, conf)
        //通过ActorSystem创建Actor
        actorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)
        actorSystem.awaitTermination()
    
      }
    }
    package cn.itcast.akka
    
    trait Message extends Serializable
    
    //Worker -> Master
    case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message
    
    //Master -> Worker
    case class RegisteredWorker(masterUrl: String) extends Message
    
    //Worker -> Master
    case class Heartbeat(id: String) extends Message
    
    //Worker internal message
    case object SendHeartbeat
    
    //Master internal message
    case object CheckTimeOutWorker
    package cn.itcast.akka
    
    import java.util.UUID
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    import scala.concurrent.duration._
    
    class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {
    
      //Master的引用
      var master: ActorSelection = _
      //Worker的ID
      val workerId = UUID.randomUUID().toString
      //masterUrl
      var masterUrl: String = _
    
      val HEARTBEAT_INTERVAL = 10000
    
      //preStart在构造器之后receive之前执行
      override def preStart(): Unit = {
        //首先跟Master建立连接
        master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")
        //通过master的引用向Master发送注册消息
        master ! RegisterWorker(workerId, cores, memory)
      }
    
      override def receive: Receive = {
        //Master发送给Worker注册成功的消息
        case RegisteredWorker(masterUrl) => {
          this.masterUrl = masterUrl
          //启动定时任务,向Master发送心跳
          //导入隐式转换
          import context.dispatcher
          context.system.scheduler.schedule(0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat)
        }
    
        case SendHeartbeat => {
          //向Master发送心跳
          master ! Heartbeat(workerId)
        }
      }
    }
    
    object Worker {
      def main(args: Array[String]) {
    
        //Worker的地址和端口
        //    val host = args(0)
        //    val port = args(1).toInt
        //    val cores = args(2).toInt
        //    val memory = args(3).toInt
        val host = "127.0.0.1"
        val port = 9999
        val cores = 8
        val memory = 1024
    
        //Master的地址和端口
        //    val masterHost = args(4)
        //    val masterPort = args(5).toInt
        val masterHost = "127.0.0.1"
        val masterPort = 8888
    
        val confStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val conf = ConfigFactory.parseString(confStr)
        //单例的ActorSystem
        val actorSystem = ActorSystem("WorkerActorSystem", conf)
        //通过actorSystem来创建Actor
        val worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")
        actorSystem.awaitTermination()
      }
    }
    package cn.itcast.akka
    
    
    class WorkerInfo(val id: String, val cores: Int, val memory: Int) {
    
      //TODO
      var lastHeartbeatTime: Long = _
    
    }

     

  • 相关阅读:
    TcIC(Teamcenter集成CatiaV5)的安装
    centos7上使用bind解析子域名
    Windows10 家庭版(1903/1909)中用RDPWrapper-v1.6.2和autoupdate补丁开启远程桌面功能
    修改SQL Server Express 2019 sa用户密码的方法
    微星B450主板安装64G内存的一个小招数
    缩小xfs文件系统的CentOS/RedHat虚拟机硬盘的迂回方法
    MQL命令的打开方式
    台电TBook二合一本全新安装Windows10
    django_auth_ldap
    开始认真学计算机网络----computer network学习笔记(一)
  • 原文地址:https://www.cnblogs.com/feifeicui/p/10996077.html
Copyright © 2011-2022 走看看