zoukankan      html  css  js  c++  java
  • 初见akka-02:rpc框架

      1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka

       来简单的实现了

       rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间,

       注册以及一些问题,

       模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全

       这个里面一共有有四个文件:

       Master.scala

       RemoteMessage.scala

       Worker.scala

       WorkerInfo

      

       Master.scala

    package cn.wj.rpc
    
    import akka.actor.{Actor, ActorSystem}
    import akka.actor.Actor.Receive
    import com.typesafe.config.ConfigFactory
    import akka.actor.Props
    import scala.concurrent.duration._
    
    import scala.collection.mutable
    
    /**
      * Created by WJ on 2016/12/23.
      */
    class Master(val host:String,val port:Int ) extends Actor {
    
      // workerId -> WorkerInfo
      val idToWorker = new mutable.HashMap[String,WorkerInfo]()
      val workers = new mutable.HashSet[WorkerInfo]()
    
      //时间间隔时间,超时检测的间隔
      val CHECK_INTERVAL = 15000
      //用于接收消息
      override def receive: Receive = {
        case RegisterWorker(id,memory,cores) => {
    //      println("a client connected")
    //      sender ! "reply"  //往发送给他消息的人回复一个消息
          //判断一下是不是已经注册过了
        if(!(idToWorker.contains(id))){
          //把Worker的信息封装以前,保存到内存当中
          val workerInfo = new WorkerInfo(id,memory,cores)
          idToWorker(id) = workerInfo  //这个应该是scala的特定版本
          workers += workerInfo
          sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
        }
    
        }
        case Heartbeat(id) =>{
          if(idToWorker.contains(id)) {
            val workerInfo = idToWorker(id)
            //报活
            //得到系统当前时间
            val currentTime = System.currentTimeMillis()
            workerInfo.lastHeartbeatTime = currentTime
          }
        }
    
        case CheckTimeOutWorker => {
          val currentTime = System.currentTimeMillis()
          val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)
          for(w <- toRemove){
            workers -= w
            idToWorker -= w.id
          }
          println(workers.size)
        }
      }
    
      override def preStart(): Unit = {
         println("prestart invoked")
         //导入隐式转换的功能
         import context.dispatcher
         context.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
      }
    }
    
    object Master{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        // 准备配置
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("MasterSystem",config )
        //创建Actor
        val master = actorSystem.actorOf(Props(new Master(host,port)),"Master")
        actorSystem.awaitTermination()
      }
    }
    

      Worker.scala

    package cn.wj.rpc
    
    import java.util.UUID
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._
    /**
      * Created by WJ on 2016/12/23.
      */
    class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {
    
      var master : ActorSelection = _
      val workerId = UUID.randomUUID().toString
      val HEART_INTERVAL = 10000
      //preStart执行方法的时机:构造器之后,receive之前
      //与Master(Actor)建立连接
      override def preStart(): Unit = {
        //master已经是别的Master的引用了 ,这是跟master建立连接
        master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
        //向Master发送注册消息
        master ! RegisterWorker(workerId,memory,cores)
      }
    
      override def receive: Receive = {
    
        case RegisteredWorker(masterUrl) => {
          println(masterUrl)
          //启动定时器发送心跳
          import context.dispatcher
          context.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)
        }
    
        case SendHeartbeat =>{
          println("send heartbeat to master")
          master ! Heartbeat(workerId)
        }
    
      }
    }
    
    object Worker{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        val memory = args(4).toInt
        val cores = args(5).toInt
    
        // 准备配置
        val configStr =
        s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.hostname = "$host"
           |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("WorkerSystem",config )
        //创建Actor,此时调用该(Actor)的prestart以及receive方法
          actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")
         actorSystem.awaitTermination()
      }
    }
    

      RemoteMessage.scala

    package cn.wj.rpc
    
    /**
      * Created by WJ on 2016/12/25.
      */
    trait RemoteMessage extends Serializable
    
    //Worker->Master(这个表明当master接受这个worker时的信息,是receive)
     case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage
    
    //Master -> Worker(这个是master收到workerd的注册信息,表明已经注册过这条信息,是sender ! xxx时候出现的)
    case class RegisteredWorker(masterUrl:String) extends RemoteMessage
    
    //这是进程之间自己给自己发送消息,所以采用case object,并且不需要实现Serializable
    //Worker -> Worker(self)
    case object SendHeartbeat
    
    //这个是work向master发送定时器,其中的id是work的id,因为要向master说明,是哪一个work给他发送的心跳
    //Worker -> Master
    case class Heartbeat(id:String)  extends RemoteMessage
    
    //Master -> self
    case object CheckTimeOutWorker
    

      WorkerInfo.scala

    package cn.wj.rpc
    
    /**
      * Created by WJ on 2016/12/25.
      */
    class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {
        //TODO 上一次心跳
      var lastHeartbeatTime:Long = _
    }
    

      这个上面的四个就是简单的实现了RPC框架,其实就是一个Master监控多个Worker,

      当一个Worker创建了,他就是需要在Master注册信息,其实这个Master个人感觉就像

      是个Zookeeper,掌管Worker的信息,为其Worker分配一些资源,当Master接到Worker

      的注册信息的时候,他就在自己的注册表添加上这个Worker,然后向Worker发送一个注册

      成功的信息,此时这个Worker的收到这个注册信息,然后他就给Master发送心跳,这个的

      作用是在告诉Master,我这个Worker是存活的(报活),当一个Worke发送心跳的时间间隔

      过长,长过我们规定的时间,那么此时我们就需要主动杀死这个Worker,感觉hadoop的一些

      分布式和这个原理差不多。

      下面奉上原理图一张:

      

      其中的receive是用于接受信息,因为继承Actor,

      prestart这个方法是执行实在类实例之后,receive的方法之后

    2.RPC的大概流程

      首先定义了一个worker,一个master,master首先启动了,

      然后它在prestart()的方法里面
      检测超时的worker,那么在这个里面启动了一个定时器,

      那么我们自己是不是自己可以手写一个定时器,
      比如我们可以用线程来搞定时器,但是我们的akka

      里面提供了一个超级简单的定时器,
      context.system.schedular.schedule

      (0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
      其中第一个参数:延迟多少秒
      第二个参数:时间间隔
      第三个参数:把这个消息发给谁
      第四个参数:发送什么消息
      虽然它起了消息,但是他不能一下子就把消息发送出去
      ,它只能把消息先发送给自己的receive接收到这个消息,
      然后在发送给我们master,这个里面有一个检测,
      检测worker有多长时间没有向我发送心跳了,
      如果这个时间大过了我规定的范围,
      这样,Master启动完成检测心跳,worker启动完成后
      ,首先向master建立连接,然后发送注册消息
      ,master接受到这个注册消息,
      把worker的信息保存到内存当中,然后向worker反馈一个消息,
      说你注册成功了,然后worker启动一个定时器,
      定时的向master发送心跳,就是这样的流程

      

    何当共剪西窗烛,却话巴山夜雨时
  • 相关阅读:
    LeetCode 81 Search in Rotated Sorted Array II(循环有序数组中的查找问题)
    LeetCode 80 Remove Duplicates from Sorted Array II(移除数组中出现两次以上的元素)
    LeetCode 79 Word Search(单词查找)
    LeetCode 78 Subsets (所有子集)
    LeetCode 77 Combinations(排列组合)
    LeetCode 50 Pow(x, n) (实现幂运算)
    LeetCode 49 Group Anagrams(字符串分组)
    LeetCode 48 Rotate Image(2D图像旋转问题)
    LeetCode 47 Permutations II(全排列)
    LeetCode 46 Permutations(全排列问题)
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6220528.html
Copyright © 2011-2022 走看看