原文地址:http://my.oschina.net/jingxing05/blog/287462
ActorSystem(“companyname”)
相当于注册一家公司一样,负责:
- 通用配置 如:dispatchers, deployments, remote capabilities and addresses
- 创建Actor和搜索actor
- 通常一个应用一个Actorsystem
ActorRef
actOf会异步的启动一个Actor,并返回这个Actor的ActorRef,作为消息目的地,ActorRef的特征:
- immutable
- 与Actor是一对一关系
- 可序列化和网络传输,以实现远程透明性
启动Actor
actor_system.actorOf(Props(new HelloActor("jingxing05")), name = "helloactor")
如果一个Actor有多个属性,可以通过如下方式设置其值:
- 发送适当的消息
- 放到构造函数中
- 重写preStart方法
Actor交互消息
消息必须是immutable的
通过 ! 给Actor发送消息
Actor的消息处理方式
def receive = { // 接受消息时隐式的传入了 发送者的 ActorRef 名为 sender
case "hello" =>
println("hello back to you")
send ! “done”
case _ => println("Huh ? ")
}
Actor的生命周期
- construct Actor代码体 是constructor的一部分
- preStart Actor刚启动后被call
- receive
- preRestart 待重启Actor知晓 重启原因
- postRestart 新启动的Actor 立即调用 显示重启原因 在其中调 preStart
- postStop Actor停止后 被call 用来 清理现场资源
停止Actor的方法
- 在Actor系统层 system.stop(actorRef)
- 在Actor里面context.stop(actorRef)
- 向Actor发送 PoisonPill消息
- gracefuStop方法 + try + future技术
stop方法 等Actor执行完正在处理的消息 就停止,其后的消息丢失
PoisonPill跟普通消息一样进入Actor的邮箱队列,遇到PoisonPill消息 就停止,PoisonPill之后的消息丢失
停止Actor是异步的,有两步:1、挂起邮箱不再接受消息,并向下属Actor发送stop消息
2、处理下属Actor返回的终结消息,直到所有下属Actor都结束,然后 自行了断
3、如果子Actor没响应,则必须等待
丢失的没有处理的消息被送到 Actor系统的deadLetter Actor中
向Actor发送 Kill 消息会导致该Actor重启
监控Actor的死
使用Actor的context的watch方法可以监控该context生出的子Actor
当子Actor收到Kill或停止时 其上级Actor会收到Terminated(actorrefName)的消息,可进行处理
Actor抛出异常时,并不会发送Terminated消息,而是自动restart
搜索actor
一个actor系统中有很多actor可以使用ActorSystem实例或context的actorSelection方法进行搜索
四种实例
- val kenny = system.actorSelection("/user/Parent/Kenny")
- val kenny = context.actorSelection("../Kenny") //相对路径
- val kenny = system.actorFor("akka://DeathWatchTest/user/Parent/Kenny")
- val kenny = system.actorFor(Seq("user", "Parent", "Kenny"))
- val kenny = system.actorFor(Seq("..", "Kenny"))
Future对象
使用future
可以包裹你想要执行的代码到future对象中
返回值可以是future
有三个回调可以使用 : onSuccess onFailure onComplete 都是偏函数 partial func
scala移位操作
左移:补充0, 右移:补充最高的符号位, 无符号右移则 补充0
可以调用几个返回future的代码
然后用for 表达式来join返回的结果,如下示例:
1: println("starting futures")
2: val result1 = Cloud.runAlgorithm(10)3: val result2 = Cloud.runAlgorithm(20)4: val result3 = Cloud.runAlgorithm(30)5: println("before for-comprehension")
6: val result = for {
7: r1 <- result18: r2 <- result29: r3 <- result310: } yield (r1 + r2 + r3)11: println("before onSuccess")
12: result onSuccess {13: case result => println(s"total = $result")14: }
Future及其执行上下文ExecutionContext
- Future[T]是包含并发计算的容器,在executioncontext分配给他的线程中,在某个不确定的时间开始执行,在不确定的时间返回T 或者Exception
- Future计算完毕可以返回结果时视为completed,有三种情况:complete success failure
- Future提供从其返回结果中提取值的接口,如三个回调函数,for、map、flatMap等
- ExecutionContext可视为一个线程池,Future在这个池中的某个线程中执行
- ExecutionContext.Implicits.global是默认的执行上下文,可以import
- 回调函数也是异步的,可能在不同的线程中执行,顺序不确定
- 多个Future可通过以下函数组合:map flatMap filter foreach recoverWith fallbackTo andThen
? and ask来发送消息并等待回复
在actor中定义一些状态,然后使用 become(somestate)方法转换状态
使用scala的parallel集合提高性能
适用于 顺序无关的计算,在测试使用和不用并行集合的性能后取舍
两种途径:1、调用集合的par方法转换为并行集合;2、使用ParXX集合类
不可变并行集合:
ParHashMap ParHashSet ParIterable ParMap
ParRange ParSeq ParSet ParVector
可变的并行集合: ParArray