zoukankan      html  css  js  c++  java
  • 初见akka-02:rpc框架

      1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka

       来简单的实现了

       rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间,

       注册以及一些问题,

       模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全

       这个里面一共有有四个文件:

       Master.scala

       RemoteMessage.scala

       Worker.scala

       WorkerInfo

      

       Master.scala

    package cn.wj.rpc
    
    import akka.actor.{Actor, ActorSystem}
    import akka.actor.Actor.Receive
    import com.typesafe.config.ConfigFactory
    import akka.actor.Props
    import scala.concurrent.duration._
    
    import scala.collection.mutable
    
    /**
      * Created by WJ on 2016/12/23.
      */
    class Master(val host:String,val port:Int ) extends Actor {
    
      // workerId -> WorkerInfo
      val idToWorker = new mutable.HashMap[String,WorkerInfo]()
      val workers = new mutable.HashSet[WorkerInfo]()
    
      //时间间隔时间,超时检测的间隔
      val CHECK_INTERVAL = 15000
      //用于接收消息
      override def receive: Receive = {
        case RegisterWorker(id,memory,cores) => {
    //      println("a client connected")
    //      sender ! "reply"  //往发送给他消息的人回复一个消息
          //判断一下是不是已经注册过了
        if(!(idToWorker.contains(id))){
          //把Worker的信息封装以前,保存到内存当中
          val workerInfo = new WorkerInfo(id,memory,cores)
          idToWorker(id) = workerInfo  //这个应该是scala的特定版本
          workers += workerInfo
          sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
        }
    
        }
        case Heartbeat(id) =>{
          if(idToWorker.contains(id)) {
            val workerInfo = idToWorker(id)
            //报活
            //得到系统当前时间
            val currentTime = System.currentTimeMillis()
            workerInfo.lastHeartbeatTime = currentTime
          }
        }
    
        case CheckTimeOutWorker => {
          val currentTime = System.currentTimeMillis()
          val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)
          for(w <- toRemove){
            workers -= w
            idToWorker -= w.id
          }
          println(workers.size)
        }
      }
    
      override def preStart(): Unit = {
         println("prestart invoked")
         //导入隐式转换的功能
         import context.dispatcher
         context.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
      }
    }
    
    object Master{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        // 准备配置
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("MasterSystem",config )
        //创建Actor
        val master = actorSystem.actorOf(Props(new Master(host,port)),"Master")
        actorSystem.awaitTermination()
      }
    }
    

      Worker.scala

    package cn.wj.rpc
    
    import java.util.UUID
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._
    /**
      * Created by WJ on 2016/12/23.
      */
    class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {
    
      var master : ActorSelection = _
      val workerId = UUID.randomUUID().toString
      val HEART_INTERVAL = 10000
      //preStart执行方法的时机:构造器之后,receive之前
      //与Master(Actor)建立连接
      override def preStart(): Unit = {
        //master已经是别的Master的引用了 ,这是跟master建立连接
        master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
        //向Master发送注册消息
        master ! RegisterWorker(workerId,memory,cores)
      }
    
      override def receive: Receive = {
    
        case RegisteredWorker(masterUrl) => {
          println(masterUrl)
          //启动定时器发送心跳
          import context.dispatcher
          context.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)
        }
    
        case SendHeartbeat =>{
          println("send heartbeat to master")
          master ! Heartbeat(workerId)
        }
    
      }
    }
    
    object Worker{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        val memory = args(4).toInt
        val cores = args(5).toInt
    
        // 准备配置
        val configStr =
        s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.hostname = "$host"
           |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("WorkerSystem",config )
        //创建Actor,此时调用该(Actor)的prestart以及receive方法
          actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")
         actorSystem.awaitTermination()
      }
    }
    

      RemoteMessage.scala

    package cn.wj.rpc
    
    /**
      * Created by WJ on 2016/12/25.
      */
    trait RemoteMessage extends Serializable
    
    //Worker->Master(这个表明当master接受这个worker时的信息,是receive)
     case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage
    
    //Master -> Worker(这个是master收到workerd的注册信息,表明已经注册过这条信息,是sender ! xxx时候出现的)
    case class RegisteredWorker(masterUrl:String) extends RemoteMessage
    
    //这是进程之间自己给自己发送消息,所以采用case object,并且不需要实现Serializable
    //Worker -> Worker(self)
    case object SendHeartbeat
    
    //这个是work向master发送定时器,其中的id是work的id,因为要向master说明,是哪一个work给他发送的心跳
    //Worker -> Master
    case class Heartbeat(id:String)  extends RemoteMessage
    
    //Master -> self
    case object CheckTimeOutWorker
    

      WorkerInfo.scala

    package cn.wj.rpc
    
    /**
      * Created by WJ on 2016/12/25.
      */
    class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {
        //TODO 上一次心跳
      var lastHeartbeatTime:Long = _
    }
    

      这个上面的四个就是简单的实现了RPC框架,其实就是一个Master监控多个Worker,

      当一个Worker创建了,他就是需要在Master注册信息,其实这个Master个人感觉就像

      是个Zookeeper,掌管Worker的信息,为其Worker分配一些资源,当Master接到Worker

      的注册信息的时候,他就在自己的注册表添加上这个Worker,然后向Worker发送一个注册

      成功的信息,此时这个Worker的收到这个注册信息,然后他就给Master发送心跳,这个的

      作用是在告诉Master,我这个Worker是存活的(报活),当一个Worke发送心跳的时间间隔

      过长,长过我们规定的时间,那么此时我们就需要主动杀死这个Worker,感觉hadoop的一些

      分布式和这个原理差不多。

      下面奉上原理图一张:

      

      其中的receive是用于接受信息,因为继承Actor,

      prestart这个方法是执行实在类实例之后,receive的方法之后

    2.RPC的大概流程

      首先定义了一个worker,一个master,master首先启动了,

      然后它在prestart()的方法里面
      检测超时的worker,那么在这个里面启动了一个定时器,

      那么我们自己是不是自己可以手写一个定时器,
      比如我们可以用线程来搞定时器,但是我们的akka

      里面提供了一个超级简单的定时器,
      context.system.schedular.schedule

      (0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
      其中第一个参数:延迟多少秒
      第二个参数:时间间隔
      第三个参数:把这个消息发给谁
      第四个参数:发送什么消息
      虽然它起了消息,但是他不能一下子就把消息发送出去
      ,它只能把消息先发送给自己的receive接收到这个消息,
      然后在发送给我们master,这个里面有一个检测,
      检测worker有多长时间没有向我发送心跳了,
      如果这个时间大过了我规定的范围,
      这样,Master启动完成检测心跳,worker启动完成后
      ,首先向master建立连接,然后发送注册消息
      ,master接受到这个注册消息,
      把worker的信息保存到内存当中,然后向worker反馈一个消息,
      说你注册成功了,然后worker启动一个定时器,
      定时的向master发送心跳,就是这样的流程

      

    何当共剪西窗烛,却话巴山夜雨时
  • 相关阅读:
    JetBrains注册码计算(IntelliJ IDEA 15.0注册码激活)
    java分页数据导出excel
    linux系统关机与重新启动命令
    无向图的连通性分析
    流域水文模拟
    深信服笔试题(网络project师售后)
    CSS这些代码你都不会,你还有什么好说的!!!
    springMVC3学习(四)--訪问静态文件如js,jpg,css
    POJ 3311 Hie with the Pie(状压DP + Floyd)
    NSDictionary所有API的学习。
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6220528.html
Copyright © 2011-2022 走看看