  • Akka(7): FSM:通过状态变化来转换运算行为


    trait FSM[S, D] extends Actor with Listeners with ActorLogging {...}

    我们看到:FSM就是一个特殊的Actor。带着两个类型参数:S代表状态类型,D代表状态数据类型。实际上S和D结合起来就是FSM的内部状态,即:SomeState+DataX 和 SomeState+DataY分别代表不同的Actor内部状态,这点从State定义可以得到信息:

       * This captures all of the managed state of the [[akka.actor.FSM]]: the state
       * name, the state data, possibly custom timeout, stop reason and replies
       * accumulated while processing the last message.
      case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {...}


    State(SA) x Event(E) -> Actions (A), State(SB)


       * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an
       * `Event`, which allows pattern matching to extract both state and data.
      final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded


    trait Seasons   //States
    case object Spring extends Seasons
    case object Summer extends Seasons
    case object Fall extends Seasons
    case object Winter extends Seasons
    class SeasonInfo(talks: Int, month: Int)  //Data
    case object BeginSeason extends SeasonInfo(0,1)


    object FillSeasons {
      sealed trait Messages    //功能消息
      case object HowYouFeel extends Messages
      case object NextMonth extends Messages


       * *******************************************
       *       Main actor receive() method
       * *******************************************
      override def receive: Receive = {
        case TimeoutMarker(gen) ⇒
          if (generation == gen) {
            processMsg(StateTimeout, "state timeout")
        case t @ Timer(name, msg, repeat, gen) ⇒
          if ((timers contains name) && (timers(name).generation == gen)) {
            if (timeoutFuture.isDefined) {
              timeoutFuture = None
            generation += 1
            if (!repeat) {
              timers -= name
            processMsg(msg, t)
        case SubscribeTransitionCallBack(actorRef) ⇒
          // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
          // send current state back as reference point
          actorRef ! CurrentState(self, currentState.stateName)
        case Listen(actorRef) ⇒
          // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
          // send current state back as reference point
          actorRef ! CurrentState(self, currentState.stateName)
        case UnsubscribeTransitionCallBack(actorRef) ⇒
        case Deafen(actorRef) ⇒
        case value ⇒ {
          if (timeoutFuture.isDefined) {
            timeoutFuture = None
          generation += 1
          processMsg(value, sender())

    除timer,subscription等特殊功能外,case value => ... 就是处理自定义消息的地方了。我们看到FSM是用processMsg(value, sender())来处理消息的。processMsg又调用了processEvent:

      private def processMsg(value: Any, source: AnyRef): Unit = {
        val event = Event(value, currentState.stateData)
        processEvent(event, source)
      private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
        val stateFunc = stateFunctions(currentState.stateName)
        val nextState = if (stateFunc isDefinedAt event) {
        } else {
          // handleEventDefault ensures that this is always defined


       * State definitions
      private val stateFunctions = mutable.Map[S, StateFunction]()


     type StateFunction = scala.PartialFunction[Event, State]

    FSM的receive函数在收到消息后把消息包嵌入新构建的Event然后在processEvent里通过stateName取出相应的StateFunction后传入Event产生新的状态State。用户提供的StateFunction是通过FSM的when函数压进stateFunction Map里的:

       * Insert a new StateFunction at the end of the processing chain for the
       * given state. If the stateTimeout parameter is set, entering this state
       * without a differing explicit timeout setting will trigger a StateTimeout
       * event; the same is true when using #stay.
       * @param stateName designator for the state
       * @param stateTimeout default state timeout for this state
       * @param stateFunction partial function describing response to input
      final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit =
        register(stateName, stateFunction, Option(stateTimeout))
      private def register(name: S, function: StateFunction, timeout: Timeout): Unit = {
        if (stateFunctions contains name) {
          stateFunctions(name) = stateFunctions(name) orElse function
          stateTimeouts(name) = timeout orElse stateTimeouts(name)
        } else {
          stateFunctions(name) = function
          stateTimeouts(name) = timeout

    我们看到when调用了register在stateFunction Map中按stateName放置StateFunction。FSM的这个stateFunction Map解决了become/unbecome产生的堆栈问题。FSM有个比较规范的结构,拿上面例子的FeelingSeasons结构做个示范:

    class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {
      import FillSeasons._
      startWith(Spring,SeasonInfo(0,1))  //起始状态
      when(Spring) {   //状态在春季
        case Event(HowYouFeel,seasonInfo) => ...
      when(Summer) {  //夏季状态
        case Event(HowYouFeel,_) =>
      when(Fall) {  //秋季状态
        case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>
      when(Winter) {  //冬季状态
        case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>
      whenUnhandled {  //所有状态未处理的Event
        case Event(NextMonth,seasonInfo) =>
      onTransition {
        case Spring -> Summer => log.info("Season changed from Spring to Summer month 1")
        case Summer -> Fall => log.info("Season changed from Summer to Fall month 1")
        case Fall -> Winter => log.info("Season changed from Fall to Winter month 1")
        case Winter -> Spring => log.info("Season changed from Winter to Spring month 1")
      initialize()  //设定起始状态


      type TransitionHandler = PartialFunction[(S, S), Unit]


       * Verify existence of initial state and setup timers. This should be the
       * last call within the constructor, or [[akka.actor.Actor#preStart]] and
       * [[akka.actor.Actor#postRestart]]
       * An initial `currentState -> currentState` notification will be triggered by calling this method.
       * @see [[#startWith]]
      final def initialize(): Unit =
        if (currentState != null) makeTransition(currentState)
        else throw new IllegalStateException("You must call `startWith` before calling `initialize`")

    完整的FSM FeelingSeasons定义如下: 

    class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {
      import FillSeasons._
      startWith(Spring,SeasonInfo(0,1))  //起始状态
      when(Spring) {   //状态在春季
        case Event(HowYouFeel,seasonInfo) =>
          val numtalks = seasonInfo.talks + 1
          log.info(s"It's ${stateName.toString}, feel so gooood! You've asked me ${numtalks}times.")
          stay using seasonInfo.copy(talks = numtalks)
      when(Summer) {  //夏季状态
        case Event(HowYouFeel,_) =>
          val numtalks = stateData.talks + 1
          log.info(s"It's ${stateName.toString}, it's so hot! You've asked me ${numtalks}times")
          stay().using(stateData.copy(talks = numtalks))
      when(Fall) {  //秋季状态
        case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>
          val numtalks = tks + 1
          log.info(s"It's ${stateName.toString}, it's no so bad. You've asked me ${numtalks}times.")
          stay using SeasonInfo(numtalks,mnth)
      when(Winter) {  //冬季状态
        case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>
          val numtalks = tks + 1
          log.info(s"It's ${stateName.toString}, it's freezing cold! You've asked me ${numtalks}times.")
          stay using si.copy(talks = numtalks)
      whenUnhandled {  //所有状态未处理的Event
        case Event(NextMonth,seasonInfo) =>
          val mth = seasonInfo.month
          if (mth <= 3) {
            log.info(s"It's month ${mth+1} of ${stateName.toString}")
            stay using seasonInfo.copy(month = mth + 1)
          else {
            goto(nextSeason(stateName)) using SeasonInfo(0,1)
      onTransition {
        case Spring -> Summer => log.info(s"Season changed from Spring to Summer month ${nextStateData.month}")
        case Summer -> Fall => log.info(s"Season changed from Summer to Fall month ${nextStateData.month}")
        case Fall -> Winter => log.info(s"Season changed from Fall to Winter month ${nextStateData.month}")
        case Winter -> Spring => log.info(s"Season changed from Winter to Spring month ${nextStateData.month}")
      initialize()  //设定起始状态
      log.info(s"It's month 1 of ${stateName.toString}")
      def nextSeason(season: Seasons): Seasons =
        season match {
          case Spring => Summer
          case Summer => Fall
          case Fall => Winter
          case Winter => Spring

    首先注意StateFunction中SeasonInfo的各种意思同等的表达方式及nextStateData。FSM状态数据用不可变对象(immutable object)最安全,所以在更新时必须用case class 的copy或直接构建新的SeasonInfo实例。


     private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
        val stateFunc = stateFunctions(currentState.stateName)
        val nextState = if (stateFunc isDefinedAt event) {
        } else {
          // handleEventDefault ensures that this is always defined

    先运算用户定义的StateFunction处理事件Event获取新的状态State。然后调用applyState运算makeTransition处理状态转换(currentState = nextState):

      private[akka] def applyState(nextState: State): Unit = {
        nextState.stopReason match {
          case None ⇒ makeTransition(nextState)
          case _ ⇒
            nextState.replies.reverse foreach { r ⇒ sender() ! r }
      private[akka] def makeTransition(nextState: State): Unit = {
        if (!stateFunctions.contains(nextState.stateName)) {
          terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
        } else {
          nextState.replies.reverse foreach { r ⇒ sender() ! r }
          if (currentState.stateName != nextState.stateName || nextState.notifies) {
            this.nextState = nextState
            handleTransition(currentState.stateName, nextState.stateName)
            gossip(Transition(self, currentState.stateName, nextState.stateName))
            this.nextState = null
          currentState = nextState
          def scheduleTimeout(d: FiniteDuration): Some[Cancellable] = {
            import context.dispatcher
            Some(context.system.scheduler.scheduleOnce(d, self, TimeoutMarker(generation)))
          currentState.timeout match {
            case SomeMaxFiniteDuration                    ⇒ // effectively disable stateTimeout
            case Some(d: FiniteDuration) if d.length >= 0 ⇒ timeoutFuture = scheduleTimeout(d)
            case _ ⇒
              val timeout = stateTimeouts(currentState.stateName)
              if (timeout.isDefined) timeoutFuture = scheduleTimeout(timeout.get)

    我们用FSM DSL的stay, goto,using来取得新的FSM状态和数据:

       * Produce transition to other state.
       * Return this from a state function in order to effect the transition.
       * This method always triggers transition events, even for `A -> A` transitions.
       * If you want to stay in the same state without triggering an state transition event use [[#stay]] instead.
       * @param nextStateName state designator for the next state
       * @return state transition descriptor
      final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)
       * Produce "empty" transition descriptor.
       * Return this from a state function when no state change is to be effected.
       * No transition event will be triggered by [[#stay]].
       * If you want to trigger an event like `S -> S` for `onTransition` to handle use `goto` instead.
       * @return descriptor for staying in current state
      final def stay(): State = goto(currentState.stateName).withNotification(false) // cannot directly use currentState because of the timeout field


         * Modify state transition descriptor with new state data. The data will be
         * set when transitioning to the new state.
        def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D] = {
          copy(stateData = nextStateData)



      final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit =
        currentState = FSM.State(stateName, stateData, timeout)
      final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)
      final def stay(): State = goto(currentState.stateName).withNotification(false) 
      final def stop(): State = stop(Normal)
      final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)


    case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
        // defined here to be able to override it in SilentState
        def copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies): State[S, D] = {
          new State(stateName, stateData, timeout, stopReason, replies)
         * Modify state transition descriptor to include a state timeout for the
         * next state. This timeout overrides any default timeout set for the next
         * state.
         * Use Duration.Inf to deactivate an existing timeout.
        def forMax(timeout: Duration): State[S, D] = timeout match {
          case f: FiniteDuration ⇒ copy(timeout = Some(f))
          case Duration.Inf      ⇒ copy(timeout = SomeMaxFiniteDuration) // we map the Infinite duration to a special marker,
          case _                 ⇒ copy(timeout = None) // that means "cancel stateTimeout". This marker is needed
        } // so we do not have to break source/binary compat.
        // TODO: Can be removed once we can break State#timeout signature to `Option[Duration]`
         * Send reply to sender of the current message, if available.
         * @return this state transition descriptor
        def replying(replyValue: Any): State[S, D] = {
          copy(replies = replyValue :: replies)
         * Modify state transition descriptor with new state data. The data will be
         * set when transitioning to the new state.
        def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D] = {
          copy(stateData = nextStateData)

    FSM DSL中的transform是这样定义的:

      final class TransformHelper(func: StateFunction) {
        def using(andThen: PartialFunction[State, State]): StateFunction =
          func andThen (andThen orElse { case x ⇒ x })
      final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)

    我们看到TransformHelper用using对入参func:StateFunction施用用户提供的andThen: PartialFunction[State,State]后返回新的状态State。这个using与State.using是不同的。下面是一个transform用法例子:

    when(Running) {
        transform {
          case Event(m, Target(Actor.noSender)) =>
            goto(Uninitialised) using NoConfig
          case Event(m, Target(ref)) =>
            ref ! m
        } using targetTransformer
      def targetTransformer: PartialFunction[State, State] = {
        case s @ State(stateName, Target(ref), _, _, _) if ref.path.name.startsWith("testActor") =>
          log.debug("Setting target to dead letters")



    when(Uninitialised) {
        case Event(Config(ref), _) =>
          goto(Running) using Target(ref)
        case Event(_, _) =>
      when(Running) {
        case Event(m, Target(ref)) =>
          ref ! m
      onTransition {
        case Uninitialised -> Running => unstashAll()



    import akka.actor._
    sealed trait Seasons   //States
    case object Spring extends Seasons
    case object Summer extends Seasons
    case object Fall extends Seasons
    case object Winter extends Seasons
    //sealed trait SeasonData  //Data
    case class SeasonInfo(talks: Int, month: Int)
    object FillSeasons {
      sealed trait Messages    //功能消息
      case object HowYouFeel extends Messages
      case object NextMonth extends Messages
      def props = Props(new FillSeasons)
    class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {
      import FillSeasons._
      startWith(Spring,SeasonInfo(0,1))  //起始状态
      when(Spring) {   //状态在春季
        case Event(HowYouFeel,seasonInfo) =>
          val numtalks = seasonInfo.talks + 1
          log.info(s"It's ${stateName.toString}, feel so gooood! You've asked me ${numtalks} times.")
          stay using seasonInfo.copy(talks = numtalks)
      when(Summer) {  //夏季状态
        case Event(HowYouFeel,_) =>
          val numtalks = stateData.talks + 1
          log.info(s"It's ${stateName.toString}, it's so hot! You've asked me ${numtalks} times")
          stay().using(stateData.copy(talks = numtalks))
      when(Fall) {  //秋季状态
        case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>
          val numtalks = tks + 1
          log.info(s"It's ${stateName.toString}, it's no so bad. You've asked me ${numtalks} times.")
          stay using SeasonInfo(numtalks,mnth)
      when(Winter) {  //冬季状态
        case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>
          val numtalks = tks + 1
          log.info(s"It's ${stateName.toString}, it's freezing cold! You've asked me ${numtalks} times.")
          stay using si.copy(talks = numtalks)
      whenUnhandled {  //所有状态未处理的Event
        case Event(NextMonth,seasonInfo) =>
          val mth = seasonInfo.month
          if (mth <= 3) {
            log.info(s"It's month ${mth+1} of ${stateName.toString}")
            stay using seasonInfo.copy(month = mth + 1)
          else {
            goto(nextSeason(stateName)) using SeasonInfo(0,1)
      onTransition {
        case Spring -> Summer => log.info(s"Season changed from Spring to Summer month ${nextStateData.month}")
        case Summer -> Fall => log.info(s"Season changed from Summer to Fall month ${nextStateData.month}")
        case Fall -> Winter => log.info(s"Season changed from Fall to Winter month ${nextStateData.month}")
        case Winter -> Spring => log.info(s"Season changed from Winter to Spring month ${nextStateData.month}")
      initialize()  //设定起始状态
      log.info(s"It's month 1 of ${stateName.toString}")
      def nextSeason(season: Seasons): Seasons =
        season match {
          case Spring => Summer
          case Summer => Fall
          case Fall => Winter
          case Winter => Spring
    object FSMDemo extends App {
      import scala.util.Random
      val fsmSystem = ActorSystem("fsmSystem")
      val fsmActor = fsmSystem.actorOf(FillSeasons.props,"fsmActor")
      (1 to 15).foreach { _ =>
        (1 to Random.nextInt(3)).foreach{ _ =>
          fsmActor ! FillSeasons.HowYouFeel
        fsmActor ! FillSeasons.NextMonth





