  • Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式


    AtleastOnceDelivery模式既然需要保证消息必达,就必须保证自身在出现任何异常情况下都能恢复到原来的状态,这些都可通过状态持久化来实现。与PersistentActor不同而且更复杂的是AtleastOnceDelivery-Actor的状态除自定义的结构外还必须包括未确认收到的消息(outstanding messages)。所以AtleastOnceDelivery提供了自身特殊的事件(event)和快照(snapshot)类型,它们都包括消息送达状态。


     * Scala API: Mix-in this trait with your `PersistentActor` to send messages with at-least-once
     * delivery semantics to destinations. It takes care of re-sending messages when they
     * have not been confirmed within a configurable timeout. Use the [[AtLeastOnceDeliveryLike#deliver]] method to
     * send a message to a destination. Call the [[AtLeastOnceDeliveryLike#confirmDelivery]] method when the destination
     * has replied with a confirmation message.
     * At-least-once delivery implies that original message send order is not always retained
     * and the destination may receive duplicate messages due to possible resends.
     * The interval between redelivery attempts can be defined by [[AtLeastOnceDeliveryLike#redeliverInterval]].
     * After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
     * will be sent to `self`. The re-sending will still continue, but you can choose to call
     * [[AtLeastOnceDeliveryLike#confirmDelivery]] to cancel the re-sending.
     * The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a
     * sequence number. It does not store this state itself. You must persist events corresponding
     * to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the
     * state can be restored by calling the same methods during the recovery phase of the
     * `PersistentActor`. Sometimes these events can be derived from other business level events,
     * and sometimes you must create separate events. During recovery calls to `deliver`
     * will not send out the message, but it will be sent later if no matching `confirmDelivery`
     * was performed.
     * Support for snapshots is provided by [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
     * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
     * If you need a custom snapshot for other parts of the actor state you must also include the
     * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
     * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
     * as a blob in your custom snapshot.
     * @see [[AtLeastOnceDeliveryLike]]
     * @see [[AbstractPersistentActorWithAtLeastOnceDelivery]] for Java API
    trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {
       * Scala API: Send the message created by the `deliveryIdToMessage` function to
       * the `destination` actor. It will retry sending the message until
       * the delivery is confirmed with [[#confirmDelivery]]. Correlation
       * between `deliver` and `confirmDelivery` is performed with the
       * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
       * function. The `deliveryId` is typically passed in the message to the
       * destination, which replies with a message containing the same `deliveryId`.
       * The `deliveryId` is a strictly monotonically increasing sequence number without
       * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
       * to multiple destinations the destinations will see gaps in the sequence if no
       * translation is performed.
       * During recovery this method will not send out the message, but it will be sent
       * later if no matching `confirmDelivery` was performed.
       * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
       * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
      def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = {
       * Scala API: Send the message created by the `deliveryIdToMessage` function to
       * the `destination` actor. It will retry sending the message until
       * the delivery is confirmed with [[#confirmDelivery]]. Correlation
       * between `deliver` and `confirmDelivery` is performed with the
       * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
       * function. The `deliveryId` is typically passed in the message to the
       * destination, which replies with a message containing the same `deliveryId`.
       * The `deliveryId` is a strictly monotonically increasing sequence number without
       * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
       * to multiple destinations the destinations will see gaps in the sequence if no
       * translation is performed.
       * During recovery this method will not send out the message, but it will be sent
       * later if no matching `confirmDelivery` was performed.
       * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
       * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
      def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = {


       * Call this method when a message has been confirmed by the destination,
       * or to abort re-sending.
       * @see [[#deliver]]
       * @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm
      def confirmDelivery(deliveryId: Long): Boolean = {
        if (unconfirmed.contains(deliveryId)) {
          unconfirmed -= deliveryId
        } else false


       * @see [[AtLeastOnceDeliveryLike#warnAfterNumberOfUnconfirmedAttempts]]
      case class UnconfirmedWarning(unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) {
         * Java API
        def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
          import scala.collection.JavaConverters._


    private def redeliverOverdue(): Unit = {
        val now = System.nanoTime()
        val deadline = now - redeliverInterval.toNanos
        var warnings = Vector.empty[UnconfirmedDelivery]
          .filter { case (_, delivery) ⇒ delivery.timestamp <= deadline }
          .foreach {
            case (deliveryId, delivery) ⇒
              send(deliveryId, delivery, now)
              if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
                warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
        if (warnings.nonEmpty)
          self ! UnconfirmedWarning(warnings)


     override private[akka] def onReplaySuccess(): Unit = {
      override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
        message match {
          case RedeliveryTick ⇒
          case x ⇒
            super.aroundReceive(receive, message)


    object AtLeastOnceDelivery {
       * Snapshot of current `AtLeastOnceDelivery` state. Can be retrieved with
       * [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and saved with [[PersistentActor#saveSnapshot]].
       * During recovery the snapshot received in [[SnapshotOffer]] should be set
       * with [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
      case class AtLeastOnceDeliverySnapshot(currentDeliveryId: Long, unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery])
        extends Message {
         * Java API
        def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
          import scala.collection.JavaConverters._


       * Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]].
       * During recovery the snapshot received in [[SnapshotOffer]] should be set
       * with [[#setDeliverySnapshot]].
       * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
       * If you need a custom snapshot for other parts of the actor state you must also include the
       * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
       * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
       * as a blob in your custom snapshot.
      def getDeliverySnapshot: AtLeastOnceDeliverySnapshot =
          unconfirmed.map { case (deliveryId, d) ⇒ UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut))
       * If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery
       * in a [[SnapshotOffer]] message and should be set with this method.
      def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = {
        deliverySequenceNr = snapshot.currentDeliveryId
        val now = System.nanoTime()
        unconfirmed = snapshot.unconfirmedDeliveries.map(d ⇒
          d.deliveryId → Delivery(d.destination, d.message, now, 0))(breakOut)


    package atleastonce.calculation
    import akka.actor._
    import akka.persistence._
    import akka.persistence.AtLeastOnceDelivery._
    import atleastonce.calculator.Calculator
    object CalcAggregator {
      sealed trait Command
      case class Add(x: Int, y: Int) extends Command
      case class Sub(x: Int, y: Int) extends Command
      case class Mul(x: Int, y: Int) extends Command
      case class Div(x: Int, y: Int) extends Command
      case class Result(id: Long, res: Int) extends Command
      sealed trait Event
      case class Added(x: Int, y: Int) extends Event
      case class Substracted(x: Int, y: Int) extends Event
      case class Multiplied(x: Int, y: Int) extends Event
      case class Divided(x: Int, y: Int) extends Event
      case class GotResult(id: Long, res: Int) extends Event
      case class Snap(results: Set[Int], deliverySnapshot: AtLeastOnceDeliverySnapshot)
    class CalcAggregator(calculators: Map[String,ActorPath]) extends PersistentActor
      with AtLeastOnceDelivery with ActorLogging {
     import CalcAggregator._
      var results: Set[Int] = Set()
      override def persistenceId = "calculation-actor"
      //sending commands and update state only with delivery ack
      def updateState(cmd: Command) = cmd match {
        case Add(x,y) => deliver(calculators("ADD")){id => Calculator.Add(id,x,y)}
        case Sub(x,y) => deliver(calculators("SUB")){id => Calculator.Sub(id,x,y)}
        case Mul(x,y) => deliver(calculators("MUL")){id => Calculator.Mul(id,x,y)}
        case Div(x,y) => deliver(calculators("DIV")){id => Calculator.Div(id,x,y)}
        case Result(id,res) =>
          results += res
      override def receiveCommand: Receive = {
        case cmd: Command => persistAsync(cmd){updateState}
        case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
          unconfirmedDeliveries.foreach{u => confirmDelivery(u.deliveryId)}
      override def receiveRecover: Receive = {
        case cmd: Command => updateState(cmd)
        case SnapshotOffer(_,snap: Snap) =>
          results = snap.results

    package atleastonce.calculation
    import akka.actor._
    import akka.persistence._
    import akka.persistence.AtLeastOnceDelivery._
    import atleastonce.calculator.Calculator
    import scala.util.control.NoStackTrace
    object CalcAggregator {
      sealed trait Command
      case class Add(x: Int, y: Int) extends Command
      case class Sub(x: Int, y: Int) extends Command
      case class Mul(x: Int, y: Int) extends Command
      case class Div(x: Int, y: Int) extends Command
      case class Result(id: Long, res: Int) extends Command
      sealed trait Event
      case class Added(x: Int, y: Int) extends Event
      case class Substracted(x: Int, y: Int) extends Event
      case class Multiplied(x: Int, y: Int) extends Event
      case class Divided(x: Int, y: Int) extends Event
      case class GotResult(id: Long, res: Int) extends Event
      case class Snap(results: List[Int],
                      deliverySnapshot: AtLeastOnceDeliverySnapshot)
      case object ShowResults
      case object Boom
      case object ClearJournal
      def props(calculators: Map[String,ActorRef],keepJournalNr: Int) =
        Props(new CalcAggregator(calculators,keepJournalNr))
    class CalcAggregator(calculators: Map[String,ActorRef],keepJournalNr: Int)
      extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
      import CalcAggregator._
      var results: List[Int] = List()
      var resultsId: Int = 0
      override def persistenceId = "calculation-actor023"
      //sending commands and update state only with delivery ack
      def updateState(cmd: Command) = {
        if (!recoveryRunning && !cmd.isInstanceOf[Result])
            log.info(s"Sending command message: $cmd at: $lastSequenceNr")
        cmd match {
          case Add(x,y) => deliver(calculators("ADD").path){id => Calculator.Add(id,x,y)}
          case Sub(x,y) => deliver(calculators("SUB").path){id => Calculator.Sub(id,x,y)}
          case Mul(x,y) => deliver(calculators("MUL").path){id => Calculator.Mul(id,x,y)}
          case Div(x,y) => deliver(calculators("DIV").path){id => Calculator.Div(id,x,y)}
          case Result(id,res) =>
            log.info(s"Receive calculation result $res with ack id: $id")
            if ( res != 0) {
              results = res :: results
              log.info(s"Current state updated to: $results at $lastSequenceNr")
              resultsId += 1
              if (resultsId % keepJournalNr == 0) {
                resultsId = 0
                saveSnapshot(Snap(results, getDeliverySnapshot))
                log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
      override def receiveCommand: Receive = {
        case cmd: Command => persist(cmd){updateState}
        case ack: Calculator.Ack =>
        case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
          log.info(s"UnconfirmedWarning: $unconfirmedDeliveries ...")
          unconfirmedDeliveries.foreach{u =>
            log.info(s"Cancelling unconfirmedDeliveris $u")
        case SaveSnapshotSuccess(m) =>
          log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
          //clear journal and snapshot
          deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
        case SaveSnapshotFailure(m,cause) =>
          log.info(s"Saving snapshot failed because: ${cause}")
        case DeleteMessagesSuccess(toSeq) =>
          log.info(s"Succefull deleting journal upto: $toSeq")
        case DeleteMessagesFailure(cause,toSeq) =>
          log.info(s"Failed to delete journal upto: $toSeq because: $cause")
        case DeleteSnapshotsSuccess(crit) =>
          log.info(s"Successful delete snapshots for $crit")
        case DeleteSnapshotSuccess(m) =>
          log.info(s"Successful delete snapshot upto: ${m.sequenceNr}")
        case DeleteSnapshotsFailure(crit,cause) =>
          log.info(s"Failed to delete snapshots $crit because: $cause")
        case DeleteSnapshotFailure(m,cause) =>
          log.info(s"Failed to delete snapshot upto: ${m.sequenceNr} because: $cause")
        case ShowResults =>
          log.info(s"Show Current State: $results and lastSequenceNr : $lastSequenceNr")
        case "TakeSnapshot" =>
          log.info(s"Saving snapshot with state: $results ...")
          saveSnapshot(Snap(results, getDeliverySnapshot))
        case Boom =>
          throw new RuntimeException("boom") with NoStackTrace
        case ClearJournal =>
          deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr))
      override def receiveRecover: Receive = {
        case cmd: Command => updateState(cmd)
          log.info(s"Replaying command: $cmd")
        case SnapshotOffer(md,snap: Snap) =>
          log.info(s"Loading snapshot at: ${md.sequenceNr} with state: ${snap.results}")
          results = snap.results
          log.info(s"Updated state to $results with snapshot")
        case RecoveryCompleted =>
          log.info(s"Recovery compeleted with State: $results and lastSequenceNr=$lastSequenceNr")
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        log.info(s"Aggregator restarting with reason: ${reason.getMessage}")
        super.preRestart(reason, message)
      override def warnAfterNumberOfUnconfirmedAttempts = 1


    package atleastonce.demo
    import atleastonce.calculation.CalcAggregator
    import atleastonce.calculation.CalcAggregator._
    import atleastonce.calculator.Calculator
    import akka.actor._
    object AtLeastOnceDemo extends App {
      val atLeastOnceSystem = ActorSystem("atleastonceSystem")
      val addActor = atLeastOnceSystem.actorOf(Calculator.props,"addActor")
      val subActor = atLeastOnceSystem.actorOf(Calculator.props,"subActor")
      val mulActor = atLeastOnceSystem.actorOf(Calculator.props,"mulActor")
      val divActor = atLeastOnceSystem.actorOf(Calculator.props,"divActor")
      var actors = Map[String,ActorRef]()
      actors += ("ADD" -> addActor)
      actors += ("SUB" -> subActor)
      actors += ("MUL" -> mulActor)
      actors += ("DIV" -> divActor)
      val aggregator = atLeastOnceSystem.actorOf(CalcAggregator.props(actors,5), "aggregator")
      aggregator ! Sub(0,0)
      aggregator ! Add(6,3)
      aggregator ! Sub(8,0)
      aggregator ! Mul(3,2)
      aggregator ! Boom
      aggregator ! Div(12,3)
      aggregator ! ShowResults
     // aggregator ! ClearJournal




    name := "atleastonce-delivery"
    version := "1.0"
    scalaVersion := "2.11.9"
    sbtVersion := "0.13.5"
    libraryDependencies ++= Seq(
      "com.typesafe.akka"           %% "akka-actor"       % "2.5.3",
      "com.typesafe.akka"           %% "akka-persistence" % "2.5.3",
      "ch.qos.logback" % "logback-classic" % "1.1.7",
      "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.54",
      "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.54" % Test


    akka {
      persistence {
        journal.plugin = "cassandra-journal"
        snapshot-store.plugin = "cassandra-snapshot-store"
    akka.actor.warn-about-java-serializer-usage = off
    akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts = 1


