zoukankan      html  css  js  c++  java
  • 自动调整速率的Actor设计模式

    问题背景

    与数据库或者存储系统交互是所有应用软件都必不可少的功能之一,akka开发的系统也不例外。但akka特殊的地方在于,会尽可能的将所有的功能都设计成异步的,以避免Actor阻塞,然而无法避免IO这类的阻塞操作。我们往往会把IO消息发送给单独的Actor进行处理,避免业务主逻辑受到阻塞。

    在处理IO消息时,有两种模式:批量和单条。批量是指一次性处理多个消息,这样可以减少与存储系统的交互,提高吞吐量,适合处理大量消息;单条是指一次只处理一条消息,与存储系统交互次数增多,但可以尽快的处理当前消息,这在消息比较少时非常有用。

    但系统往往是复杂的,待处理的消息的分布并不集中,业务繁忙时,短时间内消息很多,此时批量处理可以增加吞吐量;业务闲暇时,消息零零散散,需要尽可能快的处理消息。一个优秀的系统需要能够识别并合适的处理这两种消息速率,用akka开发系统时,也需要拥有这种能力。

    问题假设

    记得以前数学老师讲课时,最喜欢也是最经常说的两个字就是“解、设”,就是在解决问题之前,总是会做一些假设。那么我们也做一些假设,以简化解决问题的难度,但这并不影响我们对原有问题的理解。现假设如下:

    有一个actor接收其他actor发过来的消息,把它存入数据库:
    1、数据量比较少时。数据单条处理,尽量快速的入库。

    2、数据量比较大时。数据需要批量处理,比如调用jdbc的batch操作,以提高吞吐量。

    3、需要能够在不同消息速率之间自由切换。

    基于以上的背景,这个actor该如何设计比较好呢?

    解决思路

    解决该问题有以下几点因素需要考虑:

      1. 如何计算消息速率。
      2. 如何判断消息速率过高或过低
      3. 批量、单条模式之间如何切换
      4. 基于akka解决问题(毕竟作者遇到这个问题就是在用akka开发软件的过程中)

    计算消息速率

    上面问题的关键点之一是如何判断当前的消息速率过高或过快,而计算速率的重要参数是时间,而在分布式场景下时间是一个不可忽视的因素。各个节点之间的时间有时无法做到完全一致,作者所在的公司就是这样。
    计算时,时间有两种选择方式:

    1、选择消息本身的时间;
    2、选择处理消息时,当前系统时间。

    两种选择方式各有优劣。第一种比较准确,毕竟计算速率的对象是消息,用消息的时间也最为准确,但这要求所有节点的时间保持同步,而且消息本身必须有一个时间字段;第二种准确度稍微差一点,毕竟收到消息与实际处理该消息会有一定的延时,可以处理任意类型的消息。
    为了简化并解决问题,作者选择了消息本身的时间作为计算参数,所有的消息都有时间字段。

    切换计算模式

    我们已经计算出了消息速率,那么是否就可以直接跟设定的阈值进行对比,判断当前的处理模式(批量或单条)了呢?这还不一定,要根据实际情况作出判断。消息速率的计算有两种计算方式:实时、固定速率。其中“固定速率”也有两种方式:固定消息个数、固定处理间隔。

    计算的方法各有千秋。如果实时计算消息速率,可以及时的切换批量或单条模式,但在速率不稳定的情况下,会造成“抖动”的情况,即频繁的在两种模式之间进行切换,很可能造成批量处理的消息数量过少,降低吞吐量;固定速率计算,则可以缓和这种“抖动”,当然也就不能及时的切换批量或单条模式。

    同样,为了简化问题,并考虑遇到问题的实际情况,作者选择用“固定速率”计算消息速率,计算方法如下:

    1、 刚开始处于单条模式,保存当前时间(或第一条消息的时间)为StartTime
    2、 单条模式下处理消息,并保存消息的当前时间问EndTime

    3、 计算当前的处理消息的个数,如果达到一个批量阈值,则计算此次批量时间跨度,即EndTime-StartTime。如果时间跨度大于批量时间的阈值,即此次批量处理的消息比较少,继续处于单条模式;如果时间跨度小于阈值,则表示在短时间内,收到了大量的消息,则切换为批量模式,计数器清零。

    4、 批量模式开始时,保存第一条消息的时间为StartTime
    5、 批量模式处理消息,并保存消息的当前时间问EndTime

    6、 计算当前的处理消息的个数,如果达到一个批量阈值,则返回“批量提交”消息,该消息作为特殊消息,提示处理程序提交当前批量。
    7、 通过外部actor或者外部系统,给当前actor发送“批量心跳”消息,该消息主要为了弥补消息尾端的空白。即消息个数少于一个批量时,能够及时处理当前剩余消息。

    8、 收到“批量心跳”消息时,检查当前消息处理个数,如果小于一个批量阈值,则表示当前消息速率过低,退出批量模式;如果大于一个批量阈值,则计数器清零,保持批量模式。无论此时进入哪个模式,都会发送“批量提交”消息,以尽快提交当前批量。

    demo代码

    为了保持通用,此处设计了一个抽象类,封装了部分逻辑

    object AutoSpeedActor{
      final case class BatchMessage(sourceMessage:Option[Any],lastMessageTime:Long, commit:Boolean = false)
      private[actor] trait InternalMessage{
        def systemTimestamp:Long
      }
      private[actor] final case class EnterBatch(systemTimestamp:Long) extends InternalMessage
      private[actor] final case class LeaveBatch(systemTimestamp:Long) extends InternalMessage
      private[actor] final case class BatchInterval(systemTimestamp:Long) extends InternalMessage
    }
    /**
      *
      * @param batchNumber 自适应时批量的数量
      * @param batchInterval 自适应时批量的时间间隔
      */
    abstract class AutoSpeedActor(batchNumber:Long,batchInterval:Duration,startTime:Long) extends Actor with ActorLogging {
      /**
        * 时间守卫。
        * 用来在批量模式下,及时提交当前剩余批量消息
        */
      private var timerGuard: ActorRef = _
      /**
        * 当前批量开始时间
        */
      private var batchStartTimestamp:Long = startTime
      /**
        * 当前批量结束时间
        */
      private var batchEndTimestamp:Long = batchStartTimestamp
      /**
        * 批量计数器
        */
      private var batchCounter:Long = 0
      /**
        * 获取当前消息的时间戳
        * @param msg 当前消息
        * @return 当前消息的时间戳
        */
      def getMessageTimestamp(msg: Any):Long
      /**
        * 判断当前消息是否自动驾驶,
        * @param msg 当前消息
        * @return true则对此类型的消息自动调整速率
        */
      def isAutoDriveMessage(msg:Any):Boolean
      /**
        * 判断当前是否为内部消息
        * @param msg 当前消息
        * @return true表示当前消息为内部消息
        */
      private def isIntervalMessage(msg:Any):Boolean = msg.isInstanceOf[AutoSpeedActor.InternalMessage]
      override def preStart(): Unit = {
        super.preStart()
        timerGuard = context.actorOf(Props.create(classOf[AutoSpeedActorGuard],batchInterval),self.path.name+"timerGuard")
      }
      override def postStop(): Unit = {
        super.postStop()
        context.stop(timerGuard)
      }
      /**
        * 消息拦截器,初始化为单条模式
        */
      private var messageIntercept: (Any) => Any = singleProcess
      /**
        * 批量模式下,封装当前消息
        * @param currentMsg 当前消息
        * @return 封装后的批量消息
        */
      private def batchProcess(currentMsg:Any):Any = currentMsg match {
          case AutoSpeedActor.BatchInterval(systemTimestamp) =>
            log.debug(s"Receive AutoSpeedActor.BatchInterval message at $systemTimestamp ")
            if( batchCounter < batchNumber ){
              timerGuard ! AutoSpeedActor.LeaveBatch(System.currentTimeMillis())
              messageIntercept = singleProcess
            }
            // 收到超时时间时,当前消息过少,则退出批量模式
            batchCounter = 0
            batchStartTimestamp = batchEndTimestamp
            AutoSpeedActor.BatchMessage(None,batchEndTimestamp,commit = true)
          case _ =>
            batchEndTimestamp = getMessageTimestamp(currentMsg)
            val commit = batchCounter % batchNumber == 0
            AutoSpeedActor.BatchMessage(Some(currentMsg),batchEndTimestamp,commit)
      }
      /**
        * 单条模式下,封装当前消息
        * @param currentMsg 当前消息
        * @return 封装后的消息
        */
      private def singleProcess(currentMsg:Any):Any = {
        batchEndTimestamp = getMessageTimestamp(currentMsg)
        if(batchCounter == batchNumber){
          batchCounter = 0
          log.debug(s"Reach an batch which from $batchStartTimestamp to $batchEndTimestamp")
          // 在一个批量内,时间跨度大于设定的批量阈值,则表示接收的消息比较慢
          if (batchEndTimestamp - batchStartTimestamp > batchInterval.toMillis ){
            batchStartTimestamp = batchEndTimestamp
          }else{
            // 在一个批量内,时间跨度小于设定的批量阈值,则表示接收的消息比较快,进入批量模式
            timerGuard ! AutoSpeedActor.EnterBatch(System.currentTimeMillis())
            messageIntercept = batchProcess
          }
        }
        currentMsg
      }
      override def aroundReceive(receive: Receive, msg: Any): Unit = {
        val interceptedMessage = if(isAutoDriveMessage(msg) || isIntervalMessage(msg)){
          batchCounter += 1
          messageIntercept(msg)
        }else{
          msg
        }
        super.aroundReceive(receive, interceptedMessage)
      }
    }
    private[actor] class AutoSpeedActorGuard(timeout:Duration) extends Actor with ActorLogging{
      private var batchMode = false
      private implicit val executionContextExecutor: ExecutionContextExecutor = context.dispatcher
      override def receive: Receive = {
        case AutoSpeedActor.EnterBatch(systemTimestamp) =>
          log.debug(s"Enter batch mode at $systemTimestamp")
          batchMode = true
          context.system.scheduler.scheduleOnce(FiniteDuration(timeout._1,timeout._2),self,AutoSpeedActor.BatchInterval(systemTimestamp))
        case AutoSpeedActor.LeaveBatch(systemTimestamp) =>
          log.debug(s"Leave batch mode at $systemTimestamp")
          batchMode = false
        case evt: AutoSpeedActor.BatchInterval =>
          log.debug(s"Receive an AutoSpeedActor.BatchInterval message $evt ,batchMode = $batchMode")
          if(batchMode){
            context.system.scheduler.scheduleOnce(FiniteDuration(timeout._1,timeout._2),self,AutoSpeedActor.BatchInterval(System.currentTimeMillis()))
            context.parent ! evt
          }
      }
    }
    

    TODO

    demo代码还是有点简单,后期需要进一步的优化,例如该actor只能对某一类消息进行自动速率调整,无法适应多个不同类型消息的AutoDrive,欢迎大家进行讨论,提出各种优化方案。

  • 相关阅读:
    LeetCode 230. Kth Smallest Element in a BST
    LeetCode 114. Flatten Binary Tree to Linked List
    LeetCode 222. Count Complete Tree Nodes
    LeetCode 129. Sum Root to Leaf Numbers
    LeetCode 113. Path Sum II
    LeetCode 257. Binary Tree Paths
    Java Convert String & Int
    Java Annotations
    LeetCode 236. Lowest Common Ancestor of a Binary Tree
    LeetCode 235. Lowest Common Ancestor of a Binary Search Tree
  • 原文地址:https://www.cnblogs.com/gabry/p/9138507.html
Copyright © 2011-2022 走看看