  • BackoffSupervisor in Akka

    1. Background

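    I have recently been working on a project whose modules exchange audio frames over Akka gRPC, and the actors in each module all persist their events. This week I ran into a bug: after audio frames had been flowing for a while, the whole system froze and stopped responding. At first I suspected that the Akka gRPC stream had been interrupted or that frames were not arriving, but packet captures and logs showed that every frame in the stream had been received. I finally traced the problem to persist: before each freeze, a persist failure could be found. The Scaladoc comment on the persist method in the Akka source reads as follows: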

    /**
    * Asynchronously persists `event`. On successful persistence, `handler` is called with the
    * persisted event. It is guaranteed that no new commands will be received by a persistent actor
    * between a call to `persist` and the execution of its `handler`. This also holds for
    * multiple `persist` calls per received command. Internally, this is achieved by stashing new
    * commands and unstashing them when the `event` has been persisted and handled. The stash used
    * for that is an internal stash which doesn't interfere with the inherited user stash.
    *
    * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
    * event is the sender of the corresponding command. This means that one can reply to a command
    * sender within an event `handler`.
    *
    * Within an event handler, applications usually update persistent actor state using persisted event
    * data, notify listeners and reply to command senders.
    *
    * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
    * unconditionally be stopped. The reason that it cannot resume when persist fails is that it
    * is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
    * state. Restarting on persistent failures will most likely fail anyway, since the journal
    * is probably unavailable. It is better to stop the actor and after a back-off timeout start
    * it again.
    *
    * @param event event to be persisted
    * @param handler handler for each persisted `event`
    */
    def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
      internalPersist(event)(handler)
    }

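    From this comment we can see that when using Akka persistence, the actor is stopped if persisting an event fails. Any messages you keep sending to it therefore get no response. The comment also recommends starting the actor again after a back-off timeout. (A side recommendation: whether the actor persists to local storage or to a database through a plugin such as Redis, it is best to configure a serializer explicitly. Otherwise Java serialization is used and a WARN is logged, because the default Java serialization performs poorly.) So let's look at how this back-off style of restart works.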
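    For the persistence case described above, one possible shape is to wrap the persistent actor in a BackoffSupervisor (covered later in this article) built with Backoff.onStop, since a persist failure stops the actor rather than throwing an exception to its parent. The sketch below is only an illustration under assumptions: FrameStored, FrameRecorder, FrameRecorderSupervisor, the persistenceId, and the back-off durations are all hypothetical names and values, and it assumes the same Akka 2.5-era akka.pattern.Backoff API used elsewhere in this article, plus a configured journal.

```scala
import akka.actor.Props
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.persistence.PersistentActor
import scala.concurrent.duration._

// Hypothetical event type, for illustration only.
final case class FrameStored(seq: Long)

// Hypothetical persistent actor that records the sequence number of each frame.
class FrameRecorder extends PersistentActor {
  override def persistenceId: String = "frame-recorder-1" // assumed id

  private var lastSeq = 0L

  // Replays stored events when the actor (re)starts.
  override def receiveRecover: Receive = {
    case FrameStored(seq) => lastSeq = seq
  }

  // If persist fails, Akka invokes onPersistFailure and then stops this actor.
  override def receiveCommand: Receive = {
    case seq: Long =>
      persist(FrameStored(seq)) { evt => lastSeq = evt.seq }
  }
}

object FrameRecorderSupervisor {
  // Backoff.onStop starts the child again after it has stopped,
  // waiting between 3 and 30 seconds (assumed values) with 20% jitter.
  def props: Props = BackoffSupervisor.props(
    Backoff.onStop(
      Props(new FrameRecorder),
      "frameRecorder",
      3.seconds,
      30.seconds,
      0.2))
}
```

    Senders would then talk to the actor created from FrameRecorderSupervisor.props instead of the FrameRecorder itself, so messages keep reaching a live child once the back-off delay has passed.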

    2. Ordinary supervision and restart

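    Actors form a parent-child hierarchy. When a child actor throws an exception, the failure is reported to its parent, which handles it according to a supervisor strategy. The most commonly used strategy is OneForOneStrategy, which applies the decision only to the child that failed. The strategy defines how each kind of exception raised by the children is handled. The default handling is: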

    final val defaultDecider: Decider = {
      case _: ActorInitializationException ⇒ Stop
      case _: ActorKilledException ⇒ Stop
      case _: DeathPactException ⇒ Stop
      case _: Exception ⇒ Restart
    }

    However, we can add our own special handling, for example:

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException => Resume
        case _: MyException => Restart
        case t =>
          super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
      }
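    To make the fall-through behaviour concrete, here is a small sketch of how a custom decider can be combined with the default one. DeciderDemo and its members are hypothetical names. A Decider is just a PartialFunction[Throwable, Directive], so orElse chains the two, and anything neither of them handles is escalated here.

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy.{Decider, Directive, Escalate, Restart, Resume}

// Hypothetical helper object, for illustration only.
object DeciderDemo {
  // Custom rule: keep the child's state and carry on after arithmetic errors.
  val custom: Decider = {
    case _: ArithmeticException => Resume
  }

  // Try the custom rule first, then fall back to Akka's default decider.
  val combined: Decider = custom.orElse(SupervisorStrategy.defaultDecider)

  // Throwables that neither decider covers (for example an Error) are escalated.
  def decide(t: Throwable): Directive =
    combined.applyOrElse(t, (_: Throwable) => Escalate)
}
```

    With this combination, an ArithmeticException resumes the child, any other Exception falls through to the default Restart, and a non-Exception Throwable is escalated to the next supervisor.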

    3. BackoffSupervisor

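    However, if our actor is started with system.actorOf, it is hard to customize the exception handling in the way shown above. We may also want finer-grained control, for example over how long to wait after a failure before dealing with the actor. BackoffSupervisor covers both cases.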

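    BackoffSupervisor can be understood as follows. It differs from an actor that simply defines a supervisorStrategy: we should think of BackoffSupervisor as a single, self-contained actor, although it is still implemented as a parent-child pair. The supervisor strategy (SupervisorStrategy) is implemented inside BackoffSupervisor. From the outside it looks like one actor. The business logic is defined in the child actor. The parent has no function other than supervision, and there is nowhere for us to define any. Its only other job, built into BackoffSupervisor, is to forward the messages it receives to the child. So although we send messages to the BackoffSupervisor, we are really talking to its child. The following example shows how to use it: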

    import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
    import akka.pattern.{Backoff, BackoffSupervisor}
    import scala.concurrent.duration._

    object InnerChild {
      case class TestMessage(msg: String)
      class ChildException extends Exception

      def props = Props[InnerChild]
    }
    class InnerChild extends Actor with ActorLogging {
      import InnerChild._
      override def receive: Receive = {
        case TestMessage(msg) => // simulate the child's work
          log.info(s"Child received message: ${msg}")
      }
    }
    object Supervisor {
      def props: Props = { // the supervisor strategy and the child actor are both defined here
        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case _: InnerChild.ChildException => SupervisorStrategy.Restart
        }

        val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
          .withManualReset
          .withSupervisorStrategy(
            OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
              decider.orElse(SupervisorStrategy.defaultDecider)
            )
          )
        BackoffSupervisor.props(options)
      }
    }
    // Note: the actor below is the parent of Supervisor, not of InnerChild
    object ParentalActor {
      case class SendToSupervisor(msg: InnerChild.TestMessage)
      case class SendToInnerChild(msg: InnerChild.TestMessage)
      case class SendToChildSelection(msg: InnerChild.TestMessage)
      def props = Props[ParentalActor]
    }
    class ParentalActor extends Actor with ActorLogging {
      import ParentalActor._
      // create the supervisor as a child of this actor
      val supervisor = context.actorOf(Supervisor.props, "supervisor")
      supervisor ! BackoffSupervisor.GetCurrentChild // ask the supervisor for its current child
      var innerChild: Option[ActorRef] = None // the current child's ActorRef, once it has been returned
      val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
      override def receive: Receive = {
        case BackoffSupervisor.CurrentChild(ref) => // reply carrying the current child
          innerChild = ref
        case SendToSupervisor(msg) => supervisor ! msg
        case SendToChildSelection(msg) => selectedChild ! msg
        case SendToInnerChild(msg) => innerChild.foreach(child => child ! msg)
      }

    }
    object BackoffSupervisorDemo extends App {
      import InnerChild.TestMessage // TestMessage is defined in InnerChild, not in ParentalActor
      import ParentalActor._
      val testSystem = ActorSystem("testSystem")
      val parent = testSystem.actorOf(ParentalActor.props, "parent")
      Thread.sleep(1000) // wait for BackoffSupervisor.CurrentChild(ref) to be received
      parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
      parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
      parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
      scala.io.StdIn.readLine()
      testSystem.terminate()
    }

    The output is as follows:

    [INFO] [10/13/2018 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
    [INFO] [10/13/2018 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
    [INFO] [10/13/2018 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

    As the output shows, although we sent messages to supervisor, innerChild, and selectedChild separately, every message was ultimately handled by InnerChild.
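    The example never actually fails, so the back-off delays themselves are not visible in the output. As a rough sketch of what the minBackoff, maxBackoff, and randomFactor arguments mean, the delay before each restart can be modelled as exponential growth from minBackoff, capped at maxBackoff, with optional jitter. BackoffDelay below is a hypothetical helper written for this illustration, not Akka's own code, and its jitter parameter stands for an already-sampled random fraction between 0 and randomFactor.

```scala
import scala.concurrent.duration._

// Hypothetical model of an exponential back-off schedule, for illustration only.
object BackoffDelay {
  /** Delay before restart number `restartCount` (counting from 0). */
  def delay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      jitter: Double = 0.0): FiniteDuration = {
    require(restartCount >= 0, "restartCount must not be negative")
    require(jitter >= 0.0, "jitter must not be negative")
    // Double the delay on every restart, starting from minBackoff.
    val exponentialMs = minBackoff.toMillis.toDouble * math.pow(2.0, restartCount.toDouble)
    // Never wait longer than maxBackoff (before jitter is applied).
    val cappedMs = math.min(exponentialMs, maxBackoff.toMillis.toDouble)
    (cappedMs * (1.0 + jitter)).toLong.millis
  }
}

object BackoffDelayDemo extends App {
  // Same settings as the example: 1 second minimum, 5 seconds maximum, no jitter.
  (0 to 4).foreach { n =>
    println(s"restart #$n after ${BackoffDelay.delay(n, 1.second, 5.seconds)}")
  }
}
```

    Under this model the settings used in the example give delays of 1, 2, 4, 5, and 5 seconds. Because the example also calls withManualReset, the restart count only drops back to zero when the child sends BackoffSupervisor.Reset to its parent. Without that message the delay stays at the maximum once it has been reached.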

  • Original article: https://www.cnblogs.com/junjiang3/p/9785373.html