  • Akka(3): Actor Supervision

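        In the previous post we discussed supervision: in Akka it takes the form of a direct parent-child supervision tree, in which a parent Actor handles the exceptions raised by its direct children. At that point we treated BackoffSupervisor as just one flavor of parent-child supervision. In fact, BackoffSupervisor differs from an Actor that defines its own supervisorStrategy, and is better viewed as a single self-contained Actor. Its implementation is still a parent-child pair, of course, with the SupervisorStrategy implemented inside BackoffSupervisor. From the outside, BackoffSupervisor looks like one Actor: the processing logic is defined in the child Actor, while the parent built into BackoffSupervisor does nothing except supervise the child and forward the messages it receives to it. There is not even a place to define any other behavior for this parent. So although we send messages to the BackoffSupervisor, we are really talking to its child. Consider the following example: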

    package backoffSupervisorDemo
    import akka.actor._
    import akka.pattern._
    import backoffSupervisorDemo.InnerChild.TestMessage
    
    import scala.concurrent.duration._
    
    object InnerChild {
      case class TestMessage(msg: String)
      class ChildException extends Exception
    
      def props = Props[InnerChild]
    }
    class InnerChild extends Actor with ActorLogging {
      import InnerChild._
      override def receive: Receive = {
        case TestMessage(msg) => // simulate the child's work
          log.info(s"Child received message: ${msg}")
      }
    }
    object Supervisor {
      def props: Props = { // the supervisor strategy and the child Actor's Props are defined here
        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case _: InnerChild.ChildException => SupervisorStrategy.Restart
        }
    
        val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
          .withManualReset
          .withSupervisorStrategy(
            OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
              decider.orElse(SupervisorStrategy.defaultDecider)
            )
          )
        BackoffSupervisor.props(options)
      }
    }
    // Note: this is the parent of Supervisor, not the parent of InnerChild
    object ParentalActor {
      case class SendToSupervisor(msg: InnerChild.TestMessage)
      case class SendToInnerChild(msg: InnerChild.TestMessage)
      case class SendToChildSelection(msg: InnerChild.TestMessage)
      def props = Props[ParentalActor]
    }
    class ParentalActor extends Actor with ActorLogging {
      import ParentalActor._
      // create the child Actor supervisor here
      val supervisor = context.actorOf(Supervisor.props,"supervisor")
      supervisor ! BackoffSupervisor.GetCurrentChild // ask supervisor to reply with its current child Actor
      var innerChild: Option[ActorRef] = None   // the current child ActorRef, set once the reply arrives
      val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
      override def receive: Receive = {
        case BackoffSupervisor.CurrentChild(ref) =>   // reply carrying the current child Actor
          innerChild = ref
        case SendToSupervisor(msg) => supervisor ! msg
        case SendToChildSelection(msg) => selectedChild ! msg
        case SendToInnerChild(msg) => innerChild foreach(child => child ! msg)
      }
    
    }
    object BackoffSupervisorDemo extends App {
      import ParentalActor._
      val testSystem = ActorSystem("testSystem")
      val parent = testSystem.actorOf(ParentalActor.props,"parent")
    
      Thread.sleep(1000)   //wait for BackoffSupervisor.CurrentChild(ref) received
    
      parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
      parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
      parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
    
    
      scala.io.StdIn.readLine()
    
      testSystem.terminate()
    
    }

    In the example above we send messages to supervisor, innerChild and selectedChild respectively, yet every one of them is handled by InnerChild, as the log shows:

    [INFO] [05/29/2017 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
    [INFO] [05/29/2017 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
    [INFO] [05/29/2017 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

    Above we sent a BackoffSupervisor.GetCurrentChild message to supervisor to obtain the child Actor. BackoffSupervisor handles this and a few other special messages as follows:

    private[akka] trait HandleBackoff { this: Actor ⇒
      def childProps: Props
      def childName: String
      def reset: BackoffReset
    
      var child: Option[ActorRef] = None
      var restartCount = 0
    
      import BackoffSupervisor._
      import context.dispatcher
    
      override def preStart(): Unit = startChild()
    
      def startChild(): Unit = {
        if (child.isEmpty) {
          child = Some(context.watch(context.actorOf(childProps, childName)))
        }
      }
    
      def handleBackoff: Receive = {
        case StartChild ⇒
          startChild()
          reset match {
            case AutoReset(resetBackoff) ⇒
              val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
            case _ ⇒ // ignore
          }
    
        case Reset ⇒
          reset match {
            case ManualReset ⇒ restartCount = 0
            case msg         ⇒ unhandled(msg)
          }
    
        case ResetRestartCount(current) ⇒
          if (current == restartCount) {
            restartCount = 0
          }
    
        case GetRestartCount ⇒
          sender() ! RestartCount(restartCount)
    
        case GetCurrentChild ⇒
          sender() ! CurrentChild(child)
    
        case msg if child.contains(sender()) ⇒
          // use the BackoffSupervisor as sender
          context.parent ! msg
    
        case msg ⇒ child match {
          case Some(c) ⇒ c.forward(msg)
          case None    ⇒ context.system.deadLetters.forward(msg)
        }
      }
    }

    The handling of each of these messages can be found in the handleBackoff function.
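    To make the routing in handleBackoff easier to follow, here is a toy model written in plain Scala with no Akka involved. The names RoutingSketch, Destination and route are made up for illustration, the control messages are simple stand-ins for the real ones, and the model glosses over details such as Reset being unhandled when the reset mode is not manual:

```scala
// Toy model of the routing decisions in handleBackoff (illustrative only, no Akka).
object RoutingSketch {
  sealed trait Destination
  case object HandledBySupervisor extends Destination // answered by the supervisor itself
  case object ToSupervisorParent  extends Destination // message coming from the child goes up
  case object ToChild             extends Destination // ordinary message is forwarded down
  case object ToDeadLetters       extends Destination // no child is currently running

  // Hypothetical stand-ins for the BackoffSupervisor control messages.
  sealed trait Control
  case object GetCurrentChild extends Control
  case object GetRestartCount extends Control
  case object Reset           extends Control

  // sender and child are plain names here instead of ActorRefs.
  def route(msg: Any, sender: String, child: Option[String]): Destination = msg match {
    case _: Control                  => HandledBySupervisor
    case _ if child.contains(sender) => ToSupervisorParent
    case _ if child.isDefined        => ToChild
    case _                           => ToDeadLetters
  }
}
```

    The last case is the one that matters later in this post: while no child is running, ordinary messages are not buffered but go to dead letters.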

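    When building the Props for Supervisor in the example above, we defined a SupervisorStrategy that responds to the ChildException thrown by InnerChild with Restart. Let's modify InnerChild so that it throws exceptions at random: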

    object InnerChild {
      case class TestMessage(msg: String)
      class ChildException(val errmsg: TestMessage) extends Exception
      object CException {  //for pattern match of class with parameter
        def apply(msg: TestMessage) = new ChildException(msg)
        def unapply(cex: ChildException) = Some(cex.errmsg)
      }
      def props = Props[InnerChild]
    }
    class InnerChild extends Actor with ActorLogging {
      import InnerChild._
      import scala.util.Random
      context.parent ! BackoffSupervisor.Reset  // reset the backoff restart count
      override def receive: Receive = {
        case TestMessage(msg) => // simulate the child's work
          if (Random.nextBoolean())   // throw an exception at random
            throw new ChildException(TestMessage(msg))
          else
            log.info(s"Child received message: ${msg}")
      }
    }

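    We use Random.nextBoolean to throw exceptions at random. Note that we also turned ChildException into a class with a parameter, because we may need to get hold of the message that caused the exception before restarting, like this: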

        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case InnerChild.CException(tmsg) =>
            println(s"Message causing exception: ${tmsg.msg}") //we can extract message here
            SupervisorStrategy.Restart
        }
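    The CException object works because any object with a suitable unapply method can be used as a pattern, even when the class it takes apart is not a case class. The sketch below isolates that trick in plain Scala; ExtractorSketch and failedMessage are names invented for this illustration:

```scala
// Minimal sketch of a hand-written extractor for a non-case class.
object ExtractorSketch {
  final case class TestMessage(msg: String)

  // A plain exception class that carries the message being processed when it failed.
  class ChildException(val errmsg: TestMessage) extends Exception

  // unapply is what allows `case CException(tmsg)` in a pattern match.
  object CException {
    def unapply(cex: ChildException): Option[TestMessage] = Some(cex.errmsg)
  }

  // Returns the text of the failing message if t is a ChildException, otherwise None.
  def failedMessage(t: Throwable): Option[String] = t match {
    case CException(tmsg) => Some(tmsg.msg)
    case _                => None
  }
}
```

    Matching a Throwable against CException first checks that the value really is a ChildException, so other exceptions fall through to the next case, just as they fall through to defaultDecider in the decider.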

    Now all messages can simply be sent to supervisor:

    class ParentalActor extends Actor with ActorLogging {
      import ParentalActor._
      // create the child Actor supervisor here
      val supervisor = context.actorOf(Supervisor.props,"supervisor")
      override def receive: Receive = {
        case msg => supervisor ! msg
      }
    
    }
    object BackoffSupervisorDemo extends App {
      import ParentalActor._
      import InnerChild._
      val testSystem = ActorSystem("testSystem")
      val parent = testSystem.actorOf(ParentalActor.props,"parent")
      
      parent ! TestMessage("Hello message 1 to supervisor")
      parent ! TestMessage("Hello message 2 to supervisor")
      parent ! TestMessage("Hello message 3 to supervisor")
      parent ! TestMessage("Hello message 4 to supervisor")
      parent ! TestMessage("Hello message 5 to supervisor")
      parent ! TestMessage("Hello message 6 to supervisor")
    
    
      scala.io.StdIn.readLine()
    
      testSystem.terminate()
    
    }

    Running this, we find that once an exception occurs, the messages that follow all end up as dead letters:

    [INFO] [05/29/2017 18:22:11.689] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/parent/supervisor/innerChild] Message [backoffSupervisorDemo.InnerChild$TestMessage] from Actor[akka://testSystem/user/parent#2140150413] to Actor[akka://testSystem/user/parent/supervisor/innerChild#-1047097634] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    ....

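    This shows that BackoffSupervisor handles Restart differently. Rather than suspending InnerChild as a normal restart would, it appears to stop it outright, discarding the ActorRef and the mailbox, so every message sent to InnerChild before the new instance has started ends up in dead letters. In other words, we lose not only the message that caused the exception but also every message that arrives during the restart window.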
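    How long that window lasts depends on the backoff settings. The sketch below is only an approximation of the schedule, assuming the delay doubles with each restart, is capped at the maximum, and that randomFactor is 0.0 as in our example so there is no jitter. BackoffDelaySketch and delayFor are names invented here, not Akka API:

```scala
import scala.concurrent.duration._

// Rough sketch of an exponential backoff schedule (assumed shape, not Akka's code).
object BackoffDelaySketch {
  // restartCount is the number of restarts that have already happened.
  def delayFor(restartCount: Int,
               minBackoff: FiniteDuration,
               maxBackoff: FiniteDuration): FiniteDuration =
    if (restartCount >= 30) maxBackoff // guard against overflow for large counts
    else {
      val candidateMillis = minBackoff.toMillis * math.pow(2, restartCount)
      if (candidateMillis >= maxBackoff.toMillis) maxBackoff
      else candidateMillis.toLong.millis
    }
}
```

    With the 1 second and 5 seconds used in Supervisor.props this gives 1, 2, 4 and then 5 seconds. Since InnerChild sends Reset to its parent every time it starts, the count should go back to 0 each time, which would keep the window at roughly the minimum of 1 second.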

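    Now let's deal with the lost messages. First, how do we resend the message that caused the exception? We can resend it from the supervisor strategy, just before the restart: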

        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case InnerChild.CException(tmsg) =>
            println(s"Message causing exception: ${tmsg.msg}") //we can extract message here
            BackoffSupervisorDemo.sendToParent(tmsg)  //resend message 
            SupervisorStrategy.Restart
        }

    First declare the sendToParent function in BackoffSupervisorDemo:

      def sendToParent(msg: TestMessage) = parent ! msg

    Next we need a way to fish out the dead letters. We can use Akka's eventStream to subscribe to messages of type DeadLetter:

    object DeadLetterMonitor {
      def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
    }
    class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
      import InnerChild._
      import context.dispatcher
      override def receive: Receive = {
        case DeadLetter(msg: TestMessage, _, _) => // ignore dead letters that are not TestMessage
          // wait until InnerChild has been started again, then resend
          context.system.scheduler.scheduleOnce(1.second, receiver, msg)
      }
    }
    object BackoffSupervisorDemo extends App {
      import ParentalActor._
      import InnerChild._
    
      def sendToParent(msg: TestMessage) = parent ! msg
    
      val testSystem = ActorSystem("testSystem")
      val parent = testSystem.actorOf(ParentalActor.props,"parent")
    
      val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
      testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter
    
      parent ! TestMessage("Hello message 1 to supervisor")
      parent ! TestMessage("Hello message 2 to supervisor")
      parent ! TestMessage("Hello message 3 to supervisor")
      parent ! TestMessage("Hello message 4 to supervisor")
      parent ! TestMessage("Hello message 5 to supervisor")
      parent ! TestMessage("Hello message 6 to supervisor")
    
    
      scala.io.StdIn.readLine()
    
      testSystem.terminate()
    
    }

    A test run shows that InnerChild eventually processes all 6 messages.

    Below is the complete sample code for this discussion:

    package backoffSupervisorDemo
    import akka.actor._
    import akka.pattern._
    import scala.util.Random
    
    import scala.concurrent.duration._
    
    object InnerChild {
      case class TestMessage(msg: String)
      class ChildException(val errmsg: TestMessage) extends Exception
      object CException {  //for pattern match of class with parameter
        def apply(msg: TestMessage) = new ChildException(msg)
        def unapply(cex: ChildException) = Some(cex.errmsg)
      }
      def props = Props[InnerChild]
    }
    class InnerChild extends Actor with ActorLogging {
      import InnerChild._
      context.parent ! BackoffSupervisor.Reset  //reset backoff counts
      override def receive: Receive = {
        case TestMessage(msg) => // simulate the child's work
          if (Random.nextBoolean())   // throw an exception at random
            throw new ChildException(TestMessage(msg))
          else
            log.info(s"Child received message: ${msg}")
      }
    }
    object Supervisor {
      def props: Props = { // the supervisor strategy and the child Actor's Props are defined here
        def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
          case InnerChild.CException(tmsg) =>
            println(s"Message causing exception: ${tmsg.msg}") //we can extract message here
            BackoffSupervisorDemo.sendToParent(tmsg)  //resend message
            SupervisorStrategy.Restart
        }
    
        val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
          .withManualReset
          .withSupervisorStrategy(
            OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
              decider.orElse(SupervisorStrategy.defaultDecider)
            )
          )
        BackoffSupervisor.props(options)
      }
    }
    // Note: this is the parent of Supervisor, not the parent of InnerChild
    object ParentalActor {
      case class SendToSupervisor(msg: InnerChild.TestMessage)
      case class SendToInnerChild(msg: InnerChild.TestMessage)
      case class SendToChildSelection(msg: InnerChild.TestMessage)
      def props = Props[ParentalActor]
    }
    class ParentalActor extends Actor with ActorLogging {
      import ParentalActor._
      // create the child Actor supervisor here
      val supervisor = context.actorOf(Supervisor.props,"supervisor")
      override def receive: Receive = {
        case msg => supervisor ! msg
      }
    
    }
    object DeadLetterMonitor {
      def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
    }
    class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
      import InnerChild._
      import context.dispatcher
      override def receive: Receive = {
        case DeadLetter(msg: TestMessage, _, _) => // ignore dead letters that are not TestMessage
          // wait until InnerChild has been started again, then resend
          context.system.scheduler.scheduleOnce(1.second, receiver, msg)
      }
    }
    object BackoffSupervisorDemo extends App {
      import ParentalActor._
      import InnerChild._
    
      def sendToParent(msg: TestMessage) = parent ! msg
    
      val testSystem = ActorSystem("testSystem")
      val parent = testSystem.actorOf(ParentalActor.props,"parent")
    
      val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
      testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter
    
      parent ! TestMessage("Hello message 1 to supervisor")
      parent ! TestMessage("Hello message 2 to supervisor")
      parent ! TestMessage("Hello message 3 to supervisor")
      parent ! TestMessage("Hello message 4 to supervisor")
      parent ! TestMessage("Hello message 5 to supervisor")
      parent ! TestMessage("Hello message 6 to supervisor")
    
    
      scala.io.StdIn.readLine()
    
      testSystem.terminate()
    
    }

     

     

     

     

  • Original article: https://www.cnblogs.com/tiger-xc/p/6918830.html