zoukankan      html  css  js  c++  java
  • 基于akka实现简单的主从框架

    ========================Master==============================

    package com.scala.akka.rpc.demo2

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    import scala.collection.mutable
    import scala.concurrent.duration._

    /**
    * Created by 贵斌 on 2016-5-13.
    */
    class Master(masterHost: String, masterPort: Int) extends Actor {

    val registerWorkersMap = new mutable.HashMap[String, WorkerInfo]()
    val workers = new mutable.HashSet[WorkerInfo]()
    val CHECKWORKERTIME = 10000

    override def preStart(): Unit = {
    println("Master start....")
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, CHECKWORKERTIME millis, self, SendMaskSelfHearBeat)
    }

    override def receive: Receive = {
    case "clientconnect" => {
    println("a client connected....")
    sender() ! "reply"
    }
    case RegisterWorker(workerId, memory, cores) => {
    if (!registerWorkersMap.contains(workerId)) {
    val workerInfo = new WorkerInfo(workerId, memory, cores)
    registerWorkersMap(workerId) = workerInfo
    workers += workerInfo
    sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    }
    }
    case HearBeat(workerId) => {
    if (registerWorkersMap.contains(workerId)) {
    val workerInfo = registerWorkersMap(workerId)
    val currentTime = System.currentTimeMillis()
    workerInfo.lastHearBeat = currentTime
    }
    }
    case SendMaskSelfHearBeat => {
    val currentTime = System.currentTimeMillis()
    val diedWorkers = workers.filter(x => (currentTime - x.lastHearBeat) > CHECKWORKERTIME)
    val liveWorkers = workers.filter(x => (currentTime - x.lastHearBeat) <= CHECKWORKERTIME)
    for (w <- diedWorkers) {
    workers -= w
    }
    println(workers.size)

    }
    case "master" => println("Master start successful....")
    }
    }

    object Master {
    def main(args: Array[String]) {
    val host = "127.0.0.1"
    val port = 10000
    var configstr =
    s"""
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.hostname = "$host"
    |akka.remote.netty.tcp.port = "$port"
    """.stripMargin
    val config = ConfigFactory.parseString(configstr)
    val MasterSystem = ActorSystem("MasterSystem", config)
    //创建Actor
    val master = MasterSystem.actorOf(Props(new Master(host, port)), "Master")
    master ! "master"
    MasterSystem.awaitTermination()

    }
    }
    ===================worker========================
    package com.scala.akka.rpc.demo2

    import java.util.UUID

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    /**
    * Created by 贵斌 on 2016-5-13.
    */
    class Worker(masterHost: String, masterPort: Int) extends Actor {

    println("worker is being restarted....")
    var master: ActorSelection = _
    val SEND_HEARBEATTIME = 5000
    val workerId = UUID.randomUUID().toString

    override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    master ! "clientconnect"
    master ! RegisterWorker(workerId, 1000, 24)
    }

    override def receive: Receive = {
    case "work" => println("worker started successful....")
    case "reply" => println("worker connection Master successful....")
    case RegisteredWorker(masterUrl) => {
    println("worker successful registration.... ")
    println(s"Master Url is $masterUrl")
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, SEND_HEARBEATTIME millis, self, SendWorkSelfHearBeat)
    }
    case SendWorkSelfHearBeat => {
    master ! HearBeat(workerId)
    }
    }
    }

    object Worker {

    def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    // 准备配置
    val configStr =
    s"""
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.hostname = "$host"
    |akka.remote.netty.tcp.port = "$port"
    """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("WorkerSystem", config)
    val worker = actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
    worker ! "work"
    actorSystem.awaitTermination()
    }

    }
    ===============================================================
     
  • 相关阅读:
    二进制部署k8s集群(7):创建(Pod, Deployment、Service)验证kubernetes集群
    二进制部署k8s集群(六):部署kube-proxy
    centos同步系统时间
    二进制部署k8s集群(五):部署kubelet
    二进制部署k8s集群(四):部署controller-manager与kube-scheduler
    二进制部署k8s集群(三):部署kube-apiserver,签发kube-apiserver证书|kuelete证书|kube-proxy证书
    二进制部署k8s集群(二): 签发etcd证书,安装etcd集群
    二进制部署k8s集群(一):前期准备,安装虚拟机与DNS软件bind9
    docker-compose.yml 使用说明
    python--将字符串类型的list 转换成 list
  • 原文地址:https://www.cnblogs.com/heitaok/p/5531603.html
Copyright © 2011-2022 走看看