zoukankan      html  css  js  c++  java
  • Scala使用Akka模拟RPC机制代码

     上代码:  另一个版本(自己加注释):http://www.cnblogs.com/DreamDrive/p/6740440.html

    RemoteMessage.scala

    /**
     * Marker trait for the messages that Worker and Master send to each other.
     * It extends Serializable so instances can be shipped between actor systems.
     */
    trait RemoteMessage extends Serializable

    // ---- Worker -> Master ----

    /**
     * Registration request sent by a worker when it starts.
     *
     * @param id     identifier the worker chose for itself
     * @param memory memory amount the worker reports
     * @param cores  number of cores the worker reports
     */
    final case class RegisterWorker(id: String, memory: Int, cores: Int) extends RemoteMessage

    /**
     * Periodic liveness signal from a worker.
     *
     * @param id identifier of the worker that is reporting in
     */
    final case class Heartbeat(id: String) extends RemoteMessage

    // ---- Master -> Worker ----

    /**
     * Acknowledgement that a registration was accepted.
     *
     * @param masterUrl actor URL of the master that accepted the registration
     */
    final case class RegisteredWorker(masterUrl: String) extends RemoteMessage

    // ---- Worker -> self ----

    /** Timer tick telling the worker it is time to send a heartbeat. */
    case object SendHeartbeat

    // ---- Master -> self ----

    /** Timer tick telling the master to look for workers whose heartbeats have stopped. */
    case object CheckTimeOutWorker

    WorkerInfo.scala

    /**
     * Mutable bookkeeping record describing one registered worker.
     *
     * @param id     identifier of the worker
     * @param memory memory amount reported by the worker
     * @param cores  number of cores reported by the worker
     */
    class WorkerInfo(val id: String, val memory: Int, val cores: Int) {

      /**
       * Time of the most recent heartbeat, in milliseconds since the epoch.
       *
       * Starts at the creation time rather than 0: with a zero default, a
       * timeout check that runs before the first heartbeat arrives would see an
       * enormous "age" and treat a brand-new worker as dead.
       */
      var lastHeartbeatTime: Long = System.currentTimeMillis()
    }

    Worker.scala

    import java.util.UUID
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._
    
    /**
     * Actor that registers itself with a remote Master and, once the Master
     * acknowledges the registration, sends it a heartbeat at a fixed interval.
     *
     * @param masterHost host name of the actor system that runs the Master
     * @param masterPort port of the actor system that runs the Master
     * @param memory     memory amount to report when registering
     * @param cores      number of cores to report when registering
     */
    class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

      // Selection pointing at the remote Master actor; assigned in preStart.
      var master: ActorSelection = _
      // Identifier carried by this worker's registration and heartbeat messages.
      val workerId = UUID.randomUUID().toString
      // Heartbeat period in milliseconds.
      val HEART_INTERVAL = 10000

      // Handle of the running heartbeat timer, kept so the timer can be cancelled.
      private var heartbeatTask: Option[akka.actor.Cancellable] = None

      /** Looks up the Master and sends it the registration request. */
      override def preStart(): Unit = {
        master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
        master ! RegisterWorker(workerId, memory, cores)
      }

      /** Stops the heartbeat timer so it does not keep firing at a stopped actor. */
      override def postStop(): Unit = {
        heartbeatTask.foreach(_.cancel())
        heartbeatTask = None
      }

      override def receive: Receive = {
        case RegisteredWorker(masterUrl) =>
          println(masterUrl)
          // The scheduler needs an implicit ExecutionContext.
          import context.dispatcher
          // Cancel any earlier timer first so a repeated acknowledgement cannot
          // stack several heartbeat timers on top of each other.
          heartbeatTask.foreach(_.cancel())
          // The tick goes to self rather than straight to the Master, which leaves
          // room to decide here whether a heartbeat should actually be sent.
          heartbeatTask = Some(
            context.system.scheduler.schedule(0.millis, HEART_INTERVAL.millis, self, SendHeartbeat))

        case SendHeartbeat =>
          println("send heartbeat to master")
          master ! Heartbeat(workerId)
      }
    }
    
    /** Command-line entry point that boots an actor system hosting one Worker. */
    object Worker {

      /**
       * Starts the worker process.
       *
       * @param args host, port, masterHost, masterPort, memory, cores (in that
       *             order); the process exits with status 1 if fewer are given
       */
      def main(args: Array[String]): Unit = {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 6) {
          System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
          sys.exit(1)
        }
        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        val memory = args(4).toInt
        val cores = args(5).toInt
        // Remoting configuration: the address this actor system binds to.
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        // The ActorSystem creates and supervises the actors below it.
        val actorSystem = ActorSystem("WorkerSystem", config)
        actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)), "Worker")
        // Block the main thread until the actor system shuts down.
        actorSystem.awaitTermination()
      }
    }

    Master.scala

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    import scala.collection.mutable
    import scala.concurrent.duration._
    /**
     * Actor that keeps track of registered workers and periodically drops the
     * ones whose heartbeats have stopped arriving.
     *
     * @param host host name used to build the URL returned to workers
     * @param port port used to build the URL returned to workers
     */
    class Master(val host: String, val port: Int) extends Actor {

      // workerId -> WorkerInfo, for lookups by id.
      val idToWorker = new mutable.HashMap[String, WorkerInfo]()
      // All currently registered workers.
      val workers = new mutable.HashSet[WorkerInfo]()
      // Period of the timeout check in milliseconds, also used as the timeout
      // itself; it must be larger than the workers' heartbeat interval.
      val CHECK_INTERVAL = 15000

      // Handle of the running timeout-check timer, kept so it can be cancelled.
      private var checkTask: Option[akka.actor.Cancellable] = None

      /** Starts the periodic timeout check. */
      override def preStart(): Unit = {
        println("preStart invoked")
        // The scheduler needs an implicit ExecutionContext.
        import context.dispatcher
        checkTask = Some(
          context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker))
      }

      /** Stops the timeout-check timer so it does not keep firing at a stopped actor. */
      override def postStop(): Unit = {
        checkTask.foreach(_.cancel())
        checkTask = None
      }

      override def receive: Receive = {
        case RegisterWorker(id, memory, cores) =>
          // Ignore repeated registrations of an id that is already known.
          if (!idToWorker.contains(id)) {
            val workerInfo = new WorkerInfo(id, memory, cores)
            // Count the registration as the first sign of life; otherwise a
            // timeout check running before the first heartbeat would evict the
            // worker immediately.
            workerInfo.lastHeartbeatTime = System.currentTimeMillis()
            idToWorker(id) = workerInfo
            workers += workerInfo
            // Tell the worker that its registration was accepted.
            sender() ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
          }

        case Heartbeat(id) =>
          // Heartbeats from unknown ids are silently dropped.
          idToWorker.get(id).foreach { info =>
            info.lastHeartbeatTime = System.currentTimeMillis()
          }

        case CheckTimeOutWorker =>
          val currentTime = System.currentTimeMillis()
          // Collect first, then remove, so the set is not modified while it is
          // being traversed.
          val expired = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
          expired.foreach { w =>
            workers -= w
            idToWorker -= w.id
          }
          println(workers.size)
      }
    }
    
    /** Command-line entry point that boots an actor system hosting the Master. */
    object Master {

      /**
       * Starts the master process.
       *
       * @param args host and port (in that order) for this actor system to bind
       *             to; the process exits with status 1 if fewer are given
       */
      def main(args: Array[String]): Unit = {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
          System.err.println("Usage: Master <host> <port>")
          sys.exit(1)
        }
        val host = args(0)
        val port = args(1).toInt
        // Remoting configuration: the address this actor system binds to.
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        // The ActorSystem creates and supervises the actors below it.
        val actorSystem = ActorSystem("MasterSystem", config)
        // Create the Master actor under the name that workers look up.
        actorSystem.actorOf(Props(new Master(host, port)), "Master")
        // Block the main thread until the actor system shuts down.
        actorSystem.awaitTermination()
      }
    }
  • 相关阅读:
    单据的多个状态字段
    Win7 如何阻止程序联网
    强制关机.bat
    Delphi Class of
    坐标转换 GetCursorPos与转换
    Delphi 的RTTI机制浅探-2
    Delphi 的RTTI机制浅探-1
    Delphi 的RTTI机制-3
    Delphi 的RTTI机制-2
    Delphi 的RTTI机制-1
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6736471.html
Copyright © 2011-2022 走看看