  • Akka(25): Stream: Integrating with External Systems

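       In real-world applications akka-stream usually has to be integrated with other, external systems to form a complete application. These external systems may belong to the Akka family or be of an entirely different kind, so akka-stream must provide functions and methods for exchanging information with many types of systems. This post introduces several general-purpose ones.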

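       akka-stream offers the mapAsync + ask pattern, which lets a running stream reach out to an Actor and exchange data with it. This is one way of integrating akka-stream with Actors. If the complex, resource-hungry computations in a stream can be handed over to Actors, we can take full advantage of actor-model features such as routing, clustering and supervision to make the computation distributed, efficient and safe. Here is the definition of mapAsync: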

      /**
       * Transform this stream by applying the given function to each of the elements
       * as they pass through this processing step. The function returns a `Future` and the
       * value of that future will be emitted downstream. The number of Futures
       * that shall run in parallel is given as the first argument to ``mapAsync``.
       * These Futures may complete in any order, but the elements that
       * are emitted downstream are in the same order as received from upstream.
       *
       * If the function `f` throws an exception or if the `Future` is completed
       * with failure and the supervision decision is [[akka.stream.Supervision.Stop]]
       * the stream will be completed with failure.
       *
       * If the function `f` throws an exception or if the `Future` is completed
       * with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
       * [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
       *
       * The function `f` is always invoked on the elements in the order they arrive.
       *
       * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
       *
       * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
       *
       * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
       * backpressures or the first future is not completed
       *
       * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
       *
       * '''Cancels when''' downstream cancels
       *
       * @see [[#mapAsyncUnordered]]
       */
      def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T] = via(MapAsync(parallelism, f))
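
    The ordering guarantee and the Resume behaviour described in the scaladoc above can be checked with a small sketch. It is only an illustration under assumptions: the object name MapAsyncSemanticsSketch, the helper slowFirst and the Thread.sleep delays are made up for the demo, and it uses the same Akka 2.5-style ActorMaterializer as the rest of this post.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object MapAsyncSemanticsSketch {

  // Returns (result of the ordering run, result of the Resume run).
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val sys = ActorSystem("mapAsyncSketch")
    implicit val ec = sys.dispatcher
    implicit val mat = ActorMaterializer()

    // Element 1 finishes last, yet mapAsync must still emit it first.
    def slowFirst(n: Int): Future[Int] = Future {
      Thread.sleep(if (n == 1) 300 else 10) // blocking only for the demo
      n
    }

    val ordered = Source(1 to 5)
      .mapAsync(parallelism = 3)(slowFirst)
      .runWith(Sink.seq)

    // Under Supervision.Resume a failed Future drops its element and the stream continues.
    val resumed = Source(1 to 5)
      .mapAsync(parallelism = 2) { n =>
        if (n % 3 == 0) Future.failed(new RuntimeException(s"element $n failed"))
        else Future.successful(n)
      }
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
      .runWith(Sink.seq)

    try {
      (Await.result(ordered, 5.seconds), Await.result(resumed, 5.seconds))
    } finally {
      Await.ready(sys.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (ordered, resumed) = run()
    println(ordered) // elements 1 to 5, in upstream order
    println(resumed) // elements 1, 2, 4, 5 (element 3 is dropped)
  }
}
```

    Swapping mapAsync for mapAsyncUnordered in the first pipeline would let the fast elements overtake the slow one, which is the trade-off the @see reference points at.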

    mapAsync applies a function f: Out => Future[T] to the stream elements, with at most parallelism Futures in flight at any time. Now look at the signature of ask:

      def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
        internalAsk(message, timeout, sender)

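    ask returns Future[Any], so once mapTo[T] narrows the result it fits the Out => Future[T] shape exactly. We can therefore talk to an Actor from inside a stream like this: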

      stream.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])

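    In this use case every stream element is sent to the ActorRef via ref ? elem, and the Actor's reply arrives as a Future that mapTo narrows to Future[String]. Note that mapAsync uses this Future to implement stream backpressure, so the Actor must reply: without a reply the element never completes, the stream stalls, and once the ask timeout expires the Future fails with AskTimeoutException, which fails the stream under the default supervision decision. Let's first demonstrate a direct use of mapAsync: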

    import akka.actor._
    import akka.pattern._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.routing._
    
    import scala.concurrent.duration._
    import akka.util.Timeout
    
    object StorageActor {
    
      case class Query(rec: Int, qry: String) //simulated storage query
    
      class StorageActor extends Actor with ActorLogging { //Actor simulating a storage write
        override def receive: Receive = {
          case Query(num,qry) =>
            val reply = s"${self.path} is saving: [$qry]"
            sender() ! reply                  //must reply so the ask Future completes and backpressure is released
        }
      }
      def props = Props(new StorageActor)
    }
    
    object MapAsyncDemo extends App {
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
      val storageActor = sys.actorOf(StorageActor.props,"dbWriter")
    
    
      implicit val timeout = Timeout(3.seconds)
      Source(Stream.from(1)).delay(1.second,DelayOverflowStrategy.backpressure)
        .mapAsync(parallelism = 3){ n =>
          (storageActor ? StorageActor.Query(n,s"Record#$n")).mapTo[String]
        }.runWith(Sink.foreach(println))
    
      scala.io.StdIn.readLine()
      sys.terminate()
    
    }

     In this example parallelism = 3. StorageActor replies with the path of the actor instance doing the work, and the stream prints it:

    akka://demoSys/user/dbWriter is saving: [Record#1]
    akka://demoSys/user/dbWriter is saving: [Record#2]
    akka://demoSys/user/dbWriter is saving: [Record#3]
    akka://demoSys/user/dbWriter is saving: [Record#4]
    akka://demoSys/user/dbWriter is saving: [Record#5]
    akka://demoSys/user/dbWriter is saving: [Record#6]
    ...

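    As the output shows, mapAsync talks to a single Actor only, so parallelism = 3 can only mean that up to three ask Futures are in flight at once; the one Actor still processes its messages one at a time. To make fuller use of the actor model we can use a router to spread the work over several actors. Routers come in two kinds, pool and group: a pool router creates its own routees, while a group router uses Actors that already exist. In this test we use a group router because we want to control the supervision of the routees ourselves: a pool router is the parent and supervisor of the routees it creates, so they would fall under the router's strategy instead of the one we define. First, add routing to StorageActor: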

      val numOfActors = 3
      val routees: List[ActorRef] = List.fill(numOfActors)(      //create 3 StorageActors
        sys.actorOf(StorageActor.props))
      val routeePaths: List[String] = routees.map{ref => "/user/"+ref.path.name}
    
      val storageActorPool = sys.actorOf(
        RoundRobinGroup(routeePaths).props()
          .withDispatcher("akka.pool-dispatcher")
        ,"storageActorPool"
      )
    
      implicit val timeout = Timeout(3.seconds)
      Source(Stream.from(1)).delay(1.second,DelayOverflowStrategy.backpressure)
        .mapAsync(parallelism = 1){ n =>
          (storageActorPool ? StorageActor.Query(n,s"Record#$n")).mapTo[String]
        }.runWith(Sink.foreach(println))
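
    The router above is given the dispatcher id akka.pool-dispatcher, which this post never shows. A dispatcher referenced through withDispatcher has to exist in the actor system's configuration, so something along the following lines is presumably needed. This is a sketch with assumed values: the object name, the executor type and the parallelism numbers are illustrative and not taken from the original setup.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper, not part of the original post.
object PoolDispatcherConfigSketch {

  // Assumed settings for the "akka.pool-dispatcher" id used by the router.
  val config: Config = ConfigFactory.parseString(
    """
      |akka.pool-dispatcher {
      |  type = Dispatcher
      |  executor = "fork-join-executor"
      |  fork-join-executor {
      |    parallelism-min = 2
      |    parallelism-factor = 2.0
      |    parallelism-max = 8
      |  }
      |  throughput = 1
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  // Pass the config when creating the system so the dispatcher id can be resolved.
  def newSystem(): ActorSystem = ActorSystem("demoSys", config)
}
```

    The same settings could equally live in application.conf, in which case ActorSystem("demoSys") picks them up without any extra code.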

    We use RoundRobinGroup to distribute the tasks. Note parallelism = 1 above: multiple Futures are no longer needed. Here is the output of a run:

    akka://demoSys/user/$a is saving: [Record#1]
    akka://demoSys/user/$b is saving: [Record#2]
    akka://demoSys/user/$c is saving: [Record#3]
    akka://demoSys/user/$a is saving: [Record#4]
    akka://demoSys/user/$b is saving: [Record#5]
    akka://demoSys/user/$c is saving: [Record#6]
    akka://demoSys/user/$a is saving: [Record#7]

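    The tasks are now handed out in turn to the three Actors a, b and c (with parallelism = 1 only one request is in flight at a time; a higher parallelism would let the routees work concurrently). Since we are simulating parallel database writes, we can try giving each routee a back-off restart strategy with BackoffSupervisor: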

    object StorageActor {
    
      case class Query(rec: Int, qry: String) //simulated storage query
      class DbException(cause: String) extends Exception(cause) //custom storage exception
    
      class StorageActor extends Actor with ActorLogging { //Actor performing the storage write
        override def receive: Receive = {
          case Query(num,qry) =>
            var res: String = ""
            try {
              res = saveToDB(num,qry)
            } catch {
              case e: Exception => Error(num,qry) //simulate a failed operation
            }
            sender() ! res
          case _ =>
        }
        def saveToDB(num: Int,qry: String): String = { //simulated storage function
          val msg = s"${self.path} is saving: [$qry#$num]"
          if ( num % 3 == 0) Error(num,qry)        //simulate an exception
          else {
            log.info(s"${self.path} is saving: [$qry#$num]")
            s"${self.path} is saving: [$qry#$num]"
          }
        }
        def Error(num: Int,qry: String): String = {
          val msg = s"${self.path} is saving: [$qry#$num]"
          sender() ! msg
          throw new DbException(s"$msg blew up, boooooom!!!")
        }
    
        //verify restart on exception
        //Backoff.onStop goes through the restart process
        override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
          log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
          super.preRestart(reason, message)
        }
    
        override def postRestart(reason: Throwable): Unit = {
          log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
          super.postRestart(reason)
        }
    
        override def postStop(): Unit = {
          log.info(s"Stopped ${self.path.name}!")
          super.postStop()
        }
       //Backoff.onFailure doesn't go through the restart process
        override def preStart(): Unit = {
          log.info(s"PreStarting ${self.path.name} ...")
          super.preStart()
        }
    
    
      }
      def props = Props(new StorageActor)
    }
    
    object StorageActorGuardian {  //StorageActor with a supervision strategy
      def props: Props = { //defines the supervision strategy and how StorageActor is created
        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case _: StorageActor.DbException => SupervisorStrategy.Restart
        }
    
        val options = Backoff.onStop(StorageActor.props, "dbWriter", 100.millis, 500.millis, 0.0)
          .withManualReset
          .withSupervisorStrategy(
            OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.second)(
              decider.orElse(SupervisorStrategy.defaultDecider)
            )
          )
        BackoffSupervisor.props(options)
      }
    }
    
    object IntegrateDemo extends App {
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
    
      val numOfActors = 3
      val routees: List[ActorRef] = List.fill(numOfActors)(
        sys.actorOf(StorageActorGuardian.props))
      val routeePaths: List[String] = routees.map{ref => "/user/"+ref.path.name} //collect the actor paths
    
      val storageActorPool = sys.actorOf(
        RoundRobinGroup(routeePaths).props()
          .withDispatcher("akka.pool-dispatcher")
        ,"storageActorPool"
      )
    
      implicit val timeout = Timeout(3.seconds)
      Source(Stream.from(1)).delay(3.second,DelayOverflowStrategy.backpressure)
        .mapAsync(parallelism = 1){ n =>
          (storageActorPool ? StorageActor.Query(n,s"Record")).mapTo[String]
        }.runWith(Sink.foreach(println))
    
      scala.io.StdIn.readLine()
      sys.terminate()
    
    }

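    We also added simulated failures and StorageActor lifecycle callbacks to trace how SupervisorStrategy.Restart is carried out when an exception occurs. Trial runs confirmed that Backoff.onFailure does not trigger the restart callbacks; it goes straight to preStart. Backoff.onStop does go through the restart process. Backoff.onFailure kicks in when the Actor terminates because of an exception, whereas Backoff.onStop kicks in whenever the target Actor terminates, for whatever reason. Note that in the example above the Actor skips the stream element that caused the exception, so we have to resend that element to ourselves in preRestart: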

       //verify restart on exception
        //Backoff.onStop goes through the restart process
        override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
          log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
           message match {
            case Some(Query(n,qry)) =>
              self ! Query(n+101,qry)      //resend the failed message to self; n+101 removes the cause of the failure
            case _ =>
              log.info(s"Exception message: None")
    
          }
          super.preRestart(reason, message)
        }

    If we do not need the result of the work delegated to the Actor, we can try Sink.actorRefWithAck:

     /**
       * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
       * First element is always `onInitMessage`, then stream is waiting for acknowledgement message
       * `ackMessage` from the given actor which means that it is ready to process
       * elements. It also requires `ackMessage` message after each stream element
       * to make backpressure work.
       *
       * If the target actor terminates the stream will be canceled.
       * When the stream is completed successfully the given `onCompleteMessage`
       * will be sent to the destination actor.
       * When the stream is completed with failure - result of `onFailureMessage(throwable)`
       * function will be sent to the destination actor.
       */
      def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
                             onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] =
        Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))

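    Here the ActorRef can only send back backpressure signals. actorRefWithAck itself returns Sink[T, NotUsed]; in other words, it builds a Sink. It uses three signals to communicate with the target Actor:

    1. onInitMessage: the first signal the stream sends to the ActorRef, indicating that data exchange can begin

    2. ackMessage: the signal the ActorRef sends back to the stream, saying it is ready to receive the next message; in effect it releases backpressure

    3. onCompleteMessage: sent by the stream to the ActorRef to tell it that all stream elements have been delivered

    We have to modify the StorageActor from the previous example to fit actorRefWithAck and its protocol with the target Actor: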

    object StorageActor {
      val onInitMessage = "start"
      val onCompleteMessage = "done"
      val ackMessage = "ack"
    
      case class Query(rec: Int, qry: String) //simulated storage query
      class DbException(cause: String) extends Exception(cause) //custom storage exception
    
      class StorageActor extends Actor with ActorLogging { //Actor performing the storage write
        override def receive: Receive = {
          case `onInitMessage` => sender() ! ackMessage
          case Query(num,qry) =>
            var res: String = ""
            try {
              res = saveToDB(num,qry)
            } catch {
              case e: Exception => Error(num,qry) //simulate a failed operation
            }
            sender() ! ackMessage
          case `onCompleteMessage` => //clean up resources
          case _ =>
        }
        def saveToDB(num: Int,qry: String): String = { //simulated storage function
          val msg = s"${self.path} is saving: [$qry#$num]"
          if ( num % 5 == 0) Error(num,qry)        //simulate an exception
          else {
            log.info(s"${self.path} is saving: [$qry#$num]")
            s"${self.path} is saving: [$qry#$num]"
          }
        }
        def Error(num: Int,qry: String) = {
          val msg = s"${self.path} is saving: [$qry#$num]"
          sender() ! ackMessage
          throw new DbException(s"$msg blew up, boooooom!!!")
        }
    
        //verify restart on exception
        //Backoff.onStop goes through the restart process
        override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
          log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
          message match {
            case Some(Query(n,qry)) =>
              self ! Query(n+101,qry)      //resend the failed message to self; n+101 removes the cause of the failure
            case _ =>
              log.info(s"Exception message: None")
    
          }
          super.preRestart(reason, message)
        }
    
        override def postRestart(reason: Throwable): Unit = {
          log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
          super.postRestart(reason)
        }
    
        override def postStop(): Unit = {
          log.info(s"Stopped ${self.path.name}!")
          super.postStop()
        }
        //Backoff.onFailure doesn't go through the restart process
        override def preStart(): Unit = {
          log.info(s"PreStarting ${self.path.name} ...")
          super.preStart()
        }
      }
      def props = Props(new StorageActor)
    }

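    StorageActor now handles the actorRefWithAck messages onInitMessage, ackMessage and onCompleteMessage. This Actor only sends back the backpressure message ackMessage, never a computation result. Note that in preRestart we fix up the element that caused the exception and resend it to ourselves. Sink.actorRefWithAck is called like this: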

      Source(Stream.from(1)).map(n => Query(n,s"Record")).delay(3.second,DelayOverflowStrategy.backpressure)
          .runWith(Sink.actorRefWithAck(
           storageActorPool, onInitMessage, ackMessage,onCompleteMessage))

    The complete source code for running it is as follows:

    object SinkActorRefWithAck extends App {
      import StorageActor._
    
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
    
      val storageActor = sys.actorOf(StorageActor.props,"storageActor")
    
      val numOfActors = 3
      val routees: List[ActorRef] = List.fill(numOfActors)(
        sys.actorOf(StorageActorGuardian.props))
      val routeePaths: List[String] = routees.map{ref => "/user/"+ref.path.name}
    
      val storageActorPool = sys.actorOf(
        RoundRobinGroup(routeePaths).props()
          .withDispatcher("akka.pool-dispatcher")
        ,"storageActorPool"
      )
    
      Source(Stream.from(1)).map(n => Query(n,s"Record")).delay(3.second,DelayOverflowStrategy.backpressure)
          .runWith(Sink.actorRefWithAck(
           storageActorPool, onInitMessage, ackMessage,onCompleteMessage))
    
      scala.io.StdIn.readLine()
      sys.terminate()
    
    }
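
    The three-signal protocol described above can also be seen in isolation, without routers or supervision. The following is a minimal sketch under assumptions: the object AckSinkSketch, the Collector actor and the Init/Ack/StreamDone messages are hypothetical names invented for this illustration, and a Promise is used only so the caller can observe when onCompleteMessage has arrived.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object AckSinkSketch {
  case object Init        // plays the role of onInitMessage
  case object Ack         // plays the role of ackMessage
  case object StreamDone  // plays the role of onCompleteMessage

  // Collects the stream elements and acknowledges each one.
  class Collector(result: Promise[Vector[Int]]) extends Actor {
    private var seen = Vector.empty[Int]
    def receive: Receive = {
      case Init       => sender() ! Ack          // ready for the first element
      case n: Int     => seen :+= n
                         sender() ! Ack          // ready for the next element
      case StreamDone => result.success(seen)    // stream has delivered everything
    }
  }

  def run(): Vector[Int] = {
    implicit val sys = ActorSystem("ackSketch")
    implicit val mat = ActorMaterializer()
    val result = Promise[Vector[Int]]()
    val collector = sys.actorOf(Props(new Collector(result)))

    Source(1 to 5).runWith(Sink.actorRefWithAck(collector, Init, Ack, StreamDone))

    try Await.result(result.future, 5.seconds)
    finally Await.ready(sys.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    If the Collector forgot to send Ack after an element, the stream would simply stop emitting, which is exactly how the sink propagates backpressure upstream.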

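    If an external system feeds data into a stream, we can treat that external system as the stream's Source. akka-stream provides the Source.queue function to build such a Source, through which external systems can push data into the stream. Its signature is: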

      /**
       * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]].
       * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
       * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
       * if downstream is terminated.
       *
       * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
       * there is no space available in the buffer.
       *
       * Acknowledgement mechanism is available.
       * [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]` which completes with
       * `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with
       * `QueueOfferResult.Dropped` if element was dropped. Can also complete  with `QueueOfferResult.Failure` -
       * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
       *
       * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
       * call when buffer is full.
       *
       * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]].
       * It returns future that completes with success when stream is completed or fail when stream is failed.
       *
       * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
       * for downstream demand unless there is another message waiting for downstream demand, in that case
       * offer result will be completed according to the overflow strategy.
       *
       * @param bufferSize size of buffer in element count
       * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
       */
      def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
        Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource))

    Source.queue builds a Source[T, SourceQueueWithComplete[T]]. SourceQueueWithComplete is defined as:

    /**
     * This trait adds completion support to [[SourceQueue]].
     */
    trait SourceQueueWithComplete[T] extends SourceQueue[T] {
      /**
       * Complete the stream normally. Use `watchCompletion` to be notified of this
       * operation’s success.
       */
      def complete(): Unit
    
      /**
       * Complete the stream with a failure. Use `watchCompletion` to be notified of this
       * operation’s success.
       */
      def fail(ex: Throwable): Unit
    
      /**
       * Method returns a [[Future]] that will be completed if the stream completes,
       * or will be failed when the stage faces an internal failure or the the [[SourceQueueWithComplete.fail]] method is invoked.
       */
      def watchCompletion(): Future[Done]
    }

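    It adds a few abstract methods on top of SourceQueue, mainly for sending termination signals to the target stream: complete and fail. watchCompletion is a monitoring function; the Future it returns completes when the SourceQueue is terminated manually or the stream terminates for some other reason. Here is the definition of SourceQueue: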

    /**
     * This trait allows to have the queue as a data source for some stream.
     */
    trait SourceQueue[T] {
    
      /**
       * Method offers next element to a stream and returns future that:
       * - completes with `Enqueued` if element is consumed by a stream
       * - completes with `Dropped` when stream dropped offered element
       * - completes with `QueueClosed` when stream is completed during future is active
       * - completes with `Failure(f)` when failure to enqueue element from upstream
       * - fails when stream is completed or you cannot call offer in this moment because of implementation rules
       * (like for backpressure mode and full buffer you need to wait for last offer call Future completion)
       *
       * @param elem element to send to a stream
       */
      def offer(elem: T): Future[QueueOfferResult]
    
      /**
       * Method returns a [[Future]] that will be completed if the stream completes,
       * or will be failed when the stage faces an internal failure.
       */
      def watchCompletion(): Future[Done]
    }

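    This interface supports the basic SourceQueue operations through two functions, offer(elem: T) and watchCompletion(). Let's demonstrate SourceQueue with an example. A Calculator actor simulates the external system. We first build a SourceQueue with Source.queue and connect it downstream to form a complete stream, then pass the materialized queue to Calculator so that it can push data into the running stream. Along the way we show the basic SourceQueue operations. The Calculator Actor below simulates an external system acting as the SourceQueue user: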

    object Calculator {
      trait Operations
      case class Add(op1:Int, op2:Int) extends Operations
      case class DisplayError(err: Exception) extends Operations
      case object Stop extends Operations
      case class ProduceError(err: Exception) extends Operations
    
      def props(inputQueue: SourceQueueWithComplete[String]) = Props(new Calculator(inputQueue))
    }
    class Calculator(inputQueue: SourceQueueWithComplete[String]) extends Actor with ActorLogging{
      import Calculator._
      import context.dispatcher
      override def receive: Receive = {
        case Add(op1,op2) =>
          val msg = s"$op1 + $op2 = ${op1 + op2}"
          inputQueue.offer(msg)    //.mapTo[String]
            .recover {
            case e: Exception => DisplayError(e)}
            .pipeTo(self)
        case QueueOfferResult.Enqueued =>
          log.info("QueueOfferResult.Enqueued")
        case QueueOfferResult.Dropped =>
        case QueueOfferResult.Failure(cause) =>
        case QueueOfferResult.QueueClosed  =>
          log.info("QueueOfferResult.QueueClosed")
    
        case Stop => inputQueue.complete()
        case ProduceError(e) => inputQueue.fail(e)
        case DisplayError(e) => log.error(e, "offer failed") //result of the recover above
    
      }
    }

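    As we can see, Calculator pushes its results through the inputQueue passed in, and the stream prints them. In receive we demonstrate offer together with the results it may return, delivered via pipeTo. Note: mapTo[String] must not be used here, because offer returns Future[QueueOfferResult], not Future[String], and the cast would fail. The String element type of offer(elem) was already fixed by Source.queue[String]. inputQueue is built like this: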

      val source: Source[String, SourceQueueWithComplete[String]]  =
                   Source.queue[String](bufferSize = 16,
                   overflowStrategy = OverflowStrategy.backpressure)
    
      val inputQueue: SourceQueueWithComplete[String] = source.toMat(Sink.foreach(println))(Keep.left).run()
    
      inputQueue.watchCompletion().onComplete {
        case Success(result) => println(s"Calculator ends with: $result")
        case Failure(cause)  => println(s"Calculator ends with exception: ${cause.getMessage}")
      }

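    We added watchCompletion to observe the termination signal of the SourceQueue. Note also that the source above uses OverflowStrategy.backpressure, so it supports backpressure. Here is how the example is run: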

    object SourceQueueDemo extends App {
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
      
      val source: Source[String, SourceQueueWithComplete[String]]  =
                   Source.queue[String](bufferSize = 16,
                   overflowStrategy = OverflowStrategy.backpressure)
    
      val inputQueue: SourceQueueWithComplete[String] = source.toMat(Sink.foreach(println))(Keep.left).run()
    
      inputQueue.watchCompletion().onComplete {
        case Success(result) => println(s"Calculator ends with: $result")
        case Failure(cause)  => println(s"Calculator ends with exception: ${cause.getMessage}")
      }
    
      val calc = sys.actorOf(Calculator.props(inputQueue),"calculator")
    
      import Calculator._
      
      calc ! Add(3,5)
      scala.io.StdIn.readLine
      calc ! Add(39,1)
      scala.io.StdIn.readLine
      calc ! ProduceError(new Exception("Boooooommm!"))
      scala.io.StdIn.readLine
      calc ! Add(1,1)
    
      scala.io.StdIn.readLine
      sys.terminate()
    
    }
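
    With OverflowStrategy.backpressure, the scaladoc quoted earlier says a new offer should wait for the previous offer's Future when the buffer is full. One way to respect that from plain (non-actor) code is to chain the offers so each one starts only after the previous Future has completed. A minimal sketch under assumptions: the object name SequentialOfferSketch, the buffer size of 2 and the sample messages are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object SequentialOfferSketch {

  // Returns (offer results, elements that reached the sink).
  def run(): (Seq[QueueOfferResult], Seq[String]) = {
    implicit val sys = ActorSystem("offerSketch")
    implicit val ec = sys.dispatcher
    implicit val mat = ActorMaterializer()

    // Keep.both exposes the queue handle and the Future of collected elements.
    val (queue, collected) =
      Source.queue[String](2, OverflowStrategy.backpressure)
        .toMat(Sink.seq[String])(Keep.both)
        .run()

    val msgs = List("1 + 1 = 2", "3 + 5 = 8", "39 + 1 = 40")

    // Each offer is issued only after the previous offer's Future has completed.
    val offered: Future[List[QueueOfferResult]] =
      msgs.foldLeft(Future.successful(List.empty[QueueOfferResult])) { (acc, m) =>
        acc.flatMap(results => queue.offer(m).map(r => results :+ r))
      }

    try {
      val results = Await.result(offered, 5.seconds)
      queue.complete() // no more elements; lets Sink.seq finish
      (results, Await.result(collected, 5.seconds))
    } finally {
      Await.ready(sys.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (results, out) = run()
    results.foreach(println)
    out.foreach(println)
  }
}
```

    Inside an actor, the Calculator's approach of piping each offer result back to self serves the same purpose, provided the actor does not issue a new offer before the previous result has arrived.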

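    In this post we looked at how akka-stream can be integrated with external systems, focusing on methods that support Reactive Streams backpressure.

    Here is the complete source code for this demonstration: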

    MapAsyncDemo.scala:

    import akka.actor._
    import akka.pattern._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.routing._
    
    import scala.concurrent.duration._
    import akka.util.Timeout
    
    object StorageActor {
    
      case class Query(rec: Int, qry: String) //simulated storage query
      class DbException(cause: String) extends Exception(cause) //custom storage exception
    
      class StorageActor extends Actor with ActorLogging { //Actor performing the storage write
        override def receive: Receive = {
          case Query(num,qry) =>
            var res: String = ""
            try {
              res = saveToDB(num,qry)
            } catch {
              case e: Exception => Error(num,qry) //simulate a failed operation
            }
            sender() ! res
          case _ =>
        }
        def saveToDB(num: Int,qry: String): String = { //simulated storage function
          val msg = s"${self.path} is saving: [$qry#$num]"
          if ( num % 5 == 0) Error(num,qry)        //simulate an exception
          else {
            log.info(s"${self.path} is saving: [$qry#$num]")
            s"${self.path} is saving: [$qry#$num]"
          }
        }
        def Error(num: Int,qry: String): String = {
          val msg = s"${self.path} is saving: [$qry#$num]"
          sender() ! msg                       //release backpressure
          throw new DbException(s"$msg blew up, boooooom!!!")
        }
    
        //verify restart on exception
        //Backoff.onStop goes through the restart process
        override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
          log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
           message match {
            case Some(Query(n,qry)) =>
              self ! Query(n+101,qry)      //resend the failed message to self; n+101 removes the cause of the failure
            case _ =>
              log.info(s"Exception message: None")
    
          }
          super.preRestart(reason, message)
        }
    
        override def postRestart(reason: Throwable): Unit = {
          log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
          super.postRestart(reason)
        }
    
        override def postStop(): Unit = {
          log.info(s"Stopped ${self.path.name}!")
          super.postStop()
        }
       //Backoff.onFailure doesn't go through the restart process
        override def preStart(): Unit = {
          log.info(s"PreStarting ${self.path.name} ...")
          super.preStart()
        }
      }
      def props = Props(new StorageActor)
    }
    
    object StorageActorGuardian {  //StorageActor with a supervision strategy
      def props: Props = { //defines the supervision strategy and how StorageActor is created
        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case _: StorageActor.DbException => SupervisorStrategy.Restart
        }
    
        val options = Backoff.onStop(StorageActor.props, "dbWriter", 100.millis, 500.millis, 0.0)
          .withManualReset
          .withSupervisorStrategy(
            OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.second)(
              decider.orElse(SupervisorStrategy.defaultDecider)
            )
          )
        BackoffSupervisor.props(options)
      }
    }
    
    object IntegrateDemo extends App {
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
      
      val numOfActors = 3
      val routees: List[ActorRef] = List.fill(numOfActors)(
        sys.actorOf(StorageActorGuardian.props))
      val routeePaths: List[String] = routees.map{ref => "/user/"+ref.path.name}
    
      val storageActorPool = sys.actorOf(
        RoundRobinGroup(routeePaths).props()
          .withDispatcher("akka.pool-dispatcher")
        ,"storageActorPool"
      )
      
      implicit val timeout = Timeout(3.seconds)
      Source(Stream.from(1)).delay(3.second,DelayOverflowStrategy.backpressure)
        .mapAsync(parallelism = 1){ n =>
          (storageActorPool ? StorageActor.Query(n,s"Record")).mapTo[String]
        }.runWith(Sink.foreach(println))
    
      scala.io.StdIn.readLine()
      sys.terminate()
    
    }

    SinkActorRefAckDemo.scala:

    package sinkactorrefack
    import akka.actor._
    import akka.pattern._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.routing._
    
    import scala.concurrent.duration._
    
    object StorageActor {
      val onInitMessage = "start"
      val onCompleteMessage = "done"
      val ackMessage = "ack"
    
      case class Query(rec: Int, qry: String) //simulated storage query
      class DbException(cause: String) extends Exception(cause) //custom storage exception
    
      class StorageActor extends Actor with ActorLogging { //Actor performing the storage write
        override def receive: Receive = {
          case `onInitMessage` => sender() ! ackMessage
          case Query(num,qry) =>
            var res: String = ""
            try {
              res = saveToDB(num,qry)
            } catch {
              case e: Exception => Error(num,qry) //simulate a failed operation
            }
            sender() ! ackMessage
          case `onCompleteMessage` => //clean up resources
          case _ =>
        }
        def saveToDB(num: Int,qry: String): String = { //simulated storage function
          val msg = s"${self.path} is saving: [$qry#$num]"
          if ( num == 3) Error(num,qry)        //simulate an exception
          else {
            log.info(s"${self.path} is saving: [$qry#$num]")
            s"${self.path} is saving: [$qry#$num]"
          }
        }
        def Error(num: Int,qry: String) = {
          val msg = s"${self.path} is saving: [$qry#$num]"
          sender() ! ackMessage
          throw new DbException(s"$msg blew up, boooooom!!!")
        }
    
        //verify restart on exception
        //Backoff.onStop goes through the restart process
        override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
          log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
          message match {
            case Some(Query(n,qry)) =>
              self ! Query(n+101,qry)      //resend the failed message to self; n+101 removes the cause of the failure
            case _ =>
              log.info(s"Exception message: None")
    
          }
          super.preRestart(reason, message)
        }
    
        override def postRestart(reason: Throwable): Unit = {
          log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
          super.postRestart(reason)
        }
    
        override def postStop(): Unit = {
          log.info(s"Stopped ${self.path.name}!")
          super.postStop()
        }
        //Backoff.onFailure doesn't go through the restart process
        override def preStart(): Unit = {
          log.info(s"PreStarting ${self.path.name} ...")
          super.preStart()
        }
      }
      def props = Props(new StorageActor)
    }
    
    object StorageActorGuardian {  //StorageActor with a supervision strategy
      def props: Props = { //defines the supervision strategy and how StorageActor is created
        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case _: StorageActor.DbException => SupervisorStrategy.Restart
        }
    
        val options = Backoff.onStop(StorageActor.props, "dbWriter", 100.millis, 500.millis, 0.0)
          .withManualReset
          .withSupervisorStrategy(
            OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.second)(
              decider.orElse(SupervisorStrategy.defaultDecider)
            )
          )
        BackoffSupervisor.props(options)
      }
    }
    
    object SinkActorRefWithAck extends App {
      import StorageActor._
    
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
    
      val storageActor = sys.actorOf(StorageActor.props,"storageActor")
    
      val numOfActors = 3
      val routees: List[ActorRef] = List.fill(numOfActors)(
        sys.actorOf(StorageActorGuardian.props))
      val routeePaths: List[String] = routees.map{ref => "/user/"+ref.path.name}
    
      val storageActorPool = sys.actorOf(
        RoundRobinGroup(routeePaths).props()
          .withDispatcher("akka.pool-dispatcher")
        ,"storageActorPool"
      )
    
      Source(Stream.from(1)).map(n => Query(n,s"Record")).delay(3.second,DelayOverflowStrategy.backpressure)
          .runWith(Sink.actorRefWithAck(
           storageActorPool, onInitMessage, ackMessage,onCompleteMessage))
    
      scala.io.StdIn.readLine()
      sys.terminate()
    
    }

    SourceQueueDemo.scala:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import scala.concurrent._
    import scala.util._
    import akka.pattern._
    
    object Calculator {
      trait Operations
      case class Add(op1:Int, op2:Int) extends Operations
      case class DisplayError(err: Exception) extends Operations
      case object Stop extends Operations
      case class ProduceError(err: Exception) extends Operations
    
      def props(inputQueue: SourceQueueWithComplete[String]) = Props(new Calculator(inputQueue))
    }
    class Calculator(inputQueue: SourceQueueWithComplete[String]) extends Actor with ActorLogging{
      import Calculator._
      import context.dispatcher
      override def receive: Receive = {
        case Add(op1,op2) =>
          val msg = s"$op1 + $op2 = ${op1 + op2}"
          inputQueue.offer(msg)       // returns Future[QueueOfferResult]
            .recover {
              case e: Exception => DisplayError(e)}
            .pipeTo(self)             // deliver the offer result back to this actor
        case QueueOfferResult.Enqueued =>
          log.info("QueueOfferResult.Enqueued")
        case QueueOfferResult.Dropped =>
          log.info("QueueOfferResult.Dropped")
        case QueueOfferResult.Failure(cause) =>
          log.info(s"QueueOfferResult.Failure: ${cause.getMessage}")
        case QueueOfferResult.QueueClosed  =>
          log.info("QueueOfferResult.QueueClosed")
        case DisplayError(e) =>       // the offer Future itself failed
          log.info(s"Offer failed: ${e.getMessage}")
    
        case Stop => inputQueue.complete()
        case ProduceError(e) => inputQueue.fail(e)
    
      }
    }
    
    
    object SourceQueueDemo extends App {
      implicit val sys = ActorSystem("demoSys")
      implicit val ec = sys.dispatcher
      implicit val mat = ActorMaterializer(
        ActorMaterializerSettings(sys)
          .withInputBuffer(initialSize = 16, maxSize = 16)
      )
    
      val source: Source[String, SourceQueueWithComplete[String]]  =
                   Source.queue[String](bufferSize = 16,
                   overflowStrategy = OverflowStrategy.backpressure)
    
      val inputQueue: SourceQueueWithComplete[String] = source.toMat(Sink.foreach(println))(Keep.left).run()
    
      inputQueue.watchCompletion().onComplete {
        case Success(result) => println(s"Calculator ends with: $result")
        case Failure(cause)  => println(s"Calculator ends with exception: ${cause.getMessage}")
      }
    
      val calc = sys.actorOf(Calculator.props(inputQueue),"calculator")
    
      import Calculator._
    
      calc ! Add(3,5)
      scala.io.StdIn.readLine
      calc ! Add(39,1)
      scala.io.StdIn.readLine
      calc ! ProduceError(new Exception("Boooooommm!"))
      scala.io.StdIn.readLine
      calc ! Add(1,1)
    
      scala.io.StdIn.readLine
      sys.terminate()
    
    }

  • Original article: https://www.cnblogs.com/tiger-xc/p/7550663.html