zoukankan      html  css  js  c++  java
  • Actor模型原理

    Actor模型=数据+行为+消息。

    Actor模型内部的状态由自己的行为维护,外部线程不能直接调用对象的行为,必须通过消息才能激发行为,这样就保证Actor内部数据只有被自己修改。

    Actor模型如何实现?

    Scala或ErLang的进程信箱都是一种Actor模型,也有Java的专门的Actor模型,这里是几种Actor模型比较

    明白了Actor模型原理,使用Disruptor这样无锁队列也可以自己实现Actor模型,让一个普通对象与外界的交互调用通过Disruptor消息队列实现,比如LMAX架构就是这样实现高频交易,从2009年成功运行至今,被Martin Fowler推崇。

    Scala Actor线程模型可以这样理解:所有Actor共享一个线程池,总的线程个数可以配置,也可以根据CPU个数决定;当一个Actor启动之后,Scala分配一个线程给它使用,如果使用receive模型,这个线程就一直为该Actor所有,如果使用react模型,Scala执行完react方法后抛出异常,则该线程就可以被其它Actor使用。

    下面看一些核心代码。

    1.  def start(): Actor = synchronized {  
    2.   // Reset various flags.  
    3.   //  
    4.   // Note that we do *not* reset `trapExit`. The reason is that  
    5.   // users should be able to set the field in the constructor  
    6.   // and before `act` is called.  
    7.  
    8.   exitReason = 'normal  
    9.   exiting = false 
    10.   shouldExit = false 
    11.  
    12.   scheduler execute {  
    13.     ActorGC.newActor(Actor.this)  
    14.     (new Reaction(Actor.this)).run()  
    15.   }  
    16.  
    17.   this 
    18. }  

    其中Reaction实现Runnable接口,scheduler基本相当于是一个线程池,所以调用start方法之后会有一个线程来为该Actor服务。

    使用receive模型。

    1. def receive[R](f: PartialFunction[Any, R]): R = {  
    2.  assert(Actor.self == this"receive from channel belonging to other actor")  
    3.  this.synchronized {  
    4.    if (shouldExit) exit() // links  
    5.    val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))  
    6.    if (null eq qel) {  
    7.      waitingFor = f.isDefinedAt  
    8.      isSuspended = true 
    9.      suspendActor()  
    10.    } else {  
    11.      received = Some(qel.msg)  
    12.      sessions = qel.session :: sessions  
    13.    }  
    14.    waitingFor = waitingForNone  
    15.    isSuspended = false 
    16.  }  
    17.  val result = f(received.get)  
    18.  sessions = sessions.tail  
    19.  result  
    20.   

    如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait;如果有消息,这调用PartialFunction进行处理。

    使用react模型。

    1. def react(f: PartialFunction[Any, Unit]): Nothing = {  
    2.  assert(Actor.self == this"react on channel belonging to other actor")  
    3.  this.synchronized {  
    4.    if (shouldExit) exit() // links  
    5.    val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))  
    6.    if (null eq qel) {  
    7.      waitingFor = f.isDefinedAt  
    8.      continuation = f  
    9.      isDetached = true 
    10.    } else {  
    11.      sessions = List(qel.session)  
    12.      scheduleActor(f, qel.msg)  
    13.    }  
    14.    throw new SuspendActorException  
    15.  }  
    16.   

    如果当前mailbox没有可以处理的消息,设置waitingFor和continuation,这两个变量会在接收到消息的时候使用;如果有消息,则调用scheduleActor,该方法会在线程池里选择一个新的线程来处理,具体的处理方法也是由PartialFunction决定。不管是哪条路径,react都会立即返回,或者说是立即抛出异常,结束该线程的执行,这样该线程就可以被其它Actor使用。

    再来看看接收消息的处理代码。

    1. def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {  
    2.  if (waitingFor(msg)) {  
    3.    received = Some(msg)  
    4.  
    5.    if (isSuspended)  
    6.      sessions = replyTo :: sessions  
    7.    else 
    8.      sessions = List(replyTo)  
    9.  
    10.    waitingFor = waitingForNone  
    11.  
    12.    if (!onTimeout.isEmpty) {  
    13.      onTimeout.get.cancel()  
    14.      onTimeout = None  
    15.    }  
    16.  
    17.    if (isSuspended)  
    18.      resumeActor()  
    19.    else // assert continuation != null  
    20.      scheduler.execute(new Reaction(this, continuation, msg))  
    21.  } else {  
    22.    mailbox.append(msg, replyTo)  
    23.  }   

    如果当前没有在等待消息或者接收到的消息不能处理,就丢到mailbox里去;相反,则进行消息的处理。这里对于receive模型和react模型就有了分支:如果isSuspended为true,表示是receive模型,并且线程在wait,就调用resumeActor,该方法会调用notify;否则就是react模型,同样在线程池里选择一个线程进行处理。

    这样,相信大家对Scala Actor就有了一个基本的认识。


  • 相关阅读:
    分解让复杂问题简单化:字符串的排列
    分解让复杂问题简单化:二叉搜索树与双向链表
    分解让复杂问题简单化:复杂链表的复制
    举例让抽象问题具体化:二叉树中和为某一值的路径
    举例让抽象问题具体化:二叉搜索树的后序遍历序列
    Java Collection Framework
    Spring Boot 项目部署到本地Tomcat,出现访问路径问题
    happens-before规则
    NoClassDefFoundError
    《Java编程思想》笔记 第十六章 数组
  • 原文地址:https://www.cnblogs.com/shenlanzifa/p/5288747.html
Copyright © 2011-2022 走看看