最近在学习akka,在看rpc相关的东西,有点脑子疼,哈哈
1.需求:
目前大多数分布式架构底层通信是通过RPC实现的,RPC框架非常多,
比如我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就
是为了运行长达数小时的批量而设计的,在某些极端的情况下,
任务提交的延迟很高,所有Hadoop的RPC显得有些笨重。
2.特点
Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,
基于Actor并发模型实现,
Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松
实现分布式RPC功能
3.Akka简介
Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、
弹性的(Resilient)、快速响应的(Responsive)应用程序的平台
4.一句话描述RPC:
不同进程之间的通信调用叫做RPC,只要有网络通信即可
5.进程和线程之间的关系
一个进程包含多个线程,因为启动一个进程就相当于启动了
一个jvm(虚拟机)
6.重要的类的描述
ActorSystem是这个进程中Actor的老大,负责和监控所有的actor,
我们可以使用这个,ActorSystem创建很多个Actor,通常是
一个单例对象,Actor负责通信
7.关于一个简单的akka的小例子,自己给自己发送信息
package cn.xx.rpc import akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props /** * Created by XX on 2016/12/23. */ class Master extends Actor { println("constructor invoked") //用于接收消息 override def receive: Receive = { case "connect" => { println("a client connected") } case "hello" =>{ println("hello") } } override def preStart(): Unit = { println("prestart invoked") } } object Master{ def main(args: Array[String]): Unit = { val host = args(0) val port = args(1).toInt // 准备配置 val configStr = s""" //这个s要确定,只有这样才能加入变量 |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem("MasterSystem",config ) //创建Actor val master = actorSystem.actorOf(Props[Master],"Master") master ! "hello" actorSystem.awaitTermination() } }
8.简单的不同通信之间的RPC的通行
Master.scala
package cn.wj.rpc import akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props /** * Created by WJ on 2016/12/23. */ class Master extends Actor { println("constructor invoked") //用于接收消息 override def receive: Receive = { case "connect" => { println("a client connected") sender ! "reply" //往发送给他消息的人回复一个消息 } case "hello" =>{ println("hello") } } override def preStart(): Unit = { println("prestart invoked") } } object Master{ def main(args: Array[String]): Unit = { val host = args(0) val port = args(1).toInt // 准备配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem("MasterSystem",config ) //创建Actor val master = actorSystem.actorOf(Props[Master],"Master") master ! "hello" actorSystem.awaitTermination() } }
Worker.scala
package cn.wj.rpc import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory /** * Created by WJ on 2016/12/23. */ class Worker(val mastHost:String,val mastPort:Int) extends Actor { var master : ActorSelection = _ //preStart执行方法的时机:构造器之后,receive之前 //与Master(Actor)建立连接 override def preStart(): Unit = { //master已经是别的Master的引用了 // master = context.actorSelection(s"akka.tcp://MasterSystem@$mastHost:$mastPort/user/Master") master = context.actorSelection(s"akka.tcp://MasterSystem@192.168.109.1:8888/user/Master") //akka.tcp://MasterSystem@192.168.109.1:8888 master ! "connect" } override def receive: Receive = { case "reply" => { println("a reply form master") } } } object Worker{ def main(args: Array[String]): Unit = { val host = args(0) val port = args(1).toInt val masterHost = args(2) val masterPort = args(3).toInt // 准备配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem("WorkerSystem",config ) //创建Actor,此时调用该(Actor)的prestart以及receive方法 actorSystem.actorOf(Props(new Worker(masterHost,masterPort)),"Worker") actorSystem.awaitTermination() } }
9.通信业务逻辑
首先启动Master,然后启动所有的Worker
1.Worker启动后,在preStart方法中与Master建立连接,
向Master发送注册,将Worker的信息
通过case class封装起来发送给Master
2.Master接受Worker的注册消息后将Worker的消息保存起来
3.Worker定期向Master发送心跳,为了报活