RemoteMessage.scala
/**
 * Marker trait for messages that travel between the Worker and the Master.
 *
 * It extends `Serializable` because the messages are sent over the network.
 * The trait has no body, so only the `extends` clause is needed.
 */
trait RemoteMessage extends Serializable

/**
 * Worker -> Master: registration request sent by a Worker.
 *
 * @param id     unique id of the registering Worker
 * @param memory memory amount reported by the Worker
 * @param scores third reported resource figure; the name is kept as-is for
 *               compatibility (NOTE(review): the Worker passes its `cores`
 *               value here, so this is presumably a typo for `cores`)
 */
final case class RegisterWorker(id: String, memory: Int, scores: Int) extends RemoteMessage

/**
 * Worker -> Master: periodic heartbeat from a Worker.
 *
 * @param id id of the Worker that is reporting in
 */
final case class Heartbeat(id: String) extends RemoteMessage

/**
 * Master -> Worker: reply sent back to a Worker after it has been registered.
 *
 * @param masterUrl actor URL of the Master
 */
final case class RegisteredWorker(masterUrl: String) extends RemoteMessage

/**
 * Worker -> self: timer tick telling the Worker to emit a [[Heartbeat]].
 *
 * Going through `self` first leaves room to decide, based on the Worker's
 * situation, whether and when a heartbeat should be sent to the Master.
 */
case object SendHeartbeat

/** Master -> self: timer tick telling the Master to check its Workers for timeouts. */
case object CheckTimeOutWorker
WorkerInfo.scala
/**
 * Mutable record describing one Worker.
 *
 * @param id     unique id of the Worker
 * @param memory memory amount reported by the Worker
 * @param scores third reported resource figure (name kept for compatibility;
 *               NOTE(review): presumably a typo for `cores` - confirm)
 */
class WorkerInfo(val id: String, val memory: Int, val scores: Int) {

  /**
   * Timestamp of the most recent heartbeat.
   *
   * Starts at 0 and stays there until the owner of this record assigns a value.
   */
  var lastHeartbeatTime: Long = 0L
}
Worker.scala
import java.util.UUID

import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.ActorSelection
import akka.actor.Cancellable
// Required for the `.millis` duration syntax used when scheduling the heartbeat.
import scala.concurrent.duration._

/**
 * Worker actor: registers itself with the Master over Akka remoting and then
 * sends heartbeats at a fixed interval.
 *
 * @param masterHost host name of the Master's actor system
 * @param masterPort port of the Master's actor system
 * @param memory     memory figure reported to the Master on registration
 * @param cores      core count reported to the Master on registration
 */
class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  /** Selection pointing at the Master actor; assigned in `preStart`. */
  var master: ActorSelection = _

  /** Id of this Worker, generated randomly for each actor instance. */
  val workerId: String = UUID.randomUUID().toString()

  /** Interval between two heartbeats, in milliseconds. */
  val HEART_INTERVAL: Int = 10000

  /** Handle of the running heartbeat timer, kept so it can be cancelled. */
  private var heartbeatTask: Option[Cancellable] = None

  /**
   * Lifecycle hook that runs after construction and before any message is
   * received: connects to the Master and sends the registration message.
   */
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    master ! RegisterWorker(workerId, memory, cores)
  }

  /** Stops the heartbeat timer so it does not keep firing at a stopped actor. */
  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  override def receive: Receive = {
    // The Master confirmed the registration: start the periodic heartbeat.
    case RegisteredWorker(masterUrl) =>
      println(masterUrl)
      // Execution context required by the scheduler.
      import context.dispatcher
      // Cancel a timer left over from an earlier confirmation so that
      // heartbeats are never sent by two timers at once.
      heartbeatTask.foreach(_.cancel())
      // Arguments: initial delay, interval, receiver, message. The tick goes to
      // `self` rather than straight to the Master, so that this actor can
      // decide when a heartbeat should actually be sent.
      heartbeatTask = Some(
        context.system.scheduler.schedule(0.millis, HEART_INTERVAL.millis, self, SendHeartbeat)
      )

    // Timer tick from `self`: forward a heartbeat to the Master.
    case SendHeartbeat =>
      println("send heartbeat to Master")
      master ! Heartbeat(workerId)
  }
}

