  • akka-typed(1)

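    akka-typed manages the actor lifecycle (creation, start, state transitions, stopping, and watching) somewhat differently from akka-classic. This post walks through actor lifecycle management in akka-typed.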

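    Every kind of actor is defined as a template by its behavior, and an instance is created when a parent actor one level up calls spawn. The new instance joins the system's top-down actor tree, directly below the parent that spawned it. The akka-typed guardian actor, i.e. the root actor, is specified and created when the ActorSystem is defined: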

        val config = ConfigFactory.load("application.conf")
        val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
        man ! GreetStarter.RepeatedGreeting("Tiger",1.seconds)

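    In a sense, the ActorSystem instance man stands for the root actor. We can send messages to man, and GreetStarter's behavior then uses its own ActorContext to spawn, stop, and watch actors and to dispatch work. It is effectively the hub of the program: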

      object GreetStarter {
        import Messages._
        def apply(): Behavior[SayHi] = {
          Behaviors.setup { ctx =>
            val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
            val helloActor = ctx.spawn(HelloActor(), "hello-actor",props)
            val greeter = ctx.spawn(Greeter(helloActor), "greeter")
            ctx.watch(greeter)
            ctx.watchWith(helloActor, StopWorker("something happened"))
            Behaviors.receiveMessage { who =>
              if (who.name == "stop") {
                ctx.stop(helloActor)
                ctx.stop(greeter)
                Behaviors.stopped
              } else {
                greeter ! who
                Behaviors.same
              }
            }
          }
        }
      }

    Sometimes, though, we need to create and use actors from outside the root actor's ActorContext. The following example from the official documentation shows how:

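    import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Props, SpawnProtocol}
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl.LoggerOps
    import akka.util.Timeout
    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._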
    
    object HelloWorldMain {
      def apply(): Behavior[SpawnProtocol.Command] =
        Behaviors.setup { context =>
          // Start initial tasks
          // context.spawn(...)
    
          SpawnProtocol()
        }
    }
    
    object Main extends App {
      implicit val system: ActorSystem[SpawnProtocol.Command] =
        ActorSystem(HelloWorldMain(), "hello")
    
      // needed in implicit scope for ask (?)
      import akka.actor.typed.scaladsl.AskPattern._
      implicit val ec: ExecutionContext = system.executionContext
      implicit val timeout: Timeout = Timeout(3.seconds)
    
      val greeter: Future[ActorRef[HelloWorld.Greet]] =
        system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
    
      val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
        context.log.info2("Greeting for {} from {}", message.whom, message.from)
        Behaviors.stopped
      }
    
      val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
        system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))
    
      for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
        greeterRef ! HelloWorld.Greet("Akka", replyToRef)
      }
      ...
    }

    As you can see, all of these operations are performed from outside the actor framework. SpawnProtocol is itself just an actor, defined as follows:

    object SpawnProtocol {
    
    ...
      final case class Spawn[T](behavior: Behavior[T], name: String, props: Props, replyTo: ActorRef[ActorRef[T]])
          extends Command
    ...
      def apply(): Behavior[Command] =
        Behaviors.receive { (ctx, msg) =>
          msg match {
            case Spawn(bhvr, name, props, replyTo) =>
              val ref =
                if (name == null || name.equals(""))
                  ctx.spawnAnonymous(bhvr, props)
                else {
    
                  @tailrec def spawnWithUniqueName(c: Int): ActorRef[Any] = {
                    val nameSuggestion = if (c == 0) name else s"$name-$c"
                    ctx.child(nameSuggestion) match {
                      case Some(_) => spawnWithUniqueName(c + 1) // already taken, try next
                      case None    => ctx.spawn(bhvr, nameSuggestion, props)
                    }
                  }
    
                  spawnWithUniqueName(0)
                }
              replyTo ! ref
              Behaviors.same
          }
        }
    
    }

    Code outside the actor asks for a new actor by sending it a Spawn message.

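    An actor changes state by switching from one behavior to another. We can define our own behaviors or use one of the ready-made Behaviors.??? factories. If only internal variables change, we can simply return the current behavior again, carrying the new values: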

    object HelloWorldBot {
    
      def apply(max: Int): Behavior[HelloWorld.Greeted] = {
        bot(0, max)
      }
    
      private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =
        Behaviors.receive { (context, message) =>
          val n = greetingCounter + 1
          context.log.info2("Greeting {} for {}", n, message.whom)
          if (n == max) {
            Behaviors.stopped
          } else {
            message.from ! HelloWorld.Greet(message.whom, context.self)
            bot(n, max)
          }
        }
    }
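
    The bot above keeps one behavior and only changes its parameters. As a contrast, here is a minimal sketch of an actor that moves between two distinct behaviors. The names Switch, Toggle and Report are hypothetical and are not part of the original example; the code assumes the Akka Typed 2.6 scaladsl.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor used only to illustrate switching between behaviors.
object Switch {
  sealed trait Command
  case object Toggle extends Command
  final case class Report(replyTo: ActorRef[String]) extends Command

  // The actor starts in the "off" behavior.
  def apply(): Behavior[Command] = off

  // State "off": Toggle switches to the "on" behavior.
  private def off: Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Toggle =>
        on
      case Report(replyTo) =>
        replyTo ! "off"
        Behaviors.same
    }

  // State "on": Toggle switches back to the "off" behavior.
  private def on: Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Toggle =>
        off
      case Report(replyTo) =>
        replyTo ! "on"
        Behaviors.same
    }
}
```

    Returning a different behavior from the message handler is the whole state transition: the next message is processed by whatever behavior was returned, and Behaviors.same keeps the current one.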

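    An actor can be stopped either by its direct parent through ActorContext.stop or by itself returning Behaviors.stopped. Behaviors.stopped accepts a cleanup function, which runs some cleanup before the actor stops completely: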

    object MasterControlProgram {
      sealed trait Command
      final case class SpawnJob(name: String) extends Command
      case object GracefulShutdown extends Command
    
      // Predefined cleanup operation
      def cleanup(log: Logger): Unit = log.info("Cleaning up!")
    
      def apply(): Behavior[Command] = {
        Behaviors
          .receive[Command] { (context, message) =>
            message match {
              case SpawnJob(jobName) =>
                context.log.info("Spawning job {}!", jobName)
                context.spawn(Job(jobName), name = jobName)
                Behaviors.same
              case GracefulShutdown =>
                context.log.info("Initiating graceful shutdown...")
                // perform graceful stop, executing cleanup before final system termination
                // behavior executing cleanup is passed as a parameter to Actor.stopped
                Behaviors.stopped { () =>
                  cleanup(context.system.log)
                }
            }
          }
          .receiveSignal {
            case (context, PostStop) =>
              context.log.info("Master Control Program stopped")
              Behaviors.same
          }
      }
    }

    When an actor stops, another actor that is watching it can pick this up in its receiveSignal handler:

      object GreetStarter {
        import Messages._
        def apply(): Behavior[SayHi] = {
          Behaviors.setup { ctx =>
            val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
            val helloActor = ctx.spawn(HelloActor(), "hello-actor",props)
            val greeter = ctx.spawn(Greeter(helloActor), "greeter")
            ctx.watch(greeter)
            ctx.watchWith(helloActor, StopWorker("something happened"))
            Behaviors.receiveMessage { who =>
              if (who.name == "stop") {
                ctx.stop(helloActor)
                ctx.stop(greeter)
                Behaviors.stopped
              } else {
                greeter ! who
                Behaviors.same
              }
            }.receiveSignal {
              case (context, Terminated(ref)) =>
                context.log.info("{} stopped!", ref.path.name)
                Behaviors.same
            }
          }
        }
      }
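
    The example uses both watch and watchWith, and the two report termination differently: watch delivers a Terminated signal to receiveSignal, while watchWith delivers the custom message you supplied to the normal message handler. The sketch below makes the difference visible. Watcher, StopWorkers, WorkerStopped and the probe parameter are hypothetical names introduced for illustration; the code assumes the Akka Typed 2.6 scaladsl.

```scala
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor used only to contrast watch with watchWith.
object Watcher {
  sealed trait Command
  case object StopWorkers extends Command
  final case class WorkerStopped(name: String) extends Command

  // A worker that ignores every message; it exists only to be stopped.
  private val worker: Behavior[String] = Behaviors.ignore

  // probe receives a line of text for every termination the watcher observes.
  def apply(probe: ActorRef[String]): Behavior[Command] =
    Behaviors.setup { ctx =>
      val a = ctx.spawn(worker, "worker-a")
      val b = ctx.spawn(worker, "worker-b")
      ctx.watch(a)                                // reported as a Terminated signal
      ctx.watchWith(b, WorkerStopped("worker-b")) // reported as a regular message

      Behaviors
        .receiveMessage[Command] {
          case StopWorkers =>
            ctx.stop(a)
            ctx.stop(b)
            Behaviors.same
          case WorkerStopped(name) =>
            probe ! s"message: $name stopped"
            Behaviors.same
        }
        .receiveSignal {
          case (_, Terminated(ref)) =>
            probe ! s"signal: ${ref.path.name} stopped"
            Behaviors.same
        }
    }
}
```

    One design note: an actor that calls watch but does not handle the resulting Terminated signal fails with a DeathPactException, so either handle the signal as shown or use watchWith and treat the termination as part of the message protocol.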

    Below are the receiveSignal function and the Signal messages it can capture:

      trait Receive[T] extends Behavior[T] {
        def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
      }
    
    
    
    trait Signal
    
    /**
     * Lifecycle signal that is fired upon restart of the Actor before replacing
     * the behavior with the fresh one (i.e. this signal is received within the
     * behavior that failed).
     */
    sealed abstract class PreRestart extends Signal
    case object PreRestart extends PreRestart {
      def instance: PreRestart = this
    }
    
    /**
     * Lifecycle signal that is fired after this actor and all its child actors
     * (transitively) have terminated. The [[Terminated]] signal is only sent to
     * registered watchers after this signal has been processed.
     */
    sealed abstract class PostStop extends Signal
    // comment copied onto object for better hints in IDEs
    /**
     * Lifecycle signal that is fired after this actor and all its child actors
     * (transitively) have terminated. The [[Terminated]] signal is only sent to
     * registered watchers after this signal has been processed.
     */
    case object PostStop extends PostStop {
      def instance: PostStop = this
    }
    
    object Terminated {
      def apply(ref: ActorRef[Nothing]): Terminated = new Terminated(ref)
      def unapply(t: Terminated): Option[ActorRef[Nothing]] = Some(t.ref)
    }
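
    To see how PreRestart and PostStop show up in practice, here is a minimal sketch of a supervised actor that reports both signals. Fragile, Fail, Stop and the probe parameter are hypothetical names introduced for illustration; the code assumes the Akka Typed 2.6 scaladsl and a restart supervision strategy chosen for the example.

```scala
import akka.actor.typed.{ActorRef, Behavior, PostStop, PreRestart, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor used only to show when the lifecycle signals arrive.
object Fragile {
  sealed trait Command
  case object Fail extends Command
  case object Stop extends Command

  // Restart the worker whenever it throws an IllegalStateException.
  def apply(probe: ActorRef[String]): Behavior[Command] =
    Behaviors
      .supervise(worker(probe))
      .onFailure[IllegalStateException](SupervisorStrategy.restart)

  private def worker(probe: ActorRef[String]): Behavior[Command] =
    Behaviors
      .receiveMessage[Command] {
        case Fail => throw new IllegalStateException("boom")
        case Stop => Behaviors.stopped
      }
      .receiveSignal {
        // Delivered to the failed behavior just before it is replaced.
        case (_, PreRestart) =>
          probe ! "PreRestart"
          Behaviors.same
        // Delivered once the actor has stopped for good.
        case (_, PostStop) =>
          probe ! "PostStop"
          Behaviors.same
      }
}
```

    Sending Fail and then Stop should make the probe see PreRestart followed by PostStop: a restart triggers PreRestart rather than PostStop, and PostStop arrives only when the actor actually stops.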
  • Original article: https://www.cnblogs.com/tiger-xc/p/12976199.html