zoukankan      html  css  js  c++  java
  • 基于akka实现简单的主从框架

    ========================Master==============================

    package com.scala.akka.rpc.demo2

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    import scala.collection.mutable
    import scala.concurrent.duration._

    /**
     * Created by 贵斌 on 2016-5-13.
     *
     * Master actor of a simple master/worker setup.
     *
     * It accepts worker registrations, records worker heartbeats, and
     * periodically evicts workers whose last heartbeat is older than
     * `CHECKWORKERTIME` milliseconds.
     *
     * @param masterHost host name used to build the master URL returned to workers
     * @param masterPort port used to build the master URL returned to workers
     */
    class Master(masterHost: String, masterPort: Int) extends Actor {

      // Every registered worker, keyed by worker id.
      val registerWorkersMap = new mutable.HashMap[String, WorkerInfo]()
      // Workers that have not been evicted by the liveness check.
      val workers = new mutable.HashSet[WorkerInfo]()
      // Period of the liveness check and the heartbeat timeout, in milliseconds.
      val CHECKWORKERTIME = 10000

      // Handle of the periodic liveness check so it can be cancelled on stop.
      private var livenessCheck: Option[akka.actor.Cancellable] = None

      /** Starts the periodic liveness check that sends `SendMaskSelfHearBeat` to this actor. */
      override def preStart(): Unit = {
        println("Master start....")
        import context.dispatcher
        livenessCheck = Some(
          context.system.scheduler.schedule(0.millis, CHECKWORKERTIME.millis, self, SendMaskSelfHearBeat)
        )
      }

      /** Cancels the liveness check so the scheduler stops messaging a stopped actor. */
      override def postStop(): Unit = {
        livenessCheck.foreach(_.cancel())
        livenessCheck = None
      }

      override def receive: Receive = {
        case "clientconnect" =>
          println("a client connected....")
          sender() ! "reply"

        case RegisterWorker(workerId, memory, cores) =>
          // Only the first registration of an id is recorded and acknowledged.
          if (!registerWorkersMap.contains(workerId)) {
            val workerInfo = new WorkerInfo(workerId, memory, cores)
            registerWorkersMap(workerId) = workerInfo
            workers += workerInfo
            sender() ! RegisteredWorker(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
          }

        case HearBeat(workerId) =>
          // Heartbeats from unknown (or already evicted) workers are ignored.
          registerWorkersMap.get(workerId).foreach { info =>
            info.lastHearBeat = System.currentTimeMillis()
          }

        case SendMaskSelfHearBeat =>
          val currentTime = System.currentTimeMillis()
          val diedWorkers = workers.filter(w => (currentTime - w.lastHearBeat) > CHECKWORKERTIME)
          val diedWorkerIds = registerWorkersMap.collect {
            case (id, info) if (currentTime - info.lastHearBeat) > CHECKWORKERTIME => id
          }.toList
          workers --= diedWorkers
          // Evict from the registry as well; otherwise dead entries pile up and
          // the map and the live set drift apart.
          registerWorkersMap --= diedWorkerIds
          println(workers.size)

        case "master" => println("Master start successful....")
      }
    }

    /** Entry point that boots the master actor system on a fixed host and port. */
    object Master {

      /**
       * Starts `MasterSystem` on 127.0.0.1:10000, creates the `Master` actor
       * and blocks until the actor system terminates.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val host = "127.0.0.1"
        val port = 10000
        val configstr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
        val config = ConfigFactory.parseString(configstr)
        val masterSystem = ActorSystem("MasterSystem", config)
        // Create the actor.
        val master = masterSystem.actorOf(Props(new Master(host, port)), "Master")
        master ! "master"
        masterSystem.awaitTermination()
      }
    }
    ===================worker========================
    package com.scala.akka.rpc.demo2

    import java.util.UUID

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    /**
     * Created by 贵斌 on 2016-5-13.
     *
     * Worker actor that registers itself with the master and, once the
     * registration is acknowledged, sends a heartbeat every
     * `SEND_HEARBEATTIME` milliseconds.
     *
     * @param masterHost host of the master actor system
     * @param masterPort port of the master actor system
     */
    class Worker(masterHost: String, masterPort: Int) extends Actor {

      // Printed on every construction of the actor, including the first start.
      println("worker is being restarted....")
      // Selection pointing at the master actor; assigned in preStart.
      var master: ActorSelection = _
      // Heartbeat period in milliseconds.
      val SEND_HEARBEATTIME = 5000
      // Random id under which this worker instance registers with the master.
      val workerId = UUID.randomUUID().toString

      // Handle of the periodic heartbeat so it can be cancelled on stop.
      private var heartbeatTimer: Option[akka.actor.Cancellable] = None

      /** Resolves the master actor and sends it the connect and register messages. */
      override def preStart(): Unit = {
        master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
        master ! "clientconnect"
        master ! RegisterWorker(workerId, 1000, 24)
      }

      /** Cancels the heartbeat so the timer does not outlive this actor instance. */
      override def postStop(): Unit = {
        heartbeatTimer.foreach(_.cancel())
        heartbeatTimer = None
      }

      override def receive: Receive = {
        case "work" => println("worker started successful....")
        case "reply" => println("worker connection Master successful....")

        case RegisteredWorker(masterUrl) =>
          println("worker successful registration.... ")
          println(s"Master Url is $masterUrl")
          import context.dispatcher
          // Replace any timer that is already running so a repeated
          // acknowledgement cannot start duplicate heartbeats.
          heartbeatTimer.foreach(_.cancel())
          heartbeatTimer = Some(
            context.system.scheduler.schedule(0.millis, SEND_HEARBEATTIME.millis, self, SendWorkSelfHearBeat)
          )

        case SendWorkSelfHearBeat =>
          master ! HearBeat(workerId)
      }
    }

    /** Entry point that boots a worker actor system and connects it to a master. */
    object Worker {

      private val Usage = "Usage: Worker <host> <port> <masterHost> <masterPort>"

      /** Parses a port argument; prints an error and exits with status 1 if it is not an integer. */
      private def parsePort(label: String, raw: String): Int =
        scala.util.Try(raw.toInt).getOrElse {
          System.err.println(s"Invalid $label '$raw': expected an integer")
          System.err.println(Usage)
          sys.exit(1)
        }

      /**
       * Starts `WorkerSystem`, creates the `Worker` actor and blocks until the
       * actor system terminates.
       *
       * @param args `host port masterHost masterPort`; extra arguments are ignored
       */
      def main(args: Array[String]): Unit = {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 4) {
          System.err.println(Usage)
          sys.exit(1)
        }
        val host = args(0)
        val port = parsePort("port", args(1))
        val masterHost = args(2)
        val masterPort = parsePort("masterPort", args(3))
        // Prepare the configuration.
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        // The ActorSystem is the top-level container that creates and supervises
        // the actors below it; it is a singleton.
        val actorSystem = ActorSystem("WorkerSystem", config)
        val worker = actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
        worker ! "work"
        actorSystem.awaitTermination()
      }

    }
    ===============================================================
     
  • 相关阅读:
    BIOS详解:什么是BIOS ?BIOS的作用?CMOS及其与BIOS的关系?
    随机数不随机
    解决hexo神烦的DTraceProviderBindings MODULE_NOT_FOUND
    保护模式特权级别DPL,RPL,CPL 之间的联系和区别
    Linux内核 hlist_head/hlist_node结构解析
    x86中的页表结构和页表项格式
    Linux下/proc目录简介
    bdev文件系统
    X86 IO端口和MMIO
    Mac OS Alfred 2 tips
  • 原文地址:https://www.cnblogs.com/heitaok/p/5531603.html
Copyright © 2011-2022 走看看