object Worker {

  /**
   * Starts a Worker actor system.
   *
   * Expected arguments, in order: host, port, masterHost, masterPort, memory,
   * cores. Prints a usage line and exits with status 1 when fewer than six
   * arguments are given.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt

    // Remoting configuration needed to create the ActorSystem.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)

    // The ActorSystem creates and supervises the actors below it; building one
    // takes a name and a config object.
    val actorSystem = ActorSystem("WorkerSystem", config)
    // Creating the actor triggers its lifecycle methods (preStart).
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)), "Worker")
    // NOTE(review): awaitTermination() is not available in newer Akka
    // releases - confirm the Akka version before upgrading.
    actorSystem.awaitTermination()
  }
}
Master.scala
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.actor.Props
import scala.collection.mutable
// Required for the `.millis` duration syntax used when scheduling the check.
import scala.concurrent.duration._

/**
 * Master actor: accepts Worker registrations, records their heartbeats and
 * periodically drops Workers whose last heartbeat is too old.
 *
 * @param host host name used to build the Master URL sent back to Workers
 * @param port port used to build the Master URL sent back to Workers
 */
class Master(val host: String, val port: Int) extends Actor {

  /** Registered Workers indexed by their id. */
  val idToWorkerHashMap = new mutable.HashMap[String, WorkerInfo]()

  /** The same registered Workers as a set. */
  val workersSet = new mutable.HashSet[WorkerInfo]()

  /**
   * Interval of the timeout check in milliseconds, also used as the maximum
   * allowed heartbeat age. It must be larger than the interval at which
   * Workers send their heartbeats.
   */
  val CHECK_INTERVAL: Int = 15000

  /** Handle of the running timeout-check timer, kept so it can be cancelled. */
  private var checkTask: Option[Cancellable] = None

  /** Starts the periodic timeout check using Akka's scheduler. */
  override def preStart(): Unit = {
    println("preStart invoked")
    // Execution context required by the scheduler.
    import context.dispatcher
    checkTask = Some(
      context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker)
    )
  }

  /** Stops the timeout-check timer so it does not keep firing at a stopped actor. */
  override def postStop(): Unit = {
    checkTask.foreach(_.cancel())
    checkTask = None
  }

  /** Handles registration and heartbeat messages from Workers, plus the self-sent check tick. */
  override def receive: Receive = {
    case RegisterWorker(id, memory, cores) =>
      // Only Workers that are not registered yet are stored and answered.
      if (!idToWorkerHashMap.contains(id)) {
        val workerInfo = new WorkerInfo(id, memory, cores)
        // Count the registration as the first sign of life. Otherwise the
        // timestamp stays at 0 and a timeout check that runs before the first
        // heartbeat would evict the Worker immediately.
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorkerHashMap(id) = workerInfo
        workersSet += workerInfo
        // Tell the Worker that its registration succeeded.
        sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
      }

    // Heartbeat from a Worker: refresh its timestamp if it is registered;
    // heartbeats carrying unknown ids are ignored.
    case Heartbeat(id) =>
      idToWorkerHashMap.get(id).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    // Self-sent tick: remove every Worker whose last heartbeat is older than
    // CHECK_INTERVAL, then print how many Workers remain.
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // `filter` builds a new set, so removing from `workersSet` in the loop is safe.
      val toRemove = workersSet.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
      for (w <- toRemove) {
        workersSet -= w
        idToWorkerHashMap -= w.id
      }
      println(workersSet.size)
  }
}

object Master {

  /**
   * Starts the Master actor system.
   *
   * Expected arguments, in order: host, port. Prints a usage line and exits
   * with status 1 when fewer than two arguments are given.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Master <host> <port>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    // Remoting configuration needed to create the ActorSystem.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    // ConfigFactory offers parseFile as well as parseString.
    val config = ConfigFactory.parseString(configStr)

    // The ActorSystem creates and supervises all actors below it.
    val actorSystem = ActorSystem("MasterSystem", config)
    actorSystem.actorOf(Props(new Master(host, port)), "Master")
    // NOTE(review): awaitTermination() is not available in newer Akka
    // releases - confirm the Akka version before upgrading.
    actorSystem.awaitTermination()
  }
}