zoukankan      html  css  js  c++  java
  • akka-stream与actor系统集成以及如何处理随之而来的背压问题

        这几天上海快下了五天的雨☔️☔️☔️☔️,淅淅沥沥,郁郁沉沉。
        一共存在四个api:

    • Source.actorRef,返回actorRef,该actorRef接收到的消息,将被下游消费者所消费。
    • Sink.actorRef,接收actorRef,做为数据流下游消费节点。
    • Source.actorPublisher,返回actorRef,使用于reactive stream的Publisher。
    • Sink.actorSubscriber,使用于reactive stream的Subscriber。

    Source.actorRef

      val stringSourceinFuture=Source.actorRef[String](100,OverflowStrategy.fail) // 缓存最大为100,超出的话,将以失败告终
      val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source数据流中把不是以"haha"开头的字符串过滤掉
      val actor=hahaStrSource.to(Sink.foreach(println)).run()
      actor!"asdsadasd"
      actor!"hahaasd"
      actor!Success("ok")// 数据流成功完成并关闭
    

        "how to create a Source that can receive elements later via a method call?"在akka-http中经常遇见Source[T,N]的地方就是对文件上传和下载的功能的编码(文件IO)中,完成file=>Source[ByteString,_]的转化,或者Source(List(1,2,3,4,5))这种hello-world级别的玩具代码中,这些代码中在定义Source时,就已经确定流中数据是什么了。那么如何先定义流,而后给流传递数据呢?答案就是Source.actorRef。郑重说明:Source.actorRef没有背压策略(背压简单说就是生产者的生成速率大于消费者处理速率,导致数据积压)。

    Sink.actorRef

    class MyActor extends Actor{
      override def receive: Receive = {
        case "FIN"=>
          println("完成了哇!!!")
          context.stop(self)
        case str:String =>
          println("msgStr:"+str)
      }
    }
    ......
      val actor=system.actorOf(Props[MyActor],"myActor")
      val sendToActor=Sink.actorRef(actor,onCompleteMessage = "FIN")
      val hahaStringSource=Source.actorRef[String](100,OverflowStrategy.dropHead).filter(str=>str.startsWith("haha"))
      val actorReceive=hahaStringSource.to(sendToActor).run()
      actorReceive!"hahasdsadsa1"
      actorReceive!"hahasdsadsa2"
      actorReceive!"hahasdsadsa3"
      actorReceive!"hahasdsadsa4"
      actorReceive!Success("ok")
    //output
    msgStr:hahasdsadsa1
    msgStr:hahasdsadsa2
    msgStr:hahasdsadsa3
    msgStr:hahasdsadsa4
    完成了哇!!!
    

        Sink作为数据流终端消费节点,常见用法比如Sink.foreach[T](t:T=>Unit)Sink.fold[U,T](z:U)((u:U,t:T)=>U)等等。Sink.actorRef用于指定某个actorRef实例,把本该数据流终端处理的数据全部发送给这个actorRef实例去处理。解释上述程序,Sink,actorRef需要说明哪一个actorRef来接收消息,并且在数据流上游完成时,这个actorRef会接收到什么样的消息作为完成的信号。我们可以看到onCompleteMessage这条消息并没有受到str=>str.startsWith("haha")这过滤条件的作用(同样的,Sink.actorRef没有处理背压功能,数据挤压过多只能按某些策略舍弃,或者直接失败)。

    背压处理

    以上Source.actorRefSink.actorRef均不支持背压策略。我们可以借助Source.actorPublisher或者Sink.actorPublisher在数据流的上游或者下游处理背压问题,但是需要去继承ActorPublisher[T]ActorSubscriber实现了处理逻辑。

    Source.actorPublisher

    在数据流上游处自己手动实现背压处理逻辑:

    case object JobAccepted
    case object JobDenied
    case class Job(msg:String)
    ...
    class MyPublisherActor extends ActorPublisher[Job]{
      import akka.stream.actor.ActorPublisherMessage._
      val MAXSize=10
      var buf=Vector.empty[Job]
      override def receive: Receive = {
        case job:Job if buf.size==MAXSize =>
          sender()!JobDenied //超出缓存 拒绝处理
        case job:Job =>
          sender()!JobAccepted //确认处理该任务
          buf.isEmpty&&totalDemand>0 match {
            case true =>
              onNext(job)
            case false=>
              buf:+=job //先向缓存中存放job
              deliverBuf() //当下游存在需求时,再去从缓存中消费job
          }
        case req@Request(n)=>
          deliverBuf()
        case Cancel=>
          context.stop(self)
      }
    
      def deliverBuf():Unit= totalDemand>0 match {
        case true =>
          totalDemand<=Int.MaxValue match {
            case true =>
              val (use,keep)=buf.splitAt(totalDemand.toInt) //相当于(buf.take(n),buf.drop(n))
              buf=keep
              use.foreach(onNext(_)) //把buf一份两半,前一半发送给下游节点消费,后一半保留
            case false=>
              buf.take(Int.MaxValue).foreach(onNext(_))
              buf=buf.drop(Int.MaxValue)
              deliverBuf() //递归
          }
        case false=>
      }
    }
    ...
    val jobSource=Source.actorPublisher[Job](Props[MyPublisherActor])
    val jobSourceActor=jobSource.via(Flow[Job].map(job=>Job(job.msg*2))).to(Sink.foreach(println)).run()
    jobSourceActor!Job("ha")
    jobSourceActor!Job("he")
    

        actorPublisher的函数签名def actorPublisher[T](props: Props): Source[T, ActorRef]。上述代码中totalDemand是由下游消费节点确定。onNext(e)方法在ActorPublisher中定义,作用是将数据传输给下游节点。当然还有onComplete()onError(ex)函数,也是用于通知下游节点作出相应处理。

    Sink.actorSubscriber

    case class Reply(id:Int)
    ...
    class Worker extends Actor{
      override def receive: Receive = {
        case (id:Int,job:Job)=>
          println("finish job:"+job)
          sender()!Reply(id)
      }
    }
    ...
    class CenterSubscriber extends ActorSubscriber{
      val router={ //路由组
        val routees=Vector.fill(3){ActorRefRoutee(context.actorOf(Props[Worker]))}
        Router(RoundRobinRoutingLogic(),routees)
      }
      var buf=Map.empty[Int,Job]
      override def requestStrategy: RequestStrategy = WatermarkRequestStrategy.apply(100)
      import akka.stream.actor.ActorSubscriberMessage._
      override def receive: Receive = {
        case OnNext(job:Job)=>
          val temp=(Random).nextInt(10000)->job
          buf+=temp //记录并下发任务
          router.route(temp,self)
        case OnError(ex)=>
          println("上游发生错误了::"+ex.getMessage)
        case OnComplete=>
          println("该数据流完成使命..")
        case Reply(id)=>
          buf-=id//当处理完成时,删去记录
      }
    }
    ...
    val actor=Source.actorPublisher[Job](Props[MyPublisherActor]).to(Sink.actorSubscriber[Job](Props[CenterSubscriber])).run()
    actor!Job("job1")
    actor!Job("job2")
    actor!Job("job3")
    

        ActorSubscriber可以接收如下几种消息类型:OnNext上游来的新消息、OnComplete上游已经结束数据流、OnError上游发生错误以及其他普通类型的消息。继承ActorSubscriber的子类都需要覆写requestStrategy以此来提供请求策略去控制数据流的背压(围绕requestDemand展开,何时向上游请求数据,一次请求多少数据等等问题)。

    知难行易
    原创博文,请勿转载
    我的又一个博客hangscer.win
  • 相关阅读:
    168. Excel Sheet Column Title
    171. Excel Sheet Column Number
    264. Ugly Number II java solutions
    152. Maximum Product Subarray java solutions
    309. Best Time to Buy and Sell Stock with Cooldown java solutions
    120. Triangle java solutions
    300. Longest Increasing Subsequence java solutions
    63. Unique Paths II java solutions
    221. Maximal Square java solutions
    279. Perfect Squares java solutions
  • 原文地址:https://www.cnblogs.com/hangscer/p/8097956.html
Copyright © 2011-2022 走看看