zoukankan      html  css  js  c++  java
  • Scala使用Akka模拟RPC机制代码2

    RemoteMessage.scala

    //对象要序列化才能通过网络传输   这个地方没有大括号....这有这个extends声明
    trait RemoteMessage extends Serializable
    
    //Worker ->(发送给) Master  Worker给Master节点发送注册消息
    case class RegisterWorker(id:String, memory:Int, scores:Int ) extends RemoteMessage
    
    //接收已经在Master注册过的Worker定时发送来的心跳  Woker->Master
    case class Heartbeat(id:String) extends RemoteMessage
    
    //Master -> Worker   Master反馈给Worker已经注册的Worker有哪些
    case class RegisteredWorker(masterUrl:String) extends RemoteMessage
    
    //Worker -> self  //Worker先向自己发送一下心跳,这个地方可以做一下判断,根据实际情况,定义什么时候向Master发送消息
    //在这个样例类中发送给Master的心跳  case class Heartbeat
    case object SendHeartbeat
    
    //Master -> self  Master自检查看Worker的状态
    case object CheckTimeOutWorker

    WorkerInfo.scala

    class WorkerInfo(val id:String,val memory:Int,val scores:Int) {
      //记录上一次心跳
      var lastHeartbeatTime : Long = _
    }

    Worker.scala

    import java.util.UUID
    import com.typesafe.config.ConfigFactory
    import akka.actor.Actor
    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.ActorSelection
    import scala.concurrent.duration._  //如果不引入这个包 在 context.system.scheduler.schedule 中的mills 地方会报错
    
    //使用Akka框架来实现RCP不同进程之间的方法调用   导入Actor是导入的akka包下的
    //在Worker类上定义一个主构造器
    class Worker (val masterHost:String ,val masterPort:Int ,val memory:Int ,val cores: Int) extends Actor {
      var master : ActorSelection = _
      //记录Worker从节点的id
      val workerId = UUID.randomUUID().toString()
      //记录Worker从节点的发送心跳的时间间隔
      val HEART_INTERVAL = 10000
      
      //preStart()方法在构造器之后,在receive之前执行  actorOf()方法执行 就会执行生命周期方法
      override def preStart() : Unit={
        //跟Master建立连接
        master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
        //向Master发送注册消息
        master ! RegisterWorker(workerId,memory,cores)
      }
      
      override def receive: Receive = {
        //Master对注册的Worker存储注册之后需要向Worker发送消息 来说明该Woker注册成功
        //此处是Worker节点接收到Master节点的反馈消息
        case RegisteredWorker(masterUrl) => {
          println(masterUrl)
          //启动定时器发送心跳
          import context.dispatcher
          //多长时间后执行 单位 ,多长时间执行一次 单位  ,消息的接受者(直接给master发不好,先给自己发送消息,以后可以做下判断,什么情况下再发送消息), 消息
          context.system.scheduler.schedule(0 millis, HEART_INTERVAL millis, self, SendHeartbeat)
        }
        //Worker先向自己self发送一下心跳,这个地方可以做一下判断,根据实际情况,定义什么时候向Master发送消息
        case SendHeartbeat =>{
          println("send heartbeat to Master")
          master ! Heartbeat(workerId)
        }
      }
    }
    object Worker{
      def main(args: Array[String]): Unit = {
        //从传入的参数列表中取得
        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        val memory = args(4).toInt
        val cores = args(5).toInt
        //为了通过ActorSystem老大来获得Actor  准备配置
        val configStr = 
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
            """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem是所有Actor的老大,辅助创建和监控下面的Actor, 该老大是单例的.
        //得到该老大的对象需要一个名字和配置对象config
        val actorSystem = ActorSystem("WorkerSystem",config) 
        //actorOf方法执行,就会执行生命周期方法
        actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)),"Worker")
        actorSystem.awaitTermination()
      }
    }

    Master.scala

    import com.typesafe.config.ConfigFactory
    
    import akka.actor.Actor
    import akka.actor.ActorSystem
    import akka.actor.Props
    import scala.collection.mutable
    import scala.concurrent.duration._
    
    class Master(val host:String,val port:Int) extends Actor{
      
      //定义一个HashMap用来存储注册Worker的信息
      val idToWorkerHashMap = new mutable.HashMap[String,WorkerInfo]()
      //定义一个HashSet用于存储 注册的Worker对象 WorkerInfo
      val workersSet = new mutable.HashSet[WorkerInfo]() //使用set删除快, 也可用linkList
      //超时检查的间隔  这个时间间隔一定要大于Worker向Master发送心跳的时间间隔.
      val CHECK_INTERVAL = 15000
      
      override def preStart() :Unit = {
        println("preStart invoked")
        //导入隐式转换
        //使用timer太low了,可以使用akka的定时器,需要导入这个包
        import context.dispatcher 
        context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)
      }
      //接收Worker发送来的消息  可以有注册RegisterWorker 心跳Heartbeat
      override def receive :Receive = {
        case RegisterWorker(id,memory,cores) =>{
          //判断该Worker是否已经注册过 
          if(!idToWorkerHashMap.contains(id)){
            //把Worker的信息封装起来保存到内存中
            val workerInfo = new WorkerInfo(id,memory,cores)
            //以id为键  worker对象为值 添加到该idToWorker的HashMap对象中
            idToWorkerHashMap(id) = workerInfo 
            //把workerInfo对象放到存储Woker的HashSet中
            workersSet += workerInfo
            //对注册的Worker存储注册之后需要向Worker发送消息 来说明该Woker注册成功
            sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")//通知worker注册
          }
        }
        //接收已经在Master注册过的Worker定时发送来的心跳 
        case Heartbeat(id) =>{
          if(idToWorkerHashMap.contains(id)){
            val workerInfo = idToWorkerHashMap(id)
            //报活  把该Worker给Master心跳的时间记录下来
            val currentTime = System.currentTimeMillis()
            workerInfo.lastHeartbeatTime = currentTime
          }
        }
        //Master节点主动检测在Master自己这里注册Worker节点的存活状态
        case CheckTimeOutWorker =>{
          val currentTime = System.currentTimeMillis();
          //如果当前时间和最近一次心跳时间的差值大于检测的时间间隔 就要干掉这个Worker
          val toRemove = workersSet.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)
          for(w <- toRemove){
            workersSet -= w
            idToWorkerHashMap -= w.id
          }
          println(workersSet.size)      
        }
      }
    }
    
    object Master{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        //准备配置
        val configStr = 
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"        
            """.stripMargin
        //ConfigFactory 既有parseString 也有parseFile 
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem是所有Actor的老大,辅助创建和监控下面的所有Actor,它是单例的
        val actorSystem = ActorSystem("MasterSystem",config)
        val master = actorSystem.actorOf(Props(new Master(host,port)),"Master")
        actorSystem.awaitTermination()
      }
    }

    另外一个版本:http://www.cnblogs.com/DreamDrive/p/6736471.html

  • 相关阅读:
    两个简单的画验证码图形程序
    Cisco路由技术基础知识详解
    网络管理中的常用命令
    网络管理中的常用命令
    基于SNMP的MIB库访问实现
    SNMP编程基础
    SNMP编程基础
    Cisco路由技术基础知识详解
    两个简单的画验证码图形程序
    模版方法
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6740440.html
Copyright © 2011-2022 走看看