Scala从入门开始04
1.分布式通信框架Akka
- 对底层异步IO(NIO)封装,使用起来方便。Java中有Netty,Mina。Scala有Akka。
1.1什么是ACTORS
- Actor是用来收发消息的,一个Actor就是一个实例,可以创建多个Actor实现并发。所有的Actor都有管理者,它用于创建Actor和管理Actor。以后根据发送带类型的消息进行匹配,处理不同逻辑。用到Scala实现用到技术点:case class/case object。 Akka Actors遵循Actor模型,Actors并发编程不需要锁,而是通过通信的机制实现并发。
1.2什么是Akka
-
Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。
Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递
1.3Akka的特点
1. 单并发性 和 分布式
2. 可恢复的,弹性的
3. 高效的
4. 弹性,去中心化(分散)
5. 可扩展
1.4 Akka快速入门
-
负责管理的角色:ActorSystem
-
负责通信:Actor
-
发送的消息:case class 和 case object
-
发消息的方式: 异步、同步
-
示例演示步骤:
-
创建一个Maven工程
- idea中New --> project--> Maven --> Next --> 填写项目名
-
当前创建项目为Java项目,需要导入scala插件
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <!-- 编译scala --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <!-- <arg>-make:transitive</arg>--> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <!-- 打包插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>cn._51doit.rpc.Master</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
-
根据导入scala插件创建目录:main下创建scala文件夹,右键Make Directory as ---> Sources root。test下创建scala文件夹。右键Make Directory as ---> Sources test root。
-
右键项目->Add Frameworks Support->选择scala
-
引入scala依赖
<!-- 导入scala的依赖 --> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.12</scala.version> <scala.compat.version>2.11</scala.compat.version> <akka.version>2.4.17</akka.version> </properties> <dependencies> <!-- scala的依赖 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- akka actor依赖 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>${akka.version}</version> </dependency> <!-- akka远程通信依赖 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.11</artifactId> <version>${akka.version}</version> </dependency> </dependencies>
-
联网等待maven下载依赖。
-
idea强制清除Maven缓存:https://www.cnblogs.com/-beyond/p/11557196.html
-
1.5Akka RPC通信过程:
1.启动Master,内部启动一个定时器,定期检测超时Worker
2.启动所有Worker,Worker首先向Master建立连接,然后向Master注册,Worker把自身的信息发送给Master(ip,端口,资源)。
3.Master接收到Worker发送的注册信息,然后将Worker的信息保存起来,然后向Worker反馈信息,告诉Worker注册成功了。
4.Worker接收到了Master反馈注册成功信息,然后定期向Master发送心跳。
5.Master接收Worker的心跳信息,然后定期更新对应Worker的上一次心跳时间。
1.6 Akka创建Actor
- Master.scala
package com._xjk.rpc
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class Master extends Actor {
/*
* 接收消息
* */
override def receive: Receive = {
case "hi" => {
println("hi")
}
case "hello" => {
println("hello")
}
}
}
//
object Master {
def main(args: Array[String]): Unit = {
// 报错:Error:scalac:bad option: "-make:transitive"
// 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
val host = args(0)
val port = args(1).toInt
// 字符串传入配置信息
val confStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname="$host"
|akka.remote.netty.tcp.port="$port"
""".stripMargin
val conf = ConfigFactory.parseString(confStr)
// 创建一个 ActorSystem(创建管理Actor,单例)
val actorSystem = ActorSystem("Master-Actor-System", conf)
// 通过ActorSystem 创建Actor
val master = actorSystem.actorOf(Props[Master], "Master-Acotory")
// master给自己发送消息
// ! 表示异步发送
master ! "hello"
}
}
- 配置Master.scala 的Program arguments,运行即可。
localhost 8888
1.7 Akka 中Master和Worker建立连接
- Akka 中Master和Worker建立连接 并发送一条消息
package com._xjk.rpc
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
class Worker extends Actor{
var masterRef: ActorSelection = _
// 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
override def preStart(): Unit = {
// 跟Master 建立连接, 拿到Master代理对象
// 此处指定Master地址是akka的通信协议,是长链接,书写方式: 地址/user/Master名字
masterRef = context.actorSelection("akka.tcp://Master-Actor-System@localhost:8888/user/Master-Actor")
// 代理对象发送消息
masterRef ! "hello"
}
// masterRef 为 代理对象,并不是 真正Master的引用,因为Master和Worker不在同一个进程中,Worker实际拿到是Master代理对象。
override def receive: Receive = {
case "hello" => {
println("recv hello!")
}
}
}
object Worker {
def main(args: Array[String]): Unit = {
val host = args(0)
val port = args(1).toInt
// 字符串传入配置信息
val confStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname="$host"
|akka.remote.netty.tcp.port="$port"
""".stripMargin
val config: Config = ConfigFactory.parseString(confStr)
// 创建ActorSystem
val actorSystem = ActorSystem.apply("Worker-Actor-System", config)
// 创建Actor, Props指定类型
val worker = actorSystem.actorOf(Props[Worker], "Worker-Actor")
// worker ! "hello"
}
}
- Worker将注册信息发送Master,定义case class里面放置信息。
// 注册worker信息
case class RegisterWorker(workerId:String, memory:Int, cores:Int)
// Worker发送:
val WORKER_ID = UUID.randomUUID().toString
// 向Master发送注册消息: WorkerId,内存,核数等
masterRef ! RegisterWorker(WORKER_ID, 10240, 24)
// Master接收消息并打印:
class Master extends Actor {
/*
* 接收消息
* */
override def receive: Receive = {
case RegisterWorker(workerId, memory, cores) => {
println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
}
}
}
- Master保存注册信息,并返回注册成功消息
// Master.scala
class Master extends Actor {
val id2Worker = new mutable.HashMap[String, WorkerInfo]()
val workers = new mutable.HashSet[WorkerInfo]()
/*
* 接收消息
* */
override def receive: Receive = {
case RegisterWorker(workerId, memory, cores) => {
println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
val workerInfo = new WorkerInfo(workerId,memory,cores)
// 保存Worker信息到Map
// id2Worker += ((workerId, workerInfo))
// id2Worker += (workerId -> workerInfo)
id2Worker(workerId) = workerInfo
// 保存Worker信息到Set中
workers += workerInfo
// 向Worker反馈成功的消息,获取消息发送者的引用(代理)
// 消息接收者,可以通过sender方法获取消息发送者的连接
sender() ! RegisterdWorker
}
}
}
// Worker.scala
case RegisterdWorker => {
println("worker 接收到注册成功消息!")
}
// WorkerInfo.scala
package com._xjk.rpc
class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
}
// Messages
package com._xjk.rpc
// 注册worker信息
case class RegisterWorker(workerId:String, memory:Int, cores:Int)
// Master向 Worker发送注册成功消息
case object RegisterdWorker
- Worker定期向Master发送心跳。
2.完整Akka练习
- Master和Worker建立连接,Worker向Master发送注册消息,Master保存注册信息并返回注册成功消息,Worker定期向Master发送心跳,Master定期检测超时Worker并移除。
- Master.scala
package com._xjk.rpc
import java.util.Date
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
class Master extends Actor {
val id2Worker = new mutable.HashMap[String, WorkerInfo]()
val workers = new mutable.HashSet[WorkerInfo]()
// 检测时间
val CHECK_INTERVAL = 15000
// 启动定时器,定期检测超时Worker
override def preStart(): Unit = {
// 导入隐式转换
import context.dispatcher
// 启动定时器
context.system.scheduler.schedule(
0 millis,
CHECK_INTERVAL millis,
self,
CheckTimeoutWorker
)
}
/*
* 接收消息
* */
override def receive: Receive = {
case RegisterWorker(workerId, memory, cores) => {
println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
val workerInfo = new WorkerInfo(workerId,memory,cores)
// 保存Worker信息到Map
// id2Worker += ((workerId, workerInfo))
// id2Worker += (workerId -> workerInfo)
id2Worker(workerId) = workerInfo
// 保存Worker信息到Set中
workers += workerInfo
// 向Worker反馈成功的消息,获取消息发送者的引用(代理)
// 消息接收者,可以通过sender方法获取消息发送者的连接
sender() ! RegisterdWorker
}
// Worker发送给Master的心跳消息
case Heartbeat(workerId) => {
println(new Date() + "," + workerId)
// 根据workerId取出对应worker的Info
val info = id2Worker(workerId)
// 获取当前时间:
val currentTime = System.currentTimeMillis()
// 更新workerInfo 的心跳时间
info.lastHeartbeatTime = currentTime
}
// 发给自己消息,用于检查Worker超时消息
case CheckTimeoutWorker => {
// 获取当前时间:
var currentTime = System.currentTimeMillis()
// 过滤超时workers
val deadWorkers = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
// 删除出现问题的worker
// for (workerInfo <- deadWorkers) {
// val workerId= workerInfo.workerId
// // 从map中删除
// id2Worker -= workerId
// // 从set中删除
// workers -= workerInfo
// }
deadWorkers.foreach(workerInfo => {
val workerId= workerInfo.workerId
// 从map中删除
id2Worker -= workerId
// 从set中删除
workers -= workerInfo
println(s"当前worker的个数为:${id2Worker.size}")
})
}
}
}
//
object Master {
val MASTER_ACTOR_SYSTEM = "Master-Actor-System"
val MASTER_ACTOR_NAME = "Master-Actor"
def main(args: Array[String]): Unit = {
// 报错:Error:scalac:bad option: "-make:transitive"
// 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
val host = args(0)
val port = args(1).toInt
// 字符串传入配置信息
val confStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname="$host"
|akka.remote.netty.tcp.port="$port"
""".stripMargin
val conf = ConfigFactory.parseString(confStr)
// 创建一个 ActorSystem(创建管理Actor,单例)
val actorSystem = ActorSystem(MASTER_ACTOR_SYSTEM, conf)
// 通过ActorSystem 创建Actor
val master = actorSystem.actorOf(Props[Master], MASTER_ACTOR_NAME)
// master给自己发送消息
// ! 表示异步发送
// master ! "hello"
}
}
- Messages
package com._xjk.rpc
// 注册worker信息
case class RegisterWorker(workerId:String, memory:Int, cores:Int)
// Master向 Worker发送注册成功消息
case object RegisterdWorker
// Worker发给Master心跳消息
case class Heartbeat(workerId:String)
// Worker发送给自己的消息
case object SendHeartbeat
// Master发送给自己检测Worker超时消息
case object CheckTimeoutWorker
- Worker.scala
package com._xjk.rpc
import java.util.UUID
// 导入时间单位
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
class Worker (var masterHost:String, var masterPort:Int, var memory:Int, var cores:Int) extends Actor{
var masterRef: ActorSelection = _
// worker 的 ID
val WORKER_ID = UUID.randomUUID().toString
var HEARTBEAT_INTERVAL = 10000
// 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
override def preStart(): Unit = {
// 跟Master 建立连接, 拿到Master代理对象
masterRef = context.actorSelection(s"akka.tcp://${Master.MASTER_ACTOR_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR_NAME}")
// 代理对象发送消息
// masterRef ! "hello"
// 向Master发送注册消息: WorkerId,内存,核数等
masterRef ! RegisterWorker(WORKER_ID, memory, cores)
}
override def receive: Receive = {
case "hello" => {
println("recv hello!")
}
// Master返回给Worker注册成功的消息
case RegisterdWorker => {
println("worker 接收到注册成功消息!")
// 导入隐式转换
import context.dispatcher
// 上下文中定义延时0秒,每10秒给自己发送消息。self 表示发送给自己。
context.system.scheduler.schedule(
0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat
)
}
// 接收自己发送消息
case SendHeartbeat => {
// 判断是否断开连接
// 判断是否Master发生变化
// 将心跳消息发送给master
masterRef ! Heartbeat(WORKER_ID)
}
}
}
object Worker {
val WORKER_ACTOR_SYSTEM = "Worker-Actor-System"
val WORKER_ACTOR_NAME = "Worker-Actor"
def main(args: Array[String]): Unit = {
val masterHost = args(0)
val masterHort = args(1).toInt
val workerHost = args(2)
val workerPort = args(3).toInt
val workerMemory = args(4).toInt
val workerCores = args(5).toInt
// 字符串传入配置信息
val confStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname="$workerHost"
|akka.remote.netty.tcp.port="$workerPort"
""".stripMargin
val config: Config = ConfigFactory.parseString(confStr)
// 创建ActorSystem
val actorSystem = ActorSystem.apply(WORKER_ACTOR_SYSTEM, config)
// 创建Actor, Props指定类型
val worker = actorSystem.actorOf(Props(new Worker(masterHost, masterHort, workerMemory, workerCores)), WORKER_ACTOR_NAME)
// worker ! "hello"
}
}
- WorkerInfo.scala
package com._xjk.rpc
class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
var lastHeartbeatTime: Long = _
}
-
通过maven打jar包.Lifecycle->package.(注意路径有中文会有错误)
-
运行Master
java -jar .AkkaRPC-Master-1.0.jar 127.0.0.1 8888
- 运行Worker
java -jar .AkkaRPC-Worker-1.0.jar 127.0.0.1 8888 127.0.0.1 9999 1024 2