  • Akka (8): Distributed Computing: Remoting, Remote Lookup

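    Akka is a message-driven computation model. It runs programs across JVMs by using a messaging system that spans JVMs to drive actors living in ActorSystems on different JVMs; the prerequisite is that Akka's addressing scheme can locate actors across JVMs. At its best, Akka's messaging achieves what is called actor location transparency: Akka code no longer needs to care which JVM an actor runs on, so distributed actor programming looks no different from ordinary actor programming. Akka Remoting is a point-to-point, cross-JVM message channel that lets an actor in an ActorSystem on one JVM connect to an actor in an ActorSystem on another JVM. The two ActorSystems only need TCP connectivity between them for Akka Remoting to work. Akka Remoting does not yet achieve full location transparency, because the user must still specify the target actor's concrete address in code or in a configuration file.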

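    Akka Remoting offers two ways for actors to communicate:

    1. Remote lookup: look up an actor that already exists on the remote machine by its path, obtain its ActorRef, and then communicate with it

    2. Remote creation: create an actor directly on the remote machine and use it as the communication partner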

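    The main use of Akka Remoting is probably to deploy tasks onto remote machines for execution. The initiator (the local JVM) mainly distributes work, somewhat like an Akka Router. The following example demonstrates this: we simulate a calculator that performs successive additions, subtractions, multiplications and divisions while keeping a running result. We will deploy this calculator on a remote machine, then talk to it from the local machine to assign calculations and fetch results. The calculator is just a simple actor: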

    import akka.actor._
    
    object Calculator {
      sealed trait MathOps
      case class Num(dnum: Double) extends MathOps
      case class Add(dnum: Double) extends MathOps
      case class Sub(dnum: Double) extends MathOps
      case class Mul(dnum: Double) extends MathOps
      case class Div(dnum: Double) extends MathOps
    
      sealed trait CalcOps
      case object Clear extends CalcOps
      case object GetResult extends CalcOps
      
    }
    
    class Calculator extends Actor {
      import Calculator._
      var result: Double = 0.0   //internal state
    
      override def receive: Receive = {
        case Num(d) => result = d
        case Add(d) => result += d
        case Sub(d) => result -= d
        case Mul(d) => result *= d
        case Div(d) => result = result / d
    
        case Clear => result = 0.0
        case GetResult =>
          sender() ! s"Result of calculation is: $result"
      }
    
    
    }

    This is just a plain actor implementation; it has nothing to do with Remoting yet.
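
    The arithmetic this actor performs can be checked without Akka at all. The following is a minimal sketch, not part of the original project: CalcFold and its step and replay functions are hypothetical names, and the message types are redeclared locally so the snippet is self-contained. It replays a list of operations with a fold, mirroring how the actor updates result one message at a time.

```scala
object CalcFold {
  // Local copies of the message types, so the sketch compiles on its own
  sealed trait MathOps
  final case class Num(dnum: Double) extends MathOps
  final case class Add(dnum: Double) extends MathOps
  final case class Sub(dnum: Double) extends MathOps
  final case class Mul(dnum: Double) extends MathOps
  final case class Div(dnum: Double) extends MathOps

  // One state transition: the same updates the actor's receive applies to `result`
  def step(result: Double, op: MathOps): Double = op match {
    case Num(d) => d
    case Add(d) => result + d
    case Sub(d) => result - d
    case Mul(d) => result * d
    case Div(d) => result / d
  }

  // Replay a sequence of operations starting from the cleared state 0.0
  def replay(ops: Seq[MathOps]): Double = ops.foldLeft(0.0)(step)

  def main(args: Array[String]): Unit =
    println(replay(Seq(Num(13.0), Mul(1.5), Div(1.5), Add(100.0)))) // prints 113.0
}
```

    Keeping the state transition as a pure function like this makes the expected results easy to verify before any actors or network are involved.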

    Next we deploy this Calculator actor on a remote machine. First, the project structure of the demo: remoteLookup/build.sbt

    lazy val commonSettings = Seq(
      name := "RemoteLookupDemo",
      version := "1.0",
      scalaVersion := "2.11.8",
      libraryDependencies := Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.5.2",
        "com.typesafe.akka" %% "akka-remote" % "2.5.2"
      )
    )
    
    lazy val local = (project in file("."))
         .settings(commonSettings)
         .settings(
           name := "localSystem"
         ).aggregate(messages,remote).dependsOn(messages)
    
    lazy val messages = (project in file("messages"))
        .settings(commonSettings)
        .settings(
          name := "commands"
        )
    
    lazy val remote = (project in file("remote"))
        .settings(commonSettings)
        .settings(
          name := "remoteSystem"
        ).aggregate(messages).dependsOn(messages)

    There are three projects here: local is the main project, while messages and remote are subprojects. messages contains a single source file, OpsMessages.scala:

    package remoteLookup.messages
    
    object Messages {
      sealed trait MathOps
      case class Num(dnum: Double) extends MathOps
      case class Add(dnum: Double) extends MathOps
      case class Sub(dnum: Double) extends MathOps
      case class Mul(dnum: Double) extends MathOps
      case class Div(dnum: Double) extends MathOps
    
      sealed trait CalcOps
      case object Clear extends CalcOps
      case object GetResult extends CalcOps
    
    }

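    As we can see, this file factors out the messages that the Calculator above supports. The Calculator actor will be deployed on another JVM (remote), and we will send it operation messages from the local JVM, so Messages must be shared by local and remote. We meet this requirement with dependsOn(messages). Calculator itself is now defined in the remote project: remote/Calculator.scala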

    package remoteLookup.remote
    
    import akka.actor._
    import remoteLookup.messages.Messages._
    
    object CalcProps {
      def props = Props(new Calculator)
    }
    
    class Calculator extends Actor with ActorLogging {
    
      var result: Double = 0.0   //internal state
    
      override def receive: Receive = {
        case Num(d) => result = d
        case Add(d) => result += d
        case Sub(d) => result -= d
        case Mul(d) => result *= d
        case Div(d) =>
          val _ = result.toInt / d.toInt   // integer division: throws ArithmeticException when d.toInt == 0
          result /= d
        case Clear => result = 0.0
        case GetResult =>
          sender() ! s"Result of calculation is: $result"
      }
    
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        log.info(s"Restarting calculator: ${reason.getMessage}")
        super.preRestart(reason, message)
      }
    }

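    Because the default SupervisorStrategy handles an ArithmeticException with Restart, sending Div(0.0) would restart the actor and reset result to zero. Note that dividing a Double by 0.0 does not throw on the JVM, which is why the Div handler first performs an integer division to force the exception. We can add a supervisor on the remote side that changes the directive for this exception to Resume.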
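
    The integer division in the Div handler is what actually triggers the exception. As a small illustration (DivByZeroDemo and its methods are hypothetical names, not from the project), plain Scala shows that dividing a Double by 0.0 yields Infinity, while dividing an Int by 0 throws ArithmeticException:

```scala
import scala.util.Try

object DivByZeroDemo {
  // Floating-point division never throws; x / 0.0 is Infinity (or NaN for 0.0 / 0.0)
  def doubleDiv(a: Double, b: Double): Double = a / b

  // Integer division by zero throws java.lang.ArithmeticException, captured here in a Try
  def intDiv(a: Double, b: Double): Try[Int] = Try(a.toInt / b.toInt)

  def main(args: Array[String]): Unit = {
    println(doubleDiv(19.5, 0.0))         // prints Infinity
    println(intDiv(19.5, 0.0).isFailure)  // prints true
    println(intDiv(19.5, 1.5))            // prints Success(19)
  }
}
```

    One side effect of truncating with toInt is that any divisor between -1 and 1, such as 0.5, is also treated as zero and raises the exception.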

    First we test the Calculator locally inside the remote project: remote/CalculatorRunner.scala

    package remoteLookup.remote
    import akka.actor._
    import akka.pattern._
    import remoteLookup.messages.Messages._
    
    import scala.concurrent.duration._
    
    class SupervisorActor extends Actor {
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }
    
      override def supervisorStrategy: SupervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
          decider.orElse(SupervisorStrategy.defaultDecider)
        }
    
      val calcActor = context.actorOf(CalcProps.props,"calculator")
    
      override def receive: Receive = {
        case msg@ _ => calcActor.forward(msg)
      }
    
    }
    
    object CalculatorRunner extends App {
    
      val remoteSystem = ActorSystem("remoteSystem")
      val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")
    
      import remoteSystem.dispatcher
    
      calcActor ! Clear
      calcActor ! Num(13.0)
      calcActor ! Mul(1.5)
    
      implicit val timeout = akka.util.Timeout(1 second)
    
      ((calcActor ? GetResult).mapTo[String]) foreach println
      scala.io.StdIn.readLine()
    
      calcActor ! Div(0.0)
      calcActor ! Div(1.5)
      calcActor ! Add(100.0)
      ((calcActor ? GetResult).mapTo[String]) foreach println
      scala.io.StdIn.readLine()
    
      remoteSystem.terminate()
    
    }

    The test run produces the following output:

    Result of calculation is: 19.5
    
    Result of calculation is: 113.0
    [WARN] [06/20/2017 19:28:10.720] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/parentActor/calculator] / by zero

    supervisorActor does its job: the second result is 113.0, whereas a restart would have reset the state and produced 100.0.
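
    The decider above is an ordinary PartialFunction, and orElse is what lets the custom rule take precedence while everything else falls through to the defaults. The sketch below illustrates just that chaining in plain Scala. The Directive objects and the fallback rules are simplified stand-ins introduced for illustration, not Akka's real SupervisorStrategy types or its full default decider.

```scala
object DeciderSketch {
  // Simplified stand-ins for supervision directives (assumption: not Akka's real types)
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // Custom rule: only defined for ArithmeticException
  val custom: Decider = { case _: ArithmeticException => Resume }

  // Simplified fallback: restart on any other Exception, escalate everything else
  val fallback: Decider = {
    case _: Exception => Restart
    case _            => Escalate
  }

  // orElse tries `custom` first and consults `fallback` only where `custom` is undefined
  val combined: Decider = custom.orElse(fallback)

  def main(args: Array[String]): Unit = {
    println(combined(new ArithmeticException("/ by zero")))  // prints Resume
    println(combined(new IllegalStateException("boom")))     // prints Restart
  }
}
```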

    Now for the remote lookup demo. First, remote has to expose Calculator to the outside. This is done through configuration: remote/src/main/resources/application.conf

    akka {
      actor {
        provider = remote 
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 2552
        }
        log-sent-messages = on
        log-received-messages = on
      }
    }

    This configuration means that every actor exposed by this system has an address with the prefix: akka.tcp://remoteSystem@127.0.0.1:2552/user/???

    So the full path of Calculator should be: akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator
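
    The path is simply the system address followed by the actor hierarchy under /user. A minimal sketch of how such a path string is assembled (RemotePath.build is a hypothetical helper, not an Akka API; it assumes the netty.tcp transport and therefore the akka.tcp scheme):

```scala
object RemotePath {
  // Assemble "akka.tcp://<system>@<host>:<port>/user/<name>/<name>..."
  def build(system: String, host: String, port: Int, names: String*): String =
    s"akka.tcp://$system@$host:$port/user/" + names.mkString("/")

  def main(args: Array[String]): Unit =
    println(build("remoteSystem", "127.0.0.1", 2552, "supervisorActor", "calculator"))
}
```

    The system name, host and port must match the ActorSystem name and the netty.tcp settings on the remote side, and the remaining elements are the names passed to actorOf at each level of the hierarchy.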

    Akka Remoting offers two ways to perform a remote lookup: the actorSelection.resolveOne method and the Identify message handshake. Either way, local also needs a Remoting configuration: local/src/main/resources/application.conf

    akka {
      actor {
        provider = remote
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    }

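    port = 0 lets the system pick any available port automatically. With Remoting configured and the concrete address of Calculator on the remote machine known, we have everything needed to talk to the remote actor. We start with actorSelection.resolveOne, whose source code is: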

      /**
       * Resolve the [[ActorRef]] matching this selection.
       * The result is returned as a Future that is completed with the [[ActorRef]]
       * if such an actor exists. It is completed with failure [[ActorNotFound]] if
       * no such actor exists or the identification didn't complete within the
       * supplied `timeout`.
       *
       * Under the hood it talks to the actor to verify its existence and acquire its
       * [[ActorRef]].
       */
      def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
        implicit val ec = ExecutionContexts.sameThreadExecutionContext
        val p = Promise[ActorRef]()
        this.ask(Identify(None)) onComplete {
          case Success(ActorIdentity(_, Some(ref))) ⇒ p.success(ref)
          case _                                    ⇒ p.failure(ActorNotFound(this))
        }
        p.future
      }

    resolveOne returns a Future[ActorRef], which we can work with using Future combinators: localAccessDemo.scala

    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.duration._
    import akka.pattern._
    import remoteLookup.messages.Messages._
    
    object LocalSelectionDemo extends App {
    
      val localSystem = ActorSystem("localSystem")
      import localSystem.dispatcher
    
      val path = "akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator"
    
      implicit val timeout = Timeout(5 seconds)
      // for without yield is Future.foreach: the body runs only if the lookup succeeds
      for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) {
    
        calcActor ! Clear
        calcActor ! Num(13.0)
        calcActor ! Mul(1.5)
        ((calcActor ? GetResult).mapTo[String]) foreach println
    
        calcActor ! Div(0.0)
        calcActor ! Div(1.5)
        calcActor ! Add(100.0)
        ((calcActor ? GetResult).mapTo[String]) foreach println
    
      }
    
      scala.io.StdIn.readLine()
      localSystem.terminate()
    
    }
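
    A for without yield over a Future is just foreach, so the body above runs only when the lookup succeeds; if resolveOne fails with ActorNotFound, nothing is printed. One way to surface the failure is recover. The sketch below shows the pattern with plain Scala futures; lookup is a hypothetical stand-in for actorSelection(path).resolveOne() and returns a String instead of an ActorRef so that it runs without Akka.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LookupFailureSketch {
  // Hypothetical stand-in for resolveOne(): succeeds or fails like a remote lookup would
  def lookup(found: Boolean): Future[String] =
    if (found) Future.successful("calculator")
    else Future.failed(new NoSuchElementException("actor not found"))

  // map handles the success case, recover turns a failed lookup into a readable message
  def describe(found: Boolean): Future[String] =
    lookup(found)
      .map(ref => s"resolved: $ref")
      .recover { case e: NoSuchElementException => s"lookup failed: ${e.getMessage}" }

  def main(args: Array[String]): Unit = {
    println(Await.result(describe(found = true), 1.second))   // prints resolved: calculator
    println(Await.result(describe(found = false), 1.second))  // prints lookup failed: actor not found
  }
}
```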

    Because resolveOne returns a Future[x], we can use for to operate on the x wrapped inside the Future. Now remoteSystem only has to create Calculator and keep it waiting: remote/CalculatorRunner.scala

    package remoteLookup.remote
    import akka.actor._
    import akka.pattern._
    import remoteLookup.messages.Messages._
    
    import scala.concurrent.duration._
    
    class SupervisorActor extends Actor {
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }
    
      override def supervisorStrategy: SupervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
          decider.orElse(SupervisorStrategy.defaultDecider)
        }
    
      val calcActor = context.actorOf(CalcProps.props,"calculator")
    
      override def receive: Receive = {
        case msg@ _ => calcActor.forward(msg)
      }
    
    }
    
    object CalculatorRunner extends App {
    
      val remoteSystem = ActorSystem("remoteSystem")
      val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")
    /*
      import remoteSystem.dispatcher
    
      calcActor ! Clear
      calcActor ! Num(13.0)
      calcActor ! Mul(1.5)
    
      implicit val timeout = akka.util.Timeout(1 second)
    
      ((calcActor ? GetResult).mapTo[String]) foreach println
      scala.io.StdIn.readLine()
    
      calcActor ! Div(0.0)
      calcActor ! Div(1.5)
      calcActor ! Add(100.0)
      ((calcActor ? GetResult).mapTo[String]) foreach println
      */
      scala.io.StdIn.readLine()
      remoteSystem.terminate()
    
    }

    Note: the commented-out operations have moved into LocalSelectionDemo.

    Run the remote project first:

    [INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://remoteSystem@127.0.0.1:2552]
    [INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://remoteSystem@127.0.0.1:2552]

    remoteSystem is now listening on the configured public address.

    Then run local with sbt run:

    Result of calculation is: 19.5
    Result of calculation is: 113.0

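    The results are correct. The SupervisorStrategy of supervisorActor did its job.

    The output of the remote project confirms this. The final AssociationError appears when localSystem shuts down and closes its association with remoteSystem: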

    [INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://remoteSystem@127.0.0.1:2552]
    [INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://remoteSystem@127.0.0.1:2552]
    [WARN] [06/20/2017 21:27:06.330] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/supervisorActor/calculator] / by zero
    [ERROR] [06/20/2017 21:27:34.176] [remoteSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://remoteSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FlocalSystem%40127.0.0.1%3A60601-0/endpointWriter] AssociationError [akka.tcp://remoteSystem@127.0.0.1:2552] <- [akka.tcp://localSystem@127.0.0.1:60601]: Error [Shut down address: akka.tcp://localSystem@127.0.0.1:60601] [
    akka.remote.ShutDownAssociation: Shut down address: akka.tcp://localSystem@127.0.0.1:60601
    Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
    ] 

    Next we repeat the example using the Identify message handshake. Akka defines the messages involved as follows:

    /**
     * A message all Actors will understand, that when processed will reply with
     * [[akka.actor.ActorIdentity]] containing the `ActorRef`. The `messageId`
     * is returned in the `ActorIdentity` message as `correlationId`.
     */
    @SerialVersionUID(1L)
    final case class Identify(messageId: Any) extends AutoReceivedMessage with NotInfluenceReceiveTimeout
    
    /**
     * Reply to [[akka.actor.Identify]]. Contains
     * `Some(ref)` with the `ActorRef` of the actor replying to the request or
     * `None` if no actor matched the request.
     * The `correlationId` is taken from the `messageId` in
     * the `Identify` message.
     */
    @SerialVersionUID(1L)
    final case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) {
      if (ref.isDefined && ref.get == null) {
        throw new IllegalArgumentException("ActorIdentity created with ref = Some(null) is not allowed, " +
          "this could happen when serializing with Scala 2.12 and deserializing with Scala 2.11 which is not supported.")
      }
    
      /**
       * Java API: `ActorRef` of the actor replying to the request or
       * null if no actor matched the request.
       */
      @deprecated("Use getActorRef instead", "2.5.0")
      def getRef: ActorRef = ref.orNull
    
      /**
       * Java API: `ActorRef` of the actor replying to the request or
       * not defined if no actor matched the request.
       */
      def getActorRef: Optional[ActorRef] = {
        import scala.compat.java8.OptionConverters._
        ref.asJava
      }
    }

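    Applied to the example above, we send Identify(path) to the address of Calculator on the remote machine, and Calculator replies with an ActorIdentity message whose parameters are correlationId = path and ref = Some(the ActorRef of Calculator). Here is demo code using it: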

    object LocalIdentifyDemo extends App {
    
      class RemoteCalc extends Actor with ActorLogging {
    
        val path = "akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator"
    
        context.actorSelection(path) ! Identify(path)  //send req for ActorRef
    
        import context.dispatcher
        implicit val timeout = Timeout(5 seconds)
    
        override def receive: Receive = {
          case ActorIdentity(p,someRef) if p.equals(path) => 
            someRef foreach { calcActor =>
    
              calcActor ! Clear
              calcActor ! Num(13.0)
              calcActor ! Mul(1.5)
              ((calcActor ? GetResult).mapTo[String]) foreach println
    
              calcActor ! Div(0.0)
              calcActor ! Div(1.5)
              calcActor ! Add(100.0)
              ((calcActor ? GetResult).mapTo[String]) foreach println
    
            }
        }
    
      }
    
      val localSystem = ActorSystem("localSystem")
      val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor")
    
      scala.io.StdIn.readLine()
      localSystem.terminate()
    
    }

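    The Identify handshake is an actor-to-actor message exchange, so we need to build a RemoteCalc actor and wrap the program logic inside it. When receive gets the ActorIdentity reply, it extracts the ActorRef and runs the calculation.

    The output is again correct.

    Below is the complete source code for this demo: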

    build.sbt

    lazy val commonSettings = Seq(
      name := "RemoteLookupDemo",
      version := "1.0",
      scalaVersion := "2.11.8",
      libraryDependencies := Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.5.2",
        "com.typesafe.akka" %% "akka-remote" % "2.5.2"
      )
    )
    
    
    lazy val local = (project in file("."))
         .settings(commonSettings)
         .settings(
           name := "remoteLookupDemo"
         ).aggregate(messages,remote).dependsOn(messages)
    
    lazy val messages = (project in file("messages"))
        .settings(commonSettings)
        .settings(
          name := "commands"
        )
    
    lazy val remote = (project in file("remote"))
        .settings(commonSettings)
        .settings(
          name := "remoteSystem"
        ).aggregate(messages).dependsOn(messages)

    messages/OpsMessages.scala

    package remoteLookup.messages
    
    object Messages {
      sealed trait MathOps
      case class Num(dnum: Double) extends MathOps
      case class Add(dnum: Double) extends MathOps
      case class Sub(dnum: Double) extends MathOps
      case class Mul(dnum: Double) extends MathOps
      case class Div(dnum: Double) extends MathOps
    
      sealed trait CalcOps
      case object Clear extends CalcOps
      case object GetResult extends CalcOps
    
    }

    remote/src/main/resources/application.conf

    akka {
      actor {
        provider = remote
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 2552
        }
        log-sent-messages = on
        log-received-messages = on
      }
    }

    remote/Calculator.scala

    package remoteLookup.remote
    
    import akka.actor._
    import remoteLookup.messages.Messages._
    
    object CalcProps {
      def props = Props(new Calculator)
    }
    
    class Calculator extends Actor with ActorLogging {
    
      var result: Double = 0.0   //internal state
    
      override def receive: Receive = {
        case Num(d) => result = d
        case Add(d) => result += d
        case Sub(d) => result -= d
        case Mul(d) => result *= d
        case Div(d) =>
          val _ = result.toInt / d.toInt   // integer division: throws ArithmeticException when d.toInt == 0
          result /= d
        case Clear => result = 0.0
        case GetResult =>
          sender() ! s"Result of calculation is: $result"
      }
    
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        log.info(s"Restarting calculator: ${reason.getMessage}")
        super.preRestart(reason, message)
      }
    }

    remote/CalculatorRunner.scala

    package remoteLookup.remote
    import akka.actor._
    import akka.pattern._
    import remoteLookup.messages.Messages._
    
    import scala.concurrent.duration._
    
    class SupervisorActor extends Actor {
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }
    
      override def supervisorStrategy: SupervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
          decider.orElse(SupervisorStrategy.defaultDecider)
        }
    
      val calcActor = context.actorOf(CalcProps.props,"calculator")
    
      override def receive: Receive = {
        case msg@ _ => calcActor.forward(msg)
      }
    
    }
    
    object CalculatorRunner extends App {
    
      val remoteSystem = ActorSystem("remoteSystem")
      val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")
    /*
      import remoteSystem.dispatcher
    
      calcActor ! Clear
      calcActor ! Num(13.0)
      calcActor ! Mul(1.5)
    
      implicit val timeout = akka.util.Timeout(1 second)
    
      ((calcActor ? GetResult).mapTo[String]) foreach println
      scala.io.StdIn.readLine()
    
      calcActor ! Div(0.0)
      calcActor ! Div(1.5)
      calcActor ! Add(100.0)
      ((calcActor ? GetResult).mapTo[String]) foreach println
      */
      scala.io.StdIn.readLine()
      remoteSystem.terminate()
    
    }

    local/src/main/resources/application.conf

    akka {
      actor {
        provider = remote
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    }

    local/localAccessDemo.scala

    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.duration._
    import akka.pattern._
    import remoteLookup.messages.Messages._
    
    object LocalSelectionDemo extends App {
    
      val localSystem = ActorSystem("localSystem")
      import localSystem.dispatcher
    
      val path = "akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator"
    
      implicit val timeout = Timeout(5 seconds)
      // for without yield is Future.foreach: the body runs only if the lookup succeeds
      for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) {
    
        calcActor ! Clear
        calcActor ! Num(13.0)
        calcActor ! Mul(1.5)
        ((calcActor ? GetResult).mapTo[String]) foreach println
    
        calcActor ! Div(0.0)
        calcActor ! Div(1.5)
        calcActor ! Add(100.0)
        ((calcActor ? GetResult).mapTo[String]) foreach println
    
      }
    
      scala.io.StdIn.readLine()
      localSystem.terminate()
    
    }
    
    object LocalIdentifyDemo extends App {
    
      class RemoteCalc extends Actor with ActorLogging {
    
        val path = "akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator"
    
        context.actorSelection(path) ! Identify(path)  //send req for ActorRef
    
        import context.dispatcher
        implicit val timeout = Timeout(5 seconds)
    
        override def receive: Receive = {
          case ActorIdentity(p,someRef) if p.equals(path) =>
            someRef foreach { calcActor =>
    
              calcActor ! Clear
              calcActor ! Num(13.0)
              calcActor ! Mul(1.5)
              ((calcActor ? GetResult).mapTo[String]) foreach println
    
              calcActor ! Div(0.0)
              calcActor ! Div(1.5)
              calcActor ! Add(100.0)
              ((calcActor ? GetResult).mapTo[String]) foreach println
    
            }
        }
    
      }
    
      val localSystem = ActorSystem("localSystem")
      val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor")
    
      scala.io.StdIn.readLine()
      localSystem.terminate()
    
    }

  • Original article: https://www.cnblogs.com/tiger-xc/p/7057788.html