zoukankan      html  css  js  c++  java
  • Akka(14): 持久化模式:PersistentActor


    PersistentActor trait定义如下:

     * Scala API: A persistent Actor - can be used to implement command or event sourcing.
    trait PersistentActor extends Eventsourced with PersistenceIdentity {
      def receive = receiveCommand
       * Asynchronously persists `event`. On successful persistence, `handler` is called with the
       * persisted event. It is guaranteed that no new commands will be received by a persistent actor
       * between a call to `persist` and the execution of its `handler`. This also holds for
       * multiple `persist` calls per received command. Internally, this is achieved by stashing new
       * commands and unstashing them when the `event` has been persisted and handled. The stash used
       * for that is an internal stash which doesn't interfere with the inherited user stash.
       * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
       * event is the sender of the corresponding command. This means that one can reply to a command
       * sender within an event `handler`.
       * Within an event handler, applications usually update persistent actor state using persisted event
       * data, notify listeners and reply to command senders.
       * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
       * unconditionally be stopped. The reason that it cannot resume when persist fails is that it
       * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
       * state. Restarting on persistent failures will most likely fail anyway, since the journal
       * is probably unavailable. It is better to stop the actor and after a back-off timeout start
       * it again.
       * @param event event to be persisted
       * @param handler handler for each persisted `event`
      def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
       * Asynchronously persists `events` in specified order. This is equivalent to calling
       * `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
       * except that `events` are persisted atomically with this method.
       * @param events events to be persisted
       * @param handler handler for each persisted `events`
      def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
       * Asynchronously persists `event`. On successful persistence, `handler` is called with the
       * persisted event.
       * Unlike `persist` the persistent actor will continue to receive incoming commands between the
       * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
       * of persist should be used when you favor throughput over the "command-2 only processed after
       * command-1 effects' have been applied" guarantee, which is provided by the plain `persist` method.
       * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
       * event is the sender of the corresponding command. This means that one can reply to a command
       * sender within an event `handler`.
       * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
       * unconditionally be stopped. The reason that it cannot resume when persist fails is that it
       * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
       * state. Restarting on persistent failures will most likely fail anyway, since the journal
       * is probably unavailable. It is better to stop the actor and after a back-off timeout start
       * it again.
       * @param event event to be persisted
       * @param handler handler for each persisted `event`
      def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
       * Asynchronously persists `events` in specified order. This is equivalent to calling
       * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
       * except that `events` are persisted atomically with this method.
       * @param events events to be persisted
       * @param handler handler for each persisted `events`
      def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
       * Defer the handler execution until all pending handlers have been executed.
       * Allows to define logic within the actor, which will respect the invocation-order-guarantee
       * in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `deferAsync`,
       * the corresponding handlers will be invoked in the same order as they were registered in.
       * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
       * if the given event should possible to replay.
       * If there are no pending persist handler calls, the handler will be called immediately.
       * If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
       * will not be run.
       * @param event event to be handled in the future, when preceding persist operations have been processes
       * @param handler handler for the given `event`
      def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {



     * Scala API and implementation details of [[PersistentActor]] and [[AbstractPersistentActor]].
    private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery {
       * Recovery handler that receives persisted events during recovery. If a state snapshot
       * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
       * followed by events that are younger than the offered snapshot.
       * This handler must not have side-effects other than changing persistent actor state i.e. it
       * should not perform actions that may fail, such as interacting with external services,
       * for example.
       * If there is a problem with recovering the state of the actor from the journal, the error
       * will be logged and the actor will be stopped.
       * @see [[Recovery]]
      def receiveRecover: Receive
       * Command handler. Typically validates commands against current state (and/or by
       * communication with other actors). On successful validation, one or more events are
       * derived from a command and these events are then persisted by calling `persist`.
      def receiveCommand: Receive


       * Called whenever a message replay fails. By default it logs the error.
       * Subclass may override to customize logging.
       * The actor is always stopped after this method has been invoked.
       * @param cause failure cause.
       * @param event the event that was processed in `receiveRecover`, if the exception
       *   was thrown there
      protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
       * Called when persist fails. By default it logs the error.
       * Subclass may override to customize logging and for example send negative
       * acknowledgment to sender.
       * The actor is always stopped after this method has been invoked.
       * Note that the event may or may not have been saved, depending on the type of
       * failure.
       * @param cause failure cause.
       * @param event the event that was to be persisted
      protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
       * Called when the journal rejected `persist` of an event. The event was not
       * stored. By default this method logs the problem as a warning, and the actor continues.
       * The callback handler that was passed to the `persist` method will not be invoked.
       * @param cause failure cause
       * @param event the event that was to be persisted
      protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {


    trait PersistenceRecovery {
       * Called when the persistent actor is started for the first time.
       * The returned [[Recovery]] object defines how the Actor will recover its persistent state before
       * handling the first incoming message.
       * To skip recovery completely return `Recovery.none`.
      def recovery: Recovery = Recovery()


          override def stateReceive(receive: Receive, message: Any) = try message match {
            case ReplayedMessage(p) ⇒
              try {
                eventSeenInInterval = true
                Eventsourced.super.aroundReceive(recoveryBehavior, p)
              } catch {
                case NonFatal(t) ⇒
                  try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
            case RecoverySuccess(highestSeqNr) ⇒
              onReplaySuccess() // callback for subclass implementation
              sequenceNr = highestSeqNr
              _recoveryRunning = false
              try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
              finally transitToProcessingState()
            case ReplayMessagesFailure(cause) ⇒
              try onRecoveryFailure(cause, event = None) finally context.stop(self)
            case RecoveryTick(false) if !eventSeenInInterval ⇒
              try onRecoveryFailure(
                new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"),
                event = None)
              finally context.stop(self)
            case RecoveryTick(false) ⇒
              eventSeenInInterval = false
            case RecoveryTick(true) ⇒
            // snapshot tick, ignore
            case other ⇒
          } catch {
            case NonFatal(e) ⇒
              throw e


       * INTERNAL API.
       * Can be overridden to intercept calls to this actor's current behavior.
       * @param receive current behavior.
       * @param msg current message.
      protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
        // optimization: avoid allocation of lambda
        if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {

    EventSourced又继承了PersistenceRecovery trait,所以重载recovery函数可以改变状态恢复行为。默认的模式是:

     * Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
     * By default recovers from latest snapshot replays through to the last available event (last sequenceId).
     * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
     * and at least one of these snapshots matches the specified `fromSnapshot` criteria.
     * Otherwise, recovery will start from scratch by replaying all stored events.
     * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
     * message, followed by replayed messages, if any, that are younger than the snapshot, up to the
     * specified upper sequence number bound (`toSequenceNr`).
     * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
     *                     is latest (= youngest) snapshot.
     * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
     * @param replayMax maximum number of messages to replay. Default is no limit.
    final case class Recovery(
      fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
      toSequenceNr: Long                      = Long.MaxValue,
      replayMax:    Long                      = Long.MaxValue)


     * Sent to a [[PersistentActor]] when the journal replay has been finished.
    case object RecoveryCompleted extends RecoveryCompleted {
    final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace


       * Returns `true` if this persistent actor is currently recovering.
      def recoveryRunning: Boolean = {
        // currentState is null if this is called from constructor
        if (currentState == null) true else currentState.recoveryRunning
       * Returns `true` if this persistent actor has successfully finished recovery.
      def recoveryFinished: Boolean = !recoveryRunning


       * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
       * If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor.
       * If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor.
       * @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
      def deleteMessages(toSequenceNr: Long): Unit =
        journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)


       * Returns `persistenceId`.
      override def snapshotterId: String = persistenceId
       * Highest received sequence number so far or `0L` if this actor hasn't replayed
       * or stored any persistent events yet.
      def lastSequenceNr: Long = _lastSequenceNr
       * Returns `lastSequenceNr`.
      def snapshotSequenceNr: Long = lastSequenceNr


     * Reply message to a successful [[Eventsourced#deleteMessages]] request.
    final case class DeleteMessagesSuccess(toSequenceNr: Long)
     * Reply message to a failed [[Eventsourced#deleteMessages]] request.
    final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)


       * Snapshotter id.
      def snapshotterId: String
       * Sequence number to use when taking a snapshot.
      def snapshotSequenceNr: Long
       * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
       * to the running [[PersistentActor]].
      def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
        snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)
       * Saves a `snapshot` of this snapshotter's state.
       * The [[PersistentActor]] will be notified about the success or failure of this
       * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message.
      def saveSnapshot(snapshot: Any): Unit = {
        snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
       * Deletes the snapshot identified by `sequenceNr`.
       * The [[PersistentActor]] will be notified about the status of the deletion
       * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message.
      def deleteSnapshot(sequenceNr: Long): Unit = {
        snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr))
       * Deletes all snapshots matching `criteria`.
       * The [[PersistentActor]] will be notified about the status of the deletion
       * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message.
      def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
        snapshotStore ! DeleteSnapshots(snapshotterId, criteria)


     * Snapshot metadata.
     * @param persistenceId id of persistent actor from which the snapshot was taken.
     * @param sequenceNr sequence number at which the snapshot was taken.
     * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
    @SerialVersionUID(1L) //#snapshot-metadata
    final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
    object SnapshotMetadata {
      implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) ⇒
        if (a eq b) false
        else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0
        else if (a.sequenceNr != b.sequenceNr) a.sequenceNr < b.sequenceNr
        else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp
        else false
     * Sent to a [[PersistentActor]] after successful saving of a snapshot.
     * @param metadata snapshot metadata.
    final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
      extends SnapshotProtocol.Response
     * Sent to a [[PersistentActor]] after successful deletion of a snapshot.
     * @param metadata snapshot metadata.
    final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
      extends SnapshotProtocol.Response
     * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots.
     * @param criteria snapshot selection criteria.
    final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
      extends SnapshotProtocol.Response
     * Sent to a [[PersistentActor]] after failed saving of a snapshot.
     * @param metadata snapshot metadata.
     * @param cause failure cause.
    final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
      extends SnapshotProtocol.Response
     * Sent to a [[PersistentActor]] after failed deletion of a snapshot.
     * @param metadata snapshot metadata.
     * @param cause failure cause.
    final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
      extends SnapshotProtocol.Response
     * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots.
     * @param criteria snapshot selection criteria.
     * @param cause failure cause.
    final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)
      extends SnapshotProtocol.Response
     * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
     * before any further replayed messages.
    final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
     * Selection criteria for loading and deleting snapshots.
     * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound,
     *   i.e. `Long.MaxValue`
     * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound,
     *   i.e. `Long.MaxValue`
     * @param minSequenceNr lower bound for a selected snapshot's sequence number. Default is no lower bound,
     *   i.e. `0L`
     * @param minTimestamp lower bound for a selected snapshot's timestamp. Default is no lower bound,
     *   i.e. `0L`
     * @see [[Recovery]]
    final case class SnapshotSelectionCriteria(
      maxSequenceNr: Long = Long.MaxValue,
      maxTimestamp:  Long = Long.MaxValue,
      minSequenceNr: Long = 0L,
      minTimestamp:  Long = 0L) {
       * INTERNAL API.
      private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
        if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this
       * INTERNAL API.
      private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
        metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp &&
          metadata.sequenceNr >= minSequenceNr && metadata.timestamp >= minTimestamp
    object SnapshotSelectionCriteria {
       * The latest saved snapshot.
      val Latest = SnapshotSelectionCriteria()
       * No saved snapshot matches.
      val None = SnapshotSelectionCriteria(0L, 0L)
       * Java API.
      def create(maxSequenceNr: Long, maxTimestamp: Long) =
        SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp)
       * Java API.
      def create(maxSequenceNr: Long, maxTimestamp: Long,
                 minSequenceNr: Long, minTimestamp: Long) =
        SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)
       * Java API.
      def latest() = Latest
       * Java API.
      def none() = None
     * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
     * @param metadata snapshot metadata.
     * @param snapshot snapshot.
    final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
    object SelectedSnapshot {
       * Java API, Plugin API.
      def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
        SelectedSnapshot(metadata, snapshot)
     * Defines messages exchanged between persistent actors and a snapshot store.
    private[persistence] object SnapshotProtocol {
      /** Marker trait shared by internal snapshot messages. */
      sealed trait Message extends Protocol.Message
      /** Internal snapshot command. */
      sealed trait Request extends Message
      /** Internal snapshot acknowledgement. */
      sealed trait Response extends Message
       * Instructs a snapshot store to load a snapshot.
       * @param persistenceId persistent actor id.
       * @param criteria criteria for selecting a snapshot from which recovery should start.
       * @param toSequenceNr upper sequence number bound (inclusive) for recovery.
      final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
        extends Request
       * Response message to a [[LoadSnapshot]] message.
       * @param snapshot loaded snapshot, if any.
      final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
        extends Response
       * Reply message to a failed [[LoadSnapshot]] request.
       * @param cause failure cause.
      final case class LoadSnapshotFailed(cause: Throwable) extends Response
       * Instructs snapshot store to save a snapshot.
       * @param metadata snapshot metadata.
       * @param snapshot snapshot.
      final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
        extends Request
       * Instructs snapshot store to delete a snapshot.
       * @param metadata snapshot metadata.
      final case class DeleteSnapshot(metadata: SnapshotMetadata)
        extends Request
       * Instructs snapshot store to delete all snapshots that match `criteria`.
       * @param persistenceId persistent actor id.
       * @param criteria criteria for selecting snapshots to be deleted.
      final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
        extends Request


    import akka.actor._
    import akka.persistence._
    object Calculator {
      sealed trait Command
      case class Operand(x: Int) extends Command
      case class Add(x: Int) extends Command
      case class Sub(x: Int) extends Command
      case class Mul(x: Int) extends Command
      case class Div(x: Int) extends Command
      case class ShowResult(x: Double) extends Command
      sealed trait Event
      case class SetNum(x: Int) extends Event
      case class Added(x: Int) extends Event
      case class Subtracted(x: Int) extends Event
      case class Multiplied(x: Int) extends Event
      case class Divided(x: Int) extends Event
      case class State(result: Int) {
        def updateState(evt: Event): State = evt match {
          case SetNum(x) => copy(result = x)
          case Added(x) => copy(result = this.result + x)
          case Subtracted(x) => copy(result = this.result - x)
          case Multiplied(x) => copy(result = this.result * x)
          case Divided(x) => copy(result = this.result / x)
    class Calculator extends PersistentActor with ActorLogging {
    import Calculator._
      var state: State = State(0)
      override def persistenceId: String = "persistence-actor"
      override def receiveCommand: Receive = {
        case Operand(x) =>
          persist(SetNum(x)){evt => state = state.updateState(evt)}
        case Add(x) =>
          persist(Added(x)){evt => state = state.updateState(evt)}
        case Sub(x) =>
          persist(Subtracted(x)){evt => state = state.updateState(evt)}
        case Mul(x) =>
          persist(Multiplied(x)){evt => state = state.updateState(evt)}
        case Divided(x) if (x != 0) =>
          persist(Added(x)){evt => state = state.updateState(evt)}
      override def receiveRecover: Receive = {
        case evt: Event => state = state.updateState(evt)
        case SnapshotOffer(_, sts: State) => state = sts.copy(sts.result)

    以上代码基本上进行了Command和Event的直接对应。这是一种比较直观的关系对应方式。我们注意到只有在收到Div(x)指令时才进行了指令验证(x == 0)。因为这个例子比较简单,所以我们可以肯定只有指令Div对状态进行施用时才有可能造成异常。这样才能使我们比较直观地进行Command与Event关系对应。假如对内部状态的更新涉及到一些非常复杂的算法,我们无法肯定哪个指令会产生异常,那我们只有先进行运算指令得出一个结果,然后直接替换状态,这个动作肯定是安全的了。按这个逻辑,我们把上面的例子调整一下: 

    import akka.actor._
    import akka.persistence._
    object Calculator {
      sealed trait Command
      case class Operand(x: Int) extends Command
      case class Add(x: Int) extends Command
      case class Sub(x: Int) extends Command
      case class Mul(x: Int) extends Command
      case class Div(x: Int) extends Command
      case class ShowResult(x: Int) extends Command
      sealed trait Event
      case class SetResult(x: Int) extends Event
      def getResult(res: Int, cmd: Command): Int = cmd match {
        case Operand(x) => x
        case Add(x) => res + x
        case Sub(x) => res - x
        case Mul(x) => res * x
        case Div(x) => res / x
        case _ => 0
      case class State(result: Int) {
        def updateState(evt: Event): State = evt match {
          case SetResult(x) => copy(result = x)
    class Calculator extends PersistentActor with ActorLogging {
    import Calculator._
      var state: State = State(0)
      override def persistenceId: String = "persistence-actor"
      override def receiveCommand: Receive = {
        case opr: Operand =>
          persist(SetResult(getResult(state.result,opr)))(evt => state = state.updateState(evt))
        case add: Add =>
          persist(SetResult(getResult(state.result,add)))(evt => state = state.updateState(evt))
        case sub: Sub =>
          persist(SetResult(getResult(state.result,sub)))(evt => state = state.updateState(evt))
        case mul: Mul =>
          persist(SetResult(getResult(state.result,mul)))(evt => state = state.updateState(evt))
        case div: Div =>
          persist(SetResult(getResult(state.result,div)))(evt => state = state.updateState(evt))
      override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
        log.info(s"Persistence Error: ${cause.getMessage}")
      override def receiveRecover: Receive = {
        case evt: Event => state = state.updateState(evt)
        case SnapshotOffer(_, sts: State) => state = sts.copy(sts.result)



    class Calculator extends PersistentActor with ActorLogging {
      import Calculator._
      var state: State = State(0)
      override def persistenceId: String = "persistence-actor"
      val snapShotInterval = 5
      override def receiveCommand: Receive = {
        case Operand(x) => persist(SetNum(x))(handleEvent)
        case Add(x) => persist(Added(x))(handleEvent)
        case Sub(x) => persist(Subtracted(x))(handleEvent)
        case Mul(x) => persist(Multiplied(x))(handleEvent)
        case Div(x) if (x != 0) => persist(Divided(x))(handleEvent)
        case ShowResult =>
          context.system.eventStream.publish(LogMessage(s"Current state: $state"))
        case BackupResult =>
          context.system.eventStream.publish(LogMessage(s"Manual saving snapshot: $state"))
        case SaveSnapshotSuccess(metadata) =>
          context.system.eventStream.publish(LogMessage(s"Successfully saved state: $state"))
        case SaveSnapshotFailure(metadata, reason) =>
          context.system.eventStream.publish(LogMessage(s"Saving state: $state failed!"))
      def handleEvent(evt: Event) = {   //update state and publish progress
        state = state.updateState(evt)
        context.system.eventStream.publish(LogMessage(s"Logged event: $evt"))
        if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {
          context.system.eventStream.publish(LogMessage(s"Saving snapshot: $state after $snapShotInterval events"))
      override def receiveRecover: Receive = {
        case evt: Event => {
          state = state.updateState(evt)
          context.system.eventStream.publish(LogMessage(s"Restoring event: $evt"))
        case SnapshotOffer(mdata, sts: State) => {
          state = sts.copy(sts.result)
          context.system.eventStream.publish(LogMessage(s"Restoring snapshot: $mdata"))
        case RecoveryCompleted => log.info(s"Recovery completed with starting state: $state")
      override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
        log.info(s"Persistence Rejected: ${cause.getMessage}")
      override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
        log.info(s"Persistence Error: ${cause.getMessage}")



    package persistence.demo
    import akka.actor._
    import persistence.calculator.Calculator
    import persistence.tracker.EventTracker
    object persistenceDemo extends App {
      val persistenceSystem = ActorSystem("persistenceSystem")
      val calculator = persistenceSystem.actorOf(Calculator.props,"Calculator")
      calculator ! Calculator.Add(3)
      calculator ! Calculator.Add(7)
      calculator ! Calculator.Mul(3)
      calculator ! Calculator.Div(2)
      calculator ! Calculator.Sub(8)
      calculator ! Calculator.Mul(12)
      calculator ! Calculator.ShowResult


    Restoring event: Divided(2)
    Restoring event: Subtracted(8)
    Restoring event: Multiplied(12)
    [INFO] [07/21/2017 08:26:51.593] [persistenceSystem-akka.actor.default-dispatcher-4] [akka://persistenceSystem/user/Calculator] Recovery completed with starting state: State(732)
    Logged event: Added(3)
    Logged event: Added(7)
    Saving snapshot: State(742) after 5 events
    Logged event: Multiplied(3)
    Logged event: Divided(2)
    Logged event: Subtracted(8)
    Logged event: Multiplied(12)
    Current state: State(13260)
    Successfully saved state: State(13260)
    Restoring event: Multiplied(3)
    Restoring event: Divided(2)
    Restoring event: Subtracted(8)
    Restoring event: Multiplied(12)
    [INFO] [07/21/2017 08:31:50.068] [persistenceSystem-akka.actor.default-dispatcher-3] [akka://persistenceSystem/user/Calculator] Recovery completed with starting state: State(13260)
    Logged event: Added(3)
    Saving snapshot: State(13263) after 5 events
    Logged event: Added(7)
    Logged event: Multiplied(3)
    Logged event: Divided(2)
    Logged event: Subtracted(8)
    Logged event: Multiplied(12)
    Saving snapshot: State(238764) after 5 events
    Current state: State(238764)
    Successfully saved state: State(238764)
    Successfully saved state: State(238764)
    Restoring snapshot: SnapshotMetadata(persistence-actor,30,1500597110125)
    [INFO] [07/21/2017 08:33:20.585] [persistenceSystem-akka.actor.default-dispatcher-2] [akka://persistenceSystem/user/Calculator] Recovery completed with starting state: State(238764)
    08:33:20.730 [persistenceSystem-cassandra-plugin-default-dispatcher-10] INFO com.datastax.driver.core.utils.UUIDs - PID obtained through native call to getpid(): 1032
    Logged event: Added(3)
    Logged event: Added(7)
    Logged event: Multiplied(3)
    Logged event: Divided(2)
    Logged event: Subtracted(8)
    Saving snapshot: State(358153) after 5 events
    Logged event: Multiplied(12)
    Current state: State(4297836)
    Successfully saved state: State(4297836)




    name := "persistence-actor"
    version := "1.0"
    scalaVersion := "2.11.9"
    sbtVersion := "0.13.5"
    libraryDependencies ++= Seq(
      "com.typesafe.akka"           %% "akka-actor"       % "2.5.3",
      "com.typesafe.akka"           %% "akka-persistence" % "2.5.3",
      "ch.qos.logback" % "logback-classic" % "1.1.7",
      "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.54",
      "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.54" % Test


    akka {
      persistence {
        journal.plugin = "cassandra-journal"
        snapshot-store.plugin = "cassandra-snapshot-store"
    akka.actor.warn-about-java-serializer-usage = off


    package persistence.calculator
    import akka.actor._
    import akka.persistence._
    object Calculator {
      sealed trait Command
      case class Operand(x: Int) extends Command
      case class Add(x: Int) extends Command
      case class Sub(x: Int) extends Command
      case class Mul(x: Int) extends Command
      case class Div(x: Int) extends Command
      case class ShowResult(x: Double) extends Command
      case object BackupResult extends Command    //saveSnapshot
      sealed trait Event
      case class SetNum(x: Int) extends Event
      case class Added(x: Int) extends Event
      case class Subtracted(x: Int) extends Event
      case class Multiplied(x: Int) extends Event
      case class Divided(x: Int) extends Event
      case class State(result: Int) {
        def updateState(evt: Event): State = evt match {
          case SetNum(x) => copy(result = x)
          case Added(x) => copy(result = this.result + x)
          case Subtracted(x) => copy(result = this.result - x)
          case Multiplied(x) => copy(result = this.result * x)
          case Divided(x) => copy(result = this.result / x)
      case class LogMessage(msg: String)       //broadcase message type
      def props = Props(new Calculator)
    class Calculator extends PersistentActor with ActorLogging {
      import Calculator._
      var state: State = State(0)
      override def persistenceId: String = "persistence-actor"
      val snapShotInterval = 5
      override def receiveCommand: Receive = {
        case Operand(x) => persist(SetNum(x))(handleEvent)
        case Add(x) => persist(Added(x))(handleEvent)
        case Sub(x) => persist(Subtracted(x))(handleEvent)
        case Mul(x) => persist(Multiplied(x))(handleEvent)
        case Div(x) if (x != 0) => persist(Divided(x))(handleEvent)
        case ShowResult =>
          context.system.eventStream.publish(LogMessage(s"Current state: $state"))
        case BackupResult =>
          context.system.eventStream.publish(LogMessage(s"Manual saving snapshot: $state"))
        case SaveSnapshotSuccess(metadata) =>
          context.system.eventStream.publish(LogMessage(s"Successfully saved state: $state"))
        case SaveSnapshotFailure(metadata, reason) =>
          context.system.eventStream.publish(LogMessage(s"Saving state: $state failed!"))
      def handleEvent(evt: Event) = {   //update state and publish progress
        state = state.updateState(evt)
        context.system.eventStream.publish(LogMessage(s"Producing event: $evt"))
        if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {
          context.system.eventStream.publish(LogMessage(s"Saving snapshot: $state after $snapShotInterval events"))
      override def receiveRecover: Receive = {
        case evt: Event => {
          state = state.updateState(evt)
          context.system.eventStream.publish(LogMessage(s"Restoring event: $evt"))
        case SnapshotOffer(mdata, sts: State) => {
          state = sts.copy(sts.result)
          context.system.eventStream.publish(LogMessage(s"Restoring snapshot: $mdata"))
        case RecoveryCompleted => log.info(s"Recovery completed with starting state: $state")
      override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
        log.info(s"Persistence Rejected: ${cause.getMessage}")
      override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
        log.info(s"Persistence Error: ${cause.getMessage}")


    package persistence.tracker
    import akka.actor._
    import persistence.calculator.Calculator
    object EventTracker {
      def props = Props(new EventTracker)
    class EventTracker extends Actor {
      override def preStart(): Unit = {
      override def postStop(): Unit = {
      override def receive: Receive = {
        case Calculator.LogMessage(msg) => println(msg)


    package persistence.demo
    import akka.actor._
    import persistence.calculator.Calculator
    import persistence.tracker.EventTracker
    object persistenceDemo extends App {
      val persistenceSystem = ActorSystem("persistenceSystem")
      val calculator = persistenceSystem.actorOf(Calculator.props,"Calculator")
      calculator ! Calculator.Add(3)
      calculator ! Calculator.Add(7)
      calculator ! Calculator.Mul(3)
      calculator ! Calculator.Div(2)
      calculator ! Calculator.Sub(8)
      calculator ! Calculator.Mul(12)
      calculator ! Calculator.ShowResult













  • 相关阅读:
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/7215898.html
Copyright © 2011-2022 走看看