zoukankan      html  css  js  c++  java
  • Scala-Akka 实例

    Akka 实例

    18.1需求分析

     

      实现一个分布式模型,Master 保持所有 Worker 节点的信息,根据
    Worker 的心跳信息维持与 Worker 的连接,Worker 启动时向 Master 节点进行
    注册,Master 节点回复 ACK 信息。 

    18.2项目源代码

    18.2.1 新建 Maven 项目 AkkaSystem

    pom.xml 文件如下: 
     
     
     
     
     
     
     

    18.2.2 WorkInfo 类抽象 

    class WorkerInfo(val id : String, val workerHost : String, val memory :
    String, val cores : String) {
      var lastHeartbeat : Long = System.currentTimeMillis()
      override def toString = s"WorkerInfo($id, $workerHost, $memory, $cores)"
    
    }

    18.2.3 ActorMessage 

    case class RegisterWorker(val id : String, val workerHost : String, val memory : String, val cores : String)
    case class HeartBeat(val workid : String)
    case class CheckOfTimeOutWorker()
    case class RegisteredWorker(val workerHost : String)
    case class SendHeartBeat()

    18.2.4 Master

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.collection.mutable
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    
    class Master extends Actor{
      //保存 WorkerID 和 Work 信息的 map
      val idToWorker = new mutable.HashMap[String, WorkerInfo]
      //保存所有 Worker 信息的 Set
      val workers = new mutable.HashSet[WorkerInfo]
      //Worker 超时时间
      val WORKER_TIMEOUT = 10 * 1000
      //构造方法执行完执行一次
      override def preStart(): Unit = {
        //启动定时器,定时执行
        context.system.scheduler.schedule(5 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
      }
      //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息
      override def receive: Receive = {
        //Worker 向 Master 发送的注册消息
        case RegisterWorker(id, workerHost, memory, cores) => {
          if(!idToWorker.contains(id)) {
            val worker = new WorkerInfo(id, workerHost, memory, cores)
            workers.add(worker)
            idToWorker(id) = worker
            println("new register worker: "+worker)
            sender ! RegisteredWorker(worker.id)
          }
        }
        //Worker 向 Master 发送的心跳消息
        case HeartBeat(workerId) => {
          val workerInfo = idToWorker(workerId)
          println("get heartbeat message from: "+workerInfo)
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        }
        //Master 自己向自己发送的定期检查超时 Worker 的消息
        case CheckOfTimeOutWorker => {
          val currentTime = System.currentTimeMillis()
          val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
          for(worker <- toRemove){
            workers -= worker
            idToWorker.remove(worker.id)
          }
          println("worker size: " + workers.size)
        }
      }
    }
    object Master {
      //程序执行入口
      def main(args: Array[String]) {
        val host = "localhost"
        val port = 8888
        //创建 ActorSystem 的必要参数
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
    """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem 是单例的,用来创建 Actor
        val actorSystem = ActorSystem.create("MasterActorSystem", config)
        //启动 Actor,Master 会被实例化,生命周期方法会被调用
        actorSystem.actorOf(Props[Master], "Master")
      }
    } 

    18.2.5 Worker 

    import java.util.UUID
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    
    class Worker extends Actor{
      //Worker 端持有 Master 端的引用(代理对象)
      var master: ActorSelection = null
    
      //生成一个 UUID,作为 Worker 的标识
      val id = UUID.randomUUID().toString
      //构造方法执行完执行一次
      override def preStart(): Unit = {
        //Worker 向 MasterActorSystem 发送建立连接请求
        master =
          context.system.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/use
            r/Master")
          //Worker 向 Master 发送注册消息
          master ! RegisterWorker(id, "localhost", "10240", "8")
          }
          //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息
          override def receive: Receive = {
          //Master 向 Worker 的反馈信息
          case RegisteredWorker(masterUrl) => {
          //启动定时任务,向 Master 发送心跳
          context.system.scheduler.schedule(0 millis, 5000 millis, self,
          SendHeartBeat)
          }
          case SendHeartBeat => {
          println("worker send heartbeat")
          master ! HeartBeat(id)
          }
          }
          }
          object Worker {
          def main(args: Array[String]) {
          val clientPort = 8889
          //创建 WorkerActorSystem 的必要参数
          val configStr =
          s"""
                      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
                      |akka.remote.netty.tcp.port = $clientPort
    """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        val actorSystem = ActorSystem("WorkerActorSystem", config)
        //启动 Actor,Master 会被实例化,生命周期方法会被调用
        actorSystem.actorOf(Props[Worker], "Worker")
      }
    }

    18.3项目运行 

    Master: 
     
    Worker:
  • 相关阅读:
    shell学习(三)
    shell学习(四)
    自定义yum源
    fpm制作rpm包
    shell学习(三)
    shell学习(二)
    linux系统下创建lvm挂载到指定目录
    nginx做代理安装docker
    df -h命令卡死解决办法
    docker安装
  • 原文地址:https://www.cnblogs.com/LXL616/p/11136054.html
Copyright © 2011-2022 走看看