      前面提到过,akka-typed中较重要的改变是加入了EventSourcedBehavior。也就是说增加了一种专门负责Event Sourcing(事件溯源)模式的actor,它和其它种类的actor一道可以完整地实现CQRS。这种新的actor,我还是把它称为persistentActor,它仍然是一种能维护和维持运行状态的actor:actor的内部状态可以存放在数据库里,并通过一组功能函数对状态进行处理和转变,即持久化处理(persistence)。当然,作为一种具备EventSourcedBehavior的actor,普遍应有的actor属性、方法、消息处理协议、监管等都还必须存在。在这篇讨论里我们就通过案例和源码来说明EventSourcedBehavior是如何维护内部状态的,以及作为一种actor又应该怎么去使用它。


    /** Shared domain model used by the shopping-cart example. */
    object ItemInfo {
      /** An item that can be put into a cart: a name together with a price. */
      final case class Item(name: String, price: Double)
    }
    /**
     * Event-sourced shopping cart.
     *
     * Commands are translated into events by `commandHandler`; `eventHandler`
     * folds each event into the `CartLoad` state.
     */
    object MyCart {
      import ItemInfo._

      /** Messages accepted by the cart. */
      sealed trait Command
      /** Events written by the cart; tagged with the `CborSerializable` marker. */
      sealed trait Event extends CborSerializable
      /** Replies sent back to whoever asked for the cart contents. */
      sealed trait Response

      final case class AddItem(item: Item) extends Command
      case object PayCart extends Command
      final case class CountItems(replyTo: ActorRef[Response]) extends Command

      final case class ItemAdded(item: Item) extends Event
      case object CartPaid extends Event

      /** Cart state: the items picked so far, most recent first. */
      final case class CartLoad(load: List[Item] = Nil)

      final case class PickedItems(items: List[Item]) extends Response
      case object CartEmpty extends Response

      /**
       * Decides which effect a command produces.
       *
       * `AddItem` and `PayCart` persist the matching event; `CountItems`
       * persists nothing and only replies with the current contents.
       */
      val commandHandler: (CartLoad, Command) => Effect[Event, CartLoad] = { (state, cmd) =>
        cmd match {
          case AddItem(item) =>
            Effect.persist(ItemAdded(item))
          case PayCart =>
            Effect.persist(CartPaid)
          case CountItems(replyTo) =>
            // Type arguments are spelled out so that `cart` is typed as CartLoad.
            Effect.none[Event, CartLoad].thenRun { cart =>
              cart.load match {
                case Nil =>
                  replyTo ! CartEmpty
                case listOfItems =>
                  replyTo ! PickedItems(listOfItems)
              }
            }
        }
      }

      /** Applies one event to the state and returns the new state. */
      val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
        evt match {
          case ItemAdded(item) =>
            state.copy(load = item :: state.load)
          case CartPaid =>
            state.copy(load = Nil)
        }
      }

      /** Builds the cart behavior with a fixed persistence id and an empty cart. */
      def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, CartLoad](
        persistenceId = PersistenceId("10", "1013"),
        emptyState = CartLoad(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      )
    }
    /**
     * Front-end actor: spawns a `MyCart` child, forwards shopper requests to it
     * and logs the cart's replies.
     */
    object Shopper {
      import ItemInfo._

      sealed trait Command extends CborSerializable
      final case class GetItem(item: Item) extends Command
      case object Settle extends Command
      case object GetCount extends Command
      /** Wraps a cart reply so that it can be received as a `Shopper.Command`. */
      final case class WrappedResponse(res: MyCart.Response) extends Command

      def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
        val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
        // Adapter reference handed to the cart as the reply-to address.
        val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
        Behaviors.receiveMessage { msg =>
          msg match {
            case GetItem(item) =>
              shoppingCart ! MyCart.AddItem(item)
            case Settle =>
              shoppingCart ! MyCart.PayCart
            case GetCount =>
              shoppingCart ! MyCart.CountItems(cartRef)
            case WrappedResponse(res) =>
              res match {
                case MyCart.PickedItems(items) =>
                  ctx.log.info("**************Current: {}*************", items)
                case MyCart.CartEmpty =>
                  ctx.log.info("**************shopping cart is empty!***************")
              }
          }
          // The handler must yield the next behavior; this actor never changes it.
          Behaviors.same
        }
      }
    }
    /**
     * Demo entry point: starts the `Shopper` actor system, adds two items, asks
     * for the cart contents, settles the cart and asks again.
     *
     * The actor system is not terminated here, so it is left running.
     */
    object ShoppingCart extends App {
      import ItemInfo._

      val shopper = ActorSystem(Shopper(), "shopper")

      shopper ! Shopper.GetItem(Item("banana", 11.20))
      shopper ! Shopper.GetItem(Item("watermelon", 4.70))
      shopper ! Shopper.GetCount
      shopper ! Shopper.Settle
      shopper ! Shopper.GetCount
    }


    /**
     * Handles a withdrawal request against an opened account.
     *
     * When the account can cover `cmd.amount`, a `Withdrawn` event is persisted
     * and `Confirmed` is sent to `cmd.replyTo`; otherwise nothing is persisted
     * and a `Rejected` reply carrying the reason is sent instead.
     */
    private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
      if (acc.canWithdraw(cmd.amount))
        Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
      else
        // Without this `else` the persist effect above is discarded and every request is rejected.
        Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
    }


    说到side-effect, 如Effect.persist().thenRun(produceSideEffect): 当成功持续化event后可以安心进行一些其它的操作。例如,当影响库存数的event被persist后可以马上从账上扣减库存。

    在上面这个ShoppingCart例子里,我们没有在MyCart中发现Behaviors.same这样的状态转换代码。这是因为EventSourcedBehavior属于更高层次的Behavior,状态转换已经嵌入在eventHandler里了。还记得这个函数的类型吧:(State, Event) => State,其中的State就是actor的状态。

    Events是persist在journal里的。如果persist操作中journal出现异常,EventSourcedBehavior自备了安全监管策略,可以通过onPersistFailure来设定,如下:

      /**
       * Cart behavior that restarts with backoff when persisting to the journal
       * fails.
       *
       * The backoff strategy is attached to the behavior through
       * `onPersistFailure`; it cannot be chained onto the `eventHandler` argument.
       */
      def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, CartLoad](
        persistenceId = PersistenceId("10", "1013"),
        emptyState = CartLoad(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      )


      /**
       * Cart behavior with two layers of supervision.
       *
       * The inner strategy (`onPersistFailure`) applies to journal failures; the
       * outer strategy (`Behaviors.supervise(...).onFailure`) applies to any other
       * `Throwable` thrown by the wrapped behavior.
       */
      def apply(): Behavior[Command] =
        Behaviors.supervise(
          Behaviors.setup[Command] { ctx =>
            EventSourcedBehavior[Command, Event, CartLoad](
              persistenceId = PersistenceId("10", "1013"),
              emptyState = CartLoad(),
              commandHandler = commandHandler,
              eventHandler = eventHandler
            ).onPersistFailure(
              SupervisorStrategy
                .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            )
          }
        ).onFailure[Throwable](
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        )



     /**
      * Supervised cart behavior that also logs recovery and snapshot signals.
      *
      * Journal failures are handled by the `onPersistFailure` strategy, other
      * failures by the outer `Behaviors.supervise(...).onFailure` strategy.
      */
     def apply(): Behavior[Command] =
       Behaviors.supervise(
         Behaviors.setup[Command] { ctx =>
           EventSourcedBehavior[Command, Event, CartLoad](
             persistenceId = PersistenceId("10", "1013"),
             emptyState = CartLoad(),
             commandHandler = commandHandler,
             eventHandler = eventHandler
           ).onPersistFailure(
             SupervisorStrategy
               .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
           ).receiveSignal {
             case (state, RecoveryCompleted) =>
               ctx.log.info("**************Recovery Completed with state: {}***************",state)
             case (state, SnapshotCompleted(meta)) =>
               ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
             case (_, RecoveryFailed(err)) =>
               ctx.log.error("recovery failed with: {}",err.getMessage)
             case (_, SnapshotFailed(_, err)) =>
               ctx.log.error("snapshoting failed with: {}",err.getMessage)
           }
         }
       ).onFailure[Throwable](
         SupervisorStrategy
           .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
       )


      /**
       * Registers a signal handler on the behavior.
       *
       * @param signalHandler partial function over a `(State, Signal)` pair; it
       *                      returns `Unit`, so it is used for side effects only
       * @return an `EventSourcedBehavior` with the same type parameters
       */
      def receiveSignal(signalHandler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State]

    下面是一个EventSourcedBehavior Signal 清单:

    /** Base type of the signals listed below. */
    sealed trait EventSourcedSignal extends Signal

    @DoNotInherit sealed abstract class RecoveryCompleted extends EventSourcedSignal
    case object RecoveryCompleted extends RecoveryCompleted {
      def instance: RecoveryCompleted = this
    }

    final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
      def getFailure(): Throwable = failure
    }

    final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
      def getSnapshotMetadata(): SnapshotMetadata = metadata
    }

    final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal {
      def getFailure(): Throwable = failure
      def getSnapshotMetadata(): SnapshotMetadata = metadata
    }

    object SnapshotMetadata {

      /**
       * @param persistenceId id of persistent actor from which the snapshot was taken.
       * @param sequenceNr sequence number at which the snapshot was taken.
       * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
       *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
       */
      def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata =
        new SnapshotMetadata(persistenceId, sequenceNr, timestamp)
    }

    /**
     * Snapshot metadata.
     *
     * @param persistenceId id of persistent actor from which the snapshot was taken.
     * @param sequenceNr sequence number at which the snapshot was taken.
     * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
     *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
     */
    final class SnapshotMetadata(val persistenceId: String, val sequenceNr: Long, val timestamp: Long) {
      // NOTE(review): the body of toString was lost in the excerpt; this rendering of
      // the three fields is a reconstruction — confirm against the library source.
      override def toString: String =
        s"SnapshotMetadata($persistenceId,$sequenceNr,$timestamp)"
    }

    final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal {
      def getTarget(): DeletionTarget = target
    }

    final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
      def getFailure(): Throwable = failure
      def getTarget(): DeletionTarget = target
    }

    final case class DeleteEventsCompleted(toSequenceNr: Long) extends EventSourcedSignal {
      def getToSequenceNr(): Long = toSequenceNr
    }

    final case class DeleteEventsFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
      def getFailure(): Throwable = failure
      def getToSequenceNr(): Long = toSequenceNr
    }


      /**
       * Supervised cart behavior with signal logging and snapshotting.
       *
       * A snapshot is requested whenever a `CartPaid` event has been handled, and
       * additionally every 100 events, keeping the 2 most recent snapshots.
       */
      def apply(): Behavior[Command] =
        Behaviors.supervise(
          Behaviors.setup[Command] { ctx =>
            EventSourcedBehavior[Command, Event, CartLoad](
              persistenceId = PersistenceId("10", "1013"),
              emptyState = CartLoad(),
              commandHandler = commandHandler,
              eventHandler = eventHandler
            ).onPersistFailure(
              SupervisorStrategy
                .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            ).receiveSignal {
              case (state, RecoveryCompleted) =>
                ctx.log.info("**************Recovery Completed with state: {}***************",state)
              case (state, SnapshotCompleted(meta)) =>
                ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
              case (_, RecoveryFailed(err)) =>
                ctx.log.error("recovery failed with: {}",err.getMessage)
              case (_, SnapshotFailed(_, err)) =>
                ctx.log.error("snapshoting failed with: {}",err.getMessage)
            }.snapshotWhen {
              case (state, CartPaid, seqnum) =>
                ctx.log.info("*****************snapshot taken at: {} with state: {}",seqnum,state)
                // The predicate must return a Boolean; true requests the snapshot.
                true
              case (_, _, _) => false
            }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
          }
        ).onFailure[Throwable](
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        )



    // build.sbt for the examples: project identity, compiler flags and dependencies.
    name := "learn-akka-typed"
    version := "0.1"
    scalaVersion := "2.13.1"
    scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
    javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")
    // The Akka modules share one version; the Cassandra plugin is versioned separately.
    val AkkaVersion = "2.6.5"
    val AkkaPersistenceCassandraVersion = "1.0.0"
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
      "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
      "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
      "ch.qos.logback"     % "logback-classic"             % "1.2.3"
    )


    # application.conf: serialization bindings, persistence plugins and Cassandra settings.
    akka.actor.allow-java-serialization = on
    akka {
      loglevel = DEBUG
      actor {
        serialization-bindings {
          "com.learn.akka.CborSerializable" = jackson-cbor
        }
      }
      # use Cassandra to store both snapshots and the events of the persistent actors
      persistence {
        journal.plugin = "akka.persistence.cassandra.journal"
        snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
      }
    }
    akka.persistence.cassandra {
      # don't use autocreate in production
      journal.keyspace = "poc"
      journal.keyspace-autocreate = on
      journal.tables-autocreate = on
      snapshot.keyspace = "poc_snapshot"
      snapshot.keyspace-autocreate = on
      snapshot.tables-autocreate = on
    }
    datastax-java-driver {
      # NOTE(review): the contact point is an empty string here — presumably a redacted
      # host:port; fill in the Cassandra node address before running.
      basic.contact-points = [""]
      basic.load-balancing-policy.local-datacenter = "datacenter1"
    }


    package com.learn.akka
    import akka.actor.typed._
    import akka.persistence.typed._
    import akka.actor.typed.scaladsl.Behaviors
    import akka.persistence.typed.scaladsl._
    import scala.concurrent.duration._
    /** Shared domain model used by the shopping-cart example. */
    object ItemInfo {
      /** An item that can be put into a cart: a name together with a price. */
      final case class Item(name: String, price: Double)
    }
    /**
     * Event-sourced shopping cart.
     *
     * Commands are translated into events by `commandHandler`; `eventHandler`
     * folds each event into the `CartLoad` state. The behavior built by `apply`
     * is supervised, logs recovery/snapshot signals and takes snapshots.
     */
    object MyCart {
      import ItemInfo._

      /** Messages accepted by the cart. */
      sealed trait Command
      /** Events written by the cart; tagged with the `CborSerializable` marker. */
      sealed trait Event extends CborSerializable
      /** Replies sent back to whoever asked for the cart contents. */
      sealed trait Response

      final case class AddItem(item: Item) extends Command
      case object PayCart extends Command
      final case class CountItems(replyTo: ActorRef[Response]) extends Command

      final case class ItemAdded(item: Item) extends Event
      case object CartPaid extends Event

      /** Cart state: the items picked so far, most recent first. */
      final case class CartLoad(load: List[Item] = Nil)

      final case class PickedItems(items: List[Item]) extends Response
      case object CartEmpty extends Response

      /**
       * Decides which effect a command produces.
       *
       * `AddItem` and `PayCart` persist the matching event; `CountItems`
       * persists nothing and only replies with the current contents.
       */
      val commandHandler: (CartLoad, Command) => Effect[Event, CartLoad] = { (state, cmd) =>
        cmd match {
          case AddItem(item) =>
            Effect.persist(ItemAdded(item))
          case PayCart =>
            Effect.persist(CartPaid)
          case CountItems(replyTo) =>
            // Type arguments are spelled out so that `cart` is typed as CartLoad.
            Effect.none[Event, CartLoad].thenRun { cart =>
              cart.load match {
                case Nil =>
                  replyTo ! CartEmpty
                case listOfItems =>
                  replyTo ! PickedItems(listOfItems)
              }
            }
        }
      }

      /** Applies one event to the state and returns the new state. */
      val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
        evt match {
          case ItemAdded(item) =>
            state.copy(load = item :: state.load)
          case CartPaid =>
            state.copy(load = Nil)
        }
      }

      /**
       * Builds the supervised cart behavior.
       *
       * Journal failures are handled by the `onPersistFailure` strategy, other
       * failures by the outer `Behaviors.supervise(...).onFailure` strategy. A
       * snapshot is requested after every `CartPaid` event and every 100 events,
       * keeping the 2 most recent snapshots.
       */
      def apply(): Behavior[Command] =
        Behaviors.supervise(
          Behaviors.setup[Command] { ctx =>
            EventSourcedBehavior[Command, Event, CartLoad](
              persistenceId = PersistenceId("10", "1013"),
              emptyState = CartLoad(),
              commandHandler = commandHandler,
              eventHandler = eventHandler
            ).onPersistFailure(
              SupervisorStrategy
                .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            ).receiveSignal {
              case (state, RecoveryCompleted) =>
                ctx.log.info("**************Recovery Completed with state: {}***************",state)
              case (state, SnapshotCompleted(meta)) =>
                ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
              case (_, RecoveryFailed(err)) =>
                ctx.log.error("recovery failed with: {}",err.getMessage)
              case (_, SnapshotFailed(_, err)) =>
                ctx.log.error("snapshoting failed with: {}",err.getMessage)
            }.snapshotWhen {
              case (state, CartPaid, seqnum) =>
                ctx.log.info("*****************snapshot taken at: {} with state: {}",seqnum,state)
                // The predicate must return a Boolean; true requests the snapshot.
                true
              case (_, _, _) => false
            }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
          }
        ).onFailure[Throwable](
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        )
    }
    /**
     * Front-end actor: spawns a `MyCart` child, forwards shopper requests to it
     * and logs the cart's replies.
     */
    object Shopper {
      import ItemInfo._

      sealed trait Command extends CborSerializable
      final case class GetItem(item: Item) extends Command
      case object Settle extends Command
      case object GetCount extends Command
      /** Wraps a cart reply so that it can be received as a `Shopper.Command`. */
      final case class WrappedResponse(res: MyCart.Response) extends Command

      def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
        val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
        // Adapter reference handed to the cart as the reply-to address.
        val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
        Behaviors.receiveMessage { msg =>
          msg match {
            case GetItem(item) =>
              shoppingCart ! MyCart.AddItem(item)
            case Settle =>
              shoppingCart ! MyCart.PayCart
            case GetCount =>
              shoppingCart ! MyCart.CountItems(cartRef)
            case WrappedResponse(res) =>
              res match {
                case MyCart.PickedItems(items) =>
                  ctx.log.info("**************Current: {}*************", items)
                case MyCart.CartEmpty =>
                  ctx.log.info("**************shopping cart is empty!***************")
              }
          }
          // The handler must yield the next behavior; this actor never changes it.
          Behaviors.same
        }
      }
    }
    /**
     * Demo entry point: starts the `Shopper` actor system, adds two items, asks
     * for the cart contents, settles the cart and asks again.
     *
     * The actor system is not terminated here, so it is left running.
     */
    object ShoppingCart extends App {
      import ItemInfo._

      val shopper = ActorSystem(Shopper(), "shopper")

      shopper ! Shopper.GetItem(Item("banana", 11.20))
      shopper ! Shopper.GetItem(Item("watermelon", 4.70))
      shopper ! Shopper.GetCount
      shopper ! Shopper.Settle
      shopper ! Shopper.GetCount
    }
