zoukankan      html  css  js  c++  java
  • Akka(11): 分布式运算:集群-均衡负载

      在上篇讨论里我们主要介绍了Akka-Cluster的基本原理。同时我们也确认了几个使用Akka-Cluster的重点:首先,Akka-Cluster集群构建与Actor编程没有直接的关联。集群构建是ActorSystem层面上的,可以是纯粹的配置和部署行为;分布式Actor程序编程实现了Actor消息地址的透明化,无须考虑目标运行环境是否分布式的,可以按正常的Actor编程模式进行。

      既然分布式的Actor编程无须特别针对集群环境,那么摆在我们面前的就是多个可以直接使用的运算环境(集群节点)了,现在我们的分布式编程方式应该主要聚焦在如何充分使用这些分布的运算环境,即:如何把程序合理分配到各集群节点以达到最优的运算输出效率。我们可以通过人为方式有目的向集群节点分配负载,或者通过某种算法来自动分配就像前面讨论过的Routing那样。

    我们首先示范如何手工进行集群的负载分配。目的很简单:把不同的功能分配给不同的集群节点去运算。先按运算功能把集群节点分类:我们可以通过设定节点角色role来实现。然后需要一类节点(可能是多个节点)作为前端负责承接任务,然后根据任务类型来分配。负责具体运算的节点统称后台节点(backend nodes),负责接收和分配任务的节点统称前端节点(frontend nodes)。在编程过程中唯一需要考虑集群环境的部分就是前端节点需要知道处在所有后台节点上运算Actor的具体地址,即ActorRef。这点需要后台节点Node在加入集群时向前端负责分配任务的Actor报备自己的ActorRef。前端Actor是通过一个后台报备的ActorRef清单来分配任务的。假如我们用加减乘除模拟四项不同的运算功能,用手工形式把每一项功能部署到一个集群节点上,然后用一个前端节点按功能分配运算任务。下面是所有节点共享的消息类型:

    package clusterloadbalance.messages
    
    /** Message protocol shared by all nodes of the hand-routed calculator cluster. */
    object Messages {
      /** A binary integer operation that the frontend hands to a backend node. */
      sealed trait MathOps
      final case class Add(x: Int, y: Int) extends MathOps
      final case class Sub(x: Int, y: Int) extends MathOps
      final case class Mul(x: Int, y: Int) extends MathOps
      final case class Div(x: Int, y: Int) extends MathOps

      /** Cluster housekeeping messages (as opposed to calculation requests). */
      sealed trait ClusterMsg

      /** Announces that the sending actor serves the given `role`. */
      final case class RegisterBackendActor(role: String) extends ClusterMsg

    }

    负责运算的后台定义如下:

    package clusterloadbalance.backend
    
    import akka.actor._
    import clusterloadbalance.messages.Messages._
    
    import scala.concurrent.duration._
    import akka.cluster._
    import akka.cluster.ClusterEvent._
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the `Props` of the calculator worker and of its supervisor. */
    object CalcFuctions {
      /** Props of the worker actor that carries out the arithmetic. */
      def propsFuncs: Props = Props(new CalcFuctions)

      /** Props of the supervisor that serves the given `role`. */
      def propsSuper(role: String): Props = Props(new CalculatorSupervisor(role))
    }

    /**
     * Worker actor that prints the outcome of every arithmetic request it receives.
     * A `Div` with a zero divisor throws an `ArithmeticException` out of `receive`.
     */
    class CalcFuctions extends Actor {
      // Prints one result line; `outcome` has already been computed by the caller.
      private def report(x: Int, symbol: String, y: Int, outcome: Int): Unit =
        println(s"$x $symbol $y carried out by ${self} with result=$outcome")

      override def receive: Receive = {
        case Add(a, b) => report(a, "+", b, a + b)
        case Sub(a, b) => report(a, "-", b, a - b)
        case Mul(a, b) => report(a, "*", b, a * b)
        case Div(a, b) => report(a, "/", b, a / b)
      }

      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        val cause = reason.getMessage
        println(s"Restarting calculator: $cause")
        super.preRestart(reason, message)
      }
    }
    
    /**
     * Supervises the local `CalcFuctions` worker and registers itself with every
     * frontend node so that requests for `mathOps` can be routed here.
     *
     * @param mathOps role name under which this node registers at the frontend
     */
    class CalculatorSupervisor(mathOps: String) extends Actor {
      // An arithmetic failure (e.g. division by zero) resumes the worker instead
      // of restarting it; everything else falls through to the default decider.
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }

      override def supervisorStrategy: SupervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds){
          decider.orElse(SupervisorStrategy.defaultDecider)
        }

      val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")
      val cluster = Cluster(context.system)

      override def preStart(): Unit = {
        cluster.subscribe(self,classOf[MemberUp])
        super.preStart()
      }

      // The braces matter: without them only the first statement is the method
      // body and `super.postStop()` becomes a constructor statement.
      override def postStop(): Unit = {
        cluster.unsubscribe(self)
        super.postStop()
      }

      override def receive: Receive = {
        // Initial snapshot sent on subscription: covers frontends that were
        // already Up before this actor subscribed.
        case state: CurrentClusterState =>
          state.members.filter(_.status == MemberStatus.Up).foreach(register)
        case MemberUp(m) =>
          register(m)
        case msg =>
          calcActor.forward(msg)
      }

      // Tells the "frontend" actor on `member` which role this supervisor serves.
      private def register(member: Member): Unit =
        if (member.hasRole("frontend")) {
          context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
            RegisterBackendActor(mathOps)
        }

    }
    
    /** Boots backend cluster nodes, one actor system per call. */
    object Calculator {
      /**
       * Starts a backend node and creates its "calculator" supervisor.
       *
       * @param role cluster role of the node; it replaces the default role of
       *             the "Backend" config section and is the name the supervisor
       *             registers under at the frontend
       */
      def create(role: String): Unit = {
        // The role is wrapped in double quotes so HOCON reads it as one string.
        val roleOverride =
          ConfigFactory.parseString(s"""Backend.akka.cluster.roles = ["$role"]""")
        val config = roleOverride.withFallback(ConfigFactory.load()).getConfig("Backend")
        val calcSystem = ActorSystem("calcClusterSystem",config)
        calcSystem.actorOf(CalcFuctions.propsSuper(role),"calculator")
      }
    }

    注意,这个运算Actor是带监管的,可以自动处理异常。

    为了方便表达,我把所有功能都集中在CalcFunctions一个Actor里。在实际情况下应该分成四个不同的Actor,因为它们会被部署到不同的集群节点上。另外,增加了一个supervisor来监管CalcFunctions。这个supervisor也是分布在四个节点上分别监管本节点上的子级Actor。现在所有集群节点之间的功能交流都会通过这个supervisor来进行了,所以需要向前端登记ActorRef和具体负责的功能role。下面是前台程序定义:

    package clusterloadbalance.frontend
    
    import akka.actor._
    import clusterloadbalance.messages.Messages._
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the frontend routing actor. */
    object CalcRouter {
      def props: Props = Props(new CalcRouter)
    }

    /**
     * Frontend actor that keeps a role -> backend registry and passes each math
     * request to the backend registered for the matching role.
     */
    class CalcRouter extends Actor {
      // role name -> actor that registered for it
      var nodes: Map[String,ActorRef] = Map()

      override def receive: Receive = {
        case RegisterBackendActor(role) =>
          val backend = sender()
          nodes = nodes.updated(role, backend)
          context.watch(backend)

        case Terminated(gone) =>
          // drop every registration that points at the terminated actor
          nodes = nodes.filterNot { case (_, registered) => registered == gone }

        case add: Add => routeCommand("adder", add)
        case sub: Sub => routeCommand("substractor", sub)
        case mul: Mul => routeCommand("multiplier", mul)
        case div: Div => routeCommand("divider", div)
      }

      /** Sends `ops` to the backend registered for `role`, or reports that none is. */
      def routeCommand(role: String, ops: MathOps): Unit =
        nodes.get(role).fold(println(s"$role not registered!"))(_ ! ops)
    }
    
    /** Boots the frontend node and exposes its routing actor. */
    object FrontEnd {
      // Unset (null) until `create` has been called.
      private var router: ActorRef = _

      /**
       * Starts the frontend actor system from the "Frontend" config section and
       * creates the "frontend" router in it. This node is the configured seed
       * node, so it has to be started before any backend.
       */
      def create: Unit = {
        val frontendConfig = ConfigFactory.load().getConfig("Frontend")
        val calcSystem = ActorSystem("calcClusterSystem", frontendConfig)
        router = calcSystem.actorOf(CalcRouter.props, "frontend")
      }

      /** The router created by `create`; null if `create` has not run yet. */
      def getRouter: ActorRef = router
    }

    只有一个前台负责功能分配。这是个seed-node,必须先启动,然后其它后台节点启动时才能进行后台登记。

    我们用下面的代码来测试这个程序。由于Akka的集群节点是由不同的ActorSystem实例代表的,所以在这个例子里没有必要使用多层项目结构:

    package clusterLoadBalance.demo
    import clusterloadbalance.messages.Messages._
    import clusterloadbalance.backend.Calculator
    import clusterloadbalance.frontend.FrontEnd
    
    /** Demo: one frontend, one backend node per operation, five requests. */
    object LoadBalancerDemo extends App {
      // The frontend goes first, then one backend per role.
      FrontEnd.create
      List("adder", "substractor", "multiplier", "divider").foreach(Calculator.create)

      // Pause before sending work (presumably so the backends can join and register).
      Thread.sleep(2000)

      val router = FrontEnd.getRouter

      // The last request divides by zero on purpose.
      List(Add(10,3), Mul(3,7), Div(8,2), Sub(45, 3), Div(8,0)).foreach(router ! _)

    }

    然后,resources/application.conf是这样定义的: 

    # Settings of the frontend node; FrontEnd.create loads this section via getConfig("Frontend").
    Frontend {
      akka {
        actor {
          provider = "cluster"
        }
        remote {
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            # Fixed port: this is the address listed under seed-nodes below.
            port = 2551
          }
        }
    
        cluster {
          roles = [frontend]
          seed-nodes = [
            "akka.tcp://calcClusterSystem@127.0.0.1:2551"]
    
          auto-down-unreachable-after = 10s
        }
      }
    }
    
    # Settings shared by the backend nodes; Calculator.create loads this section via getConfig("Backend").
    Backend {
      akka{
        actor {
          provider = "cluster"
        }
        remote {
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            # NOTE(review): presumably 0 lets every backend bind a free, system-chosen port - confirm in the Akka remoting docs.
            port = 0
          }
        }
    
        cluster {
          # Default role; Calculator.create overrides Backend.akka.cluster.roles for each node.
          roles = [backend]
          seed-nodes = [
            "akka.tcp://calcClusterSystem@127.0.0.1:2551"]
    
          auto-down-unreachable-after = 10s
        }
      }
    }

    运算结果:

    ...
    3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1800460396] with result=21
    10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1046049287] with result=13
    8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-2008880936] with result=4
    45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#367851617] with result=42
    [WARN] [06/30/2017 09:12:39.465] [calcClusterSystem-akka.actor.default-dispatcher-20] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero

    结果显示不同的功能在不同的节点运算,而且ArithmeticException并没有造成Restart,说明SupervisorStrategy发挥了作用。 

    我们也可以用类似前面介绍的Routing方式来分配负载,原理还是通过前端的Router把任务分配到后台多个Routee上去运算。不同的是这些Routee可以在不同的集群节点上。原则上讲,具体任务分配是通过某种Routing算法自动安排到Routees上去的。但是,我们还是可以用ConsistentHashing-Router模式实现目标Routees的配对。上面例子的加减乘除操作类型可以成为这种Router模式的分类键(Key)。Router还是分Router-Group和Router-Pool两种:Router-Pool更自动化些,它的Routees是由Router构建及部署的,Router-Group的Router则是通过路径查找方式连接已经构建和部署好了的Routees的。如果我们希望有针对性的把任务分配到指定集群节点上的Routee,必须首先构建和部署Routees后再用Router-Group方式连接Routees。还是用上面的例子,使用ConsistentHashing-Router模式。首先调整一下消息类型:

    package clusterrouter.messages
    import akka.routing.ConsistentHashingRouter._
    /** Message protocol of the router-based calculator example. */
    object Messages {
      /**
       * Base of all math requests. The fixed `hashKey` is what the
       * consistent-hashing router uses as the message's hash key.
       * Sealed and abstract so that the four operations below are the only
       * possible requests.
       */
      sealed abstract class MathOps(hashKey: String) extends Serializable with ConsistentHashable {
        override def consistentHashKey: Any = hashKey
      }
      final case class Add(x: Int, y: Int) extends MathOps("adder")
      final case class Sub(x: Int, y: Int) extends MathOps("substractor")
      final case class Mul(x: Int, y: Int) extends MathOps("multiplier")
      final case class Div(x: Int, y: Int) extends MathOps("divider")

    }

    注意现在Router是在跨系统的集群环境下,消息类型必须实现Serializable。构建Calculator并且完成部署:

    package clusterrouter.backend
    
    import akka.actor._
    import clusterrouter.messages.Messages._
    
    import scala.concurrent.duration._
    
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the `Props` of the calculator worker and of its supervisor. */
    object CalcFuctions {
      /** Props of the worker actor that carries out the arithmetic. */
      def propsFuncs: Props = Props(new CalcFuctions)

      /** Props of the supervising parent actor. */
      def propsSuper: Props = Props(new CalculatorSupervisor)
    }

    /**
     * Worker actor that prints the outcome of every arithmetic request it receives.
     * A `Div` with a zero divisor throws an `ArithmeticException` out of `receive`.
     */
    class CalcFuctions extends Actor {
      // Prints one result line; `outcome` has already been computed by the caller.
      private def report(x: Int, symbol: String, y: Int, outcome: Int): Unit =
        println(s"$x $symbol $y carried out by ${self} with result=$outcome")

      override def receive: Receive = {
        case Add(a, b) => report(a, "+", b, a + b)
        case Sub(a, b) => report(a, "-", b, a - b)
        case Mul(a, b) => report(a, "*", b, a * b)
        case Div(a, b) => report(a, "/", b, a / b)
      }

      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        val cause = reason.getMessage
        println(s"Restarting calculator: $cause")
        super.preRestart(reason, message)
      }
    }
    
    /**
     * Parent of the `CalcFuctions` worker: forwards every message to it and
     * resumes it when a calculation throws an `ArithmeticException`.
     */
    class CalculatorSupervisor extends Actor {
      val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")

      // Arithmetic failures resume the child; anything else is left to the default decider.
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }

      override def supervisorStrategy: SupervisorStrategy = {
        val directives = decider orElse SupervisorStrategy.defaultDecider
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(directives)
      }

      override def receive: Receive = {
        case anything => calcActor forward anything
      }

    }
    
    
    /** Boots backend cluster nodes of the router example. */
    object Calculator {
      /**
       * Starts a node with role "backend" on the given remoting `port` and
       * creates the "calculator" supervisor on it.
       */
      def create(port: Int): Unit = {
        val roleSetting = ConfigFactory.parseString("akka.cluster.roles = [backend]")
        val portSetting = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
        // Earlier layers win: role, then port, then the default application config.
        val config = roleSetting.withFallback(portSetting).withFallback(ConfigFactory.load())
        val calcSystem = ActorSystem("calcClusterSystem",config)
        calcSystem.actorOf(CalcFuctions.propsSuper,"calculator")
      }
    }

    用create函数构建后台节点后再在之上构建部署一个CalculatorSupervisor。

    frontend定义如下:

    package clusterrouter.frontend
    
    import akka.actor._
    import akka.routing.FromConfig
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the frontend relay actor. */
    object CalcRouter {
      def props: Props = Props(new CalcRouter)
    }

    /**
     * Frontend actor that owns a router child named "calcRouter", whose type and
     * routees come from the deployment configuration, and forwards everything to it.
     */
    class CalcRouter extends Actor {

      // Routees are only looked up, never deployed by this router, so empty
      // routee Props are sufficient.
      val calcRouter = {
        val lookupOnly = FromConfig.props(Props.empty)
        context.actorOf(lookupOnly, name = "calcRouter")
      }

      override def receive: Receive = {
        case anything => calcRouter.forward(anything)
      }
    }
    
    /** Boots the frontend node of the router example and exposes its entry actor. */
    object FrontEnd {
      // Unset (null) until `create` has been called.
      private var router: ActorRef = _

      /** Starts an actor system configured by "hashing" and creates the "frontend" actor in it. */
      def create: Unit = {
        val hashingConfig = ConfigFactory.load("hashing")
        val calcSystem = ActorSystem("calcClusterSystem", hashingConfig)
        router = calcSystem.actorOf(CalcRouter.props, "frontend")
      }

      /** The actor created by `create`; null if `create` has not run yet. */
      def getRouter: ActorRef = router
    }

    注意calcRouter构建,用了FromConfig.props(Props.empty),代表只进行Routees查找,无需部署。hashing.conf文件如下:

    # Frontend configuration of the consistent-hashing example (FrontEnd.create loads it as "hashing").
    include "application"
    akka.cluster.roles = [frontend]
    akka.actor.deployment {
      # Deployment entry for the "calcRouter" child of the "/user/frontend" actor.
      /frontend/calcRouter {
        router = consistent-hashing-group
        # Path under which Calculator.create starts the supervisor on each backend node.
        routees.paths = ["/user/calculator"]
        cluster {
          enabled = on
          allow-local-routees = on
          use-role = backend
        }
      }
    }

    application.conf:

    # Base settings of every node of the router example; hashing.conf and adaptive.conf include this file.
    akka {
      actor {
        provider = cluster
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          # Default port; Calculator.create overrides akka.remote.netty.tcp.port per node.
          port = 0
        }
      }
    
      cluster {
        # The node started with Calculator.create(2551) is the seed node.
        seed-nodes = [
          "akka.tcp://calcClusterSystem@127.0.0.1:2551"]
    
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        auto-down-unreachable-after = 10s
      }
    }

    用下面的代码进行测试:

    package clusterLoadBalance.demo
    import clusterrouter.messages.Messages._
    import clusterrouter.backend.Calculator
    import clusterrouter.frontend.FrontEnd
    
    /** Demo of the consistent-hashing group router: six backends, one frontend. */
    object LoadBalancerDemo extends App {

      Calculator.create(2551)                        // seed-node
      (1 to 5).foreach(_ => Calculator.create(0))    // five more backend nodes

      Thread.sleep(2000)

      FrontEnd.create

      Thread.sleep(2000)

      val router = FrontEnd.getRouter

      // One batch of requests; the last one divides by zero on purpose.
      private def sendBatch(): Unit =
        List(Add(10,3), Mul(3,7), Div(8,2), Sub(45, 3), Div(8,0)).foreach(router ! _)

      // The same batch is sent twice, two seconds apart.
      sendBatch()
      Thread.sleep(2000)
      sendBatch()

    }

    重复两批次运算后显示: 

    8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1669565820] with result=4
    3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=21
    10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#904088130] with result=13
    45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=42
    [WARN] [07/02/2017 17:35:03.381] [calcClusterSystem-akka.actor.default-dispatcher-20] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero
    [INFO] [07/02/2017 17:35:05.076] [calcClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://calcClusterSystem)] Cluster Node [akka.tcp://calcClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://calcClusterSystem@127.0.0.1:51304] to [Up]
    10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#904088130] with result=13
    3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=21
    8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1669565820] with result=4
    45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=42
    [WARN] [07/02/2017 17:35:05.374] [calcClusterSystem-akka.actor.default-dispatcher-17] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero

    相同功能的运算被分配到了相同的节点上,这点可以用Actor的地址证明。

    Akka-Cluster提供的Adaptive-Group是一种比较智能化的自动Routing模式,它是根据各集群节点的具体负载情况(由cluster-metrics扩展采集的指标)来分配任务的。用户只需要定义adaptive-group的配置:与其它Router-Group一样,Routee仍然需要预先在各后台节点上构建部署,Router则会随着集群节点的增减自动按路径查找并连接这些Routee。Adaptive-Group-Router可以在配置文件中设置:

    # Configuration of the adaptive (metrics based) router example, loaded as "adaptive".
    include "application"
    
    akka.cluster.min-nr-of-members = 3
    
    akka.cluster.role {
      frontend.min-nr-of-members = 1
      backend.min-nr-of-members = 2
    }
    
    akka.actor.deployment {
      # Actor path: RouterRunner is named "frontend", its CalcRouter child "adaptive",
      # and that actor's router child "calcRouter".
      /frontend/adaptive/calcRouter {
        # Router type provided by metrics extension.
        router = cluster-metrics-adaptive-group
        # Router parameter specific for metrics extension.
        # metrics-selector = heap
        # metrics-selector = load
        # metrics-selector = cpu
        metrics-selector = mix
        #
        nr-of-instances = 100
        # Path under which Calculator.create starts the supervisor on each backend node.
        routees.paths = ["/user/calculator"]
        cluster {
          enabled = on
          use-role = backend
          allow-local-routees = off
        }
      }
    }

    后台的运算Actor与上面人为分配方式一样不变。前端Router的构建也没变,只不过在构建时目标是cluster-metrics-adaptive-group。这个在测试代码中可以看到: 

    package clusterLoadBalance.demo
    import clusterrouter.messages.Messages._
    import clusterrouter.backend.Calculator
    import clusterrouter.frontend.CalcRouter
    import com.typesafe.config.ConfigFactory
    import scala.util.Random
    import scala.concurrent.duration._
    import akka.actor._
    import akka.cluster._
    
    
    /**
     * Periodically sends a random math request to a `CalcRouter` child.
     * Every message this actor receives (normally the scheduler tick) triggers
     * one request.
     */
    class RouterRunner extends Actor {
      import context.dispatcher

      // Companion objects of the request case classes, used as constructors.
      val jobs = List(Add,Sub,Mul,Div)

      val calcRouter = context.actorOf(CalcRouter.props,"adaptive")

      // The handle is kept so that the timer can be cancelled in postStop;
      // otherwise it would keep sending ticks after this actor has stopped.
      private val ticker: Cancellable =
        context.system.scheduler.schedule(3.seconds, 3.seconds, self, "DoRandomMath")

      override def postStop(): Unit = {
        ticker.cancel()
        super.postStop()
      }

      override def receive: Receive = {
        case  _ => calcRouter ! anyMathJob
      }

      /** A random operation with both operands in the range 0..99. */
      def anyMathJob: MathOps =
        jobs(Random.nextInt(jobs.size))(Random.nextInt(100), Random.nextInt(100))
    }
    
    /** Demo of the metrics based adaptive router: two backends and one frontend. */
    object AdaptiveRouterDemo extends App {

      // Port 2551 is the seed node; the second backend uses port 0.
      List(2551, 0).foreach(Calculator.create)

      Thread.sleep(2000)

      val config = ConfigFactory
        .parseString("akka.cluster.roles = [frontend]")
        .withFallback(ConfigFactory.load("adaptive"))

      val calcSystem = ActorSystem("calcClusterSystem",config)

      // The request generator is created only once this node has been marked Up.
      Cluster(calcSystem).registerOnMemberUp {
        calcSystem.actorOf(Props[RouterRunner],"frontend")
      }

    }

    测试结果显示如下:

    ...
    12 * 9 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=108
    27 + 74 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=101
    78 + 37 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=115
    77 * 33 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=2541
    32 - 19 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=13
    65 * 46 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=2990
    68 - 99 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=-31
    97 + 18 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=115
    ...

    运算已经被部署到不同节点去进行了。

     下面是这次讨论示范的源代码:

    例1:

    build.sbt:

    name := "cluster-load-balance"
    
    version := "1.0"
    
    scalaVersion := "2.11.8"
    
    // Both Akka modules are resolved at the same release.
    libraryDependencies ++= Seq("akka-actor", "akka-cluster").map { module =>
      "com.typesafe.akka" %% module % "2.5.3"
    }

    application.conf:

    # Settings of the frontend node; FrontEnd.create loads this section via getConfig("Frontend").
    Frontend {
      akka {
        actor {
          provider = "cluster"
        }
        remote {
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            # Fixed port: this is the address listed under seed-nodes below.
            port = 2551
          }
        }
    
        cluster {
          roles = [frontend]
          seed-nodes = [
            "akka.tcp://calcClusterSystem@127.0.0.1:2551"]
    
          auto-down-unreachable-after = 10s
        }
      }
    }
    
    # Settings shared by the backend nodes; Calculator.create loads this section via getConfig("Backend").
    Backend {
      akka{
        actor {
          provider = "cluster"
        }
        remote {
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            # NOTE(review): presumably 0 lets every backend bind a free, system-chosen port - confirm in the Akka remoting docs.
            port = 0
          }
        }
    
        cluster {
          # Default role; Calculator.create overrides Backend.akka.cluster.roles for each node.
          roles = [backend]
          seed-nodes = [
            "akka.tcp://calcClusterSystem@127.0.0.1:2551"]
    
          auto-down-unreachable-after = 10s
        }
      }
    }

    messages/Messages.scala

    package clusterloadbalance.messages
    
    /** Message protocol shared by all nodes of the hand-routed calculator cluster. */
    object Messages {
      /** A binary integer operation that the frontend hands to a backend node. */
      sealed trait MathOps
      final case class Add(x: Int, y: Int) extends MathOps
      final case class Sub(x: Int, y: Int) extends MathOps
      final case class Mul(x: Int, y: Int) extends MathOps
      final case class Div(x: Int, y: Int) extends MathOps

      /** Cluster housekeeping messages (as opposed to calculation requests). */
      sealed trait ClusterMsg

      /** Announces that the sending actor serves the given `role`. */
      final case class RegisterBackendActor(role: String) extends ClusterMsg

    }

    backend/Calculator.scala:

    package clusterloadbalance.backend
    
    import akka.actor._
    import clusterloadbalance.messages.Messages._
    
    import scala.concurrent.duration._
    import akka.cluster._
    import akka.cluster.ClusterEvent._
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the `Props` of the calculator worker and of its supervisor. */
    object CalcFuctions {
      /** Props of the worker actor that carries out the arithmetic. */
      def propsFuncs: Props = Props(new CalcFuctions)

      /** Props of the supervisor that serves the given `role`. */
      def propsSuper(role: String): Props = Props(new CalculatorSupervisor(role))
    }

    /**
     * Worker actor that prints the outcome of every arithmetic request it receives.
     * A `Div` with a zero divisor throws an `ArithmeticException` out of `receive`.
     */
    class CalcFuctions extends Actor {
      // Prints one result line; `outcome` has already been computed by the caller.
      private def report(x: Int, symbol: String, y: Int, outcome: Int): Unit =
        println(s"$x $symbol $y carried out by ${self} with result=$outcome")

      override def receive: Receive = {
        case Add(a, b) => report(a, "+", b, a + b)
        case Sub(a, b) => report(a, "-", b, a - b)
        case Mul(a, b) => report(a, "*", b, a * b)
        case Div(a, b) => report(a, "/", b, a / b)
      }

      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        val cause = reason.getMessage
        println(s"Restarting calculator: $cause")
        super.preRestart(reason, message)
      }
    }
    
    /**
     * Supervises the local `CalcFuctions` worker and registers itself with every
     * frontend node so that requests for `mathOps` can be routed here.
     *
     * @param mathOps role name under which this node registers at the frontend
     */
    class CalculatorSupervisor(mathOps: String) extends Actor {
      // An arithmetic failure (e.g. division by zero) resumes the worker instead
      // of restarting it; everything else falls through to the default decider.
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }

      override def supervisorStrategy: SupervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds){
          decider.orElse(SupervisorStrategy.defaultDecider)
        }

      val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")
      val cluster = Cluster(context.system)

      override def preStart(): Unit = {
        cluster.subscribe(self,classOf[MemberUp])
        super.preStart()
      }

      // The braces matter: without them only the first statement is the method
      // body and `super.postStop()` becomes a constructor statement.
      override def postStop(): Unit = {
        cluster.unsubscribe(self)
        super.postStop()
      }

      override def receive: Receive = {
        // Initial snapshot sent on subscription: covers frontends that were
        // already Up before this actor subscribed.
        case state: CurrentClusterState =>
          state.members.filter(_.status == MemberStatus.Up).foreach(register)
        case MemberUp(m) =>
          register(m)
        case msg =>
          calcActor.forward(msg)
      }

      // Tells the "frontend" actor on `member` which role this supervisor serves.
      private def register(member: Member): Unit =
        if (member.hasRole("frontend")) {
          context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
            RegisterBackendActor(mathOps)
        }

    }
    
    
    /** Boots backend cluster nodes, one actor system per call. */
    object Calculator {
      /**
       * Starts a backend node and creates its "calculator" supervisor.
       *
       * @param role cluster role of the node; it replaces the default role of
       *             the "Backend" config section and is the name the supervisor
       *             registers under at the frontend
       */
      def create(role: String): Unit = {
        // The role is wrapped in double quotes so HOCON reads it as one string
        // even if it contains characters that are not allowed in unquoted values.
        val roleOverride =
          ConfigFactory.parseString(s"""Backend.akka.cluster.roles = ["$role"]""")
        val config = roleOverride.withFallback(ConfigFactory.load()).getConfig("Backend")
        val calcSystem = ActorSystem("calcClusterSystem",config)
        calcSystem.actorOf(CalcFuctions.propsSuper(role),"calculator")
      }
    }

    frontend/FrontEnd.scala:

    package clusterloadbalance.frontend
    
    import akka.actor._
    import clusterloadbalance.messages.Messages._
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the frontend routing actor. */
    object CalcRouter {
      def props: Props = Props(new CalcRouter)
    }

    /**
     * Frontend actor that keeps a role -> backend registry and passes each math
     * request to the backend registered for the matching role.
     */
    class CalcRouter extends Actor {
      // role name -> actor that registered for it
      var nodes: Map[String,ActorRef] = Map()

      override def receive: Receive = {
        case RegisterBackendActor(role) =>
          val backend = sender()
          nodes = nodes.updated(role, backend)
          context.watch(backend)

        case Terminated(gone) =>
          // drop every registration that points at the terminated actor
          nodes = nodes.filterNot { case (_, registered) => registered == gone }

        case add: Add => routeCommand("adder", add)
        case sub: Sub => routeCommand("substractor", sub)
        case mul: Mul => routeCommand("multiplier", mul)
        case div: Div => routeCommand("divider", div)
      }

      /** Sends `ops` to the backend registered for `role`, or reports that none is. */
      def routeCommand(role: String, ops: MathOps): Unit =
        nodes.get(role).fold(println(s"$role not registered!"))(_ ! ops)
    }
    
    /** Boots the frontend node and exposes its routing actor. */
    object FrontEnd {
      // Unset (null) until `create` has been called.
      private var router: ActorRef = _

      /**
       * Starts the frontend actor system from the "Frontend" config section and
       * creates the "frontend" router in it. This node is the configured seed
       * node, so it has to be started before any backend.
       */
      def create: Unit = {
        val frontendConfig = ConfigFactory.load().getConfig("Frontend")
        val calcSystem = ActorSystem("calcClusterSystem", frontendConfig)
        router = calcSystem.actorOf(CalcRouter.props, "frontend")
      }

      /** The router created by `create`; null if `create` has not run yet. */
      def getRouter: ActorRef = router
    }

    loadBalanceDemo.scala

    package clusterLoadBalance.demo
    import clusterloadbalance.messages.Messages._
    import clusterloadbalance.backend.Calculator
    import clusterloadbalance.frontend.FrontEnd
    
    /** Demo: one frontend, one backend node per operation, five requests. */
    object LoadBalancerDemo extends App {
      // The frontend goes first, then one backend per role.
      FrontEnd.create
      List("adder", "substractor", "multiplier", "divider").foreach(Calculator.create)

      // Pause before sending work (presumably so the backends can join and register).
      Thread.sleep(2000)

      val router = FrontEnd.getRouter

      // The last request divides by zero on purpose.
      List(Add(10,3), Mul(3,7), Div(8,2), Sub(45, 3), Div(8,0)).foreach(router ! _)

    }

    例2:

    build.sbt:

    name := "cluster-router-demo"
    
    version := "1.0"
    
    scalaVersion := "2.11.9"
    
    // NOTE(review): the sbt launcher version is normally pinned in project/build.properties - confirm this setting is still needed.
    sbtVersion := "0.13.7"
    
    resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    
    val akkaVersion = "2.5.3"
    
    // All Akka modules are resolved at the same release.
    libraryDependencies ++= Seq(
      "akka-actor",
      "akka-remote",
      "akka-cluster",
      "akka-cluster-metrics",
      "akka-cluster-tools",
      "akka-multi-node-testkit"
    ).map(module => "com.typesafe.akka" %% module % akkaVersion)

    application.conf+hashing.conf+adaptive.conf:

    # application.conf: base settings of every node of the router example; the two files below include it.
    akka {
      actor {
        provider = cluster
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          # Default port; Calculator.create overrides akka.remote.netty.tcp.port per node.
          port = 0
        }
      }
    
      cluster {
        # The node started with Calculator.create(2551) is the seed node.
        seed-nodes = [
          "akka.tcp://calcClusterSystem@127.0.0.1:2551"]
    
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        auto-down-unreachable-after = 10s
      }
    }
    
    # hashing.conf: frontend configuration of the consistent-hashing example (FrontEnd.create loads it as "hashing").
    include "application"
    akka.cluster.roles = [frontend]
    akka.actor.deployment {
      # Deployment entry for the "calcRouter" child of the "/user/frontend" actor.
      /frontend/calcRouter {
        router = consistent-hashing-group
        # Path under which Calculator.create starts the supervisor on each backend node.
        routees.paths = ["/user/calculator"]
        cluster {
          enabled = on
          allow-local-routees = on
          use-role = backend
        }
      }
    }
    
    # adaptive.conf: configuration of the adaptive (metrics based) router example, loaded as "adaptive".
    include "application"
    
    akka.cluster.min-nr-of-members = 3
    
    akka.cluster.role {
      frontend.min-nr-of-members = 1
      backend.min-nr-of-members = 2
    }
    
    # Log-noise switches for the demo.
    akka.actor.warn-about-java-serializer-usage = off
    akka.log-dead-letters-during-shutdown = off
    akka.log-dead-letters = off
    
    akka.actor.deployment {
      # Actor path: RouterRunner is named "frontend", its CalcRouter child "adaptive",
      # and that actor's router child "calcRouter".
      /frontend/adaptive/calcRouter {
        # Router type provided by metrics extension.
        router = cluster-metrics-adaptive-group
        # Router parameter specific for metrics extension.
        # metrics-selector = heap
        # metrics-selector = load
        # metrics-selector = cpu
        metrics-selector = mix
        #
        nr-of-instances = 100
        # Path under which Calculator.create starts the supervisor on each backend node.
        routees.paths = ["/user/calculator"]
        cluster {
          enabled = on
          use-role = backend
          allow-local-routees = off
        }
      }
    }

    messages/Messages.scala:

    package clusterrouter.messages
    import akka.routing.ConsistentHashingRouter._
    /** Message protocol of the router-based calculator example. */
    object Messages {
      /**
       * Base of all math requests. The fixed `hashKey` is what the
       * consistent-hashing router uses as the message's hash key.
       * Sealed and abstract so that the four operations below are the only
       * possible requests.
       */
      sealed abstract class MathOps(hashKey: String) extends Serializable with ConsistentHashable {
        override def consistentHashKey: Any = hashKey
      }
      final case class Add(x: Int, y: Int) extends MathOps("adder")
      final case class Sub(x: Int, y: Int) extends MathOps("substractor")
      final case class Mul(x: Int, y: Int) extends MathOps("multiplier")
      final case class Div(x: Int, y: Int) extends MathOps("divider")

    }

    backend/Calculator.scala:

    package clusterrouter.backend
    
    import akka.actor._
    import clusterrouter.messages.Messages._
    
    import scala.concurrent.duration._
    
    import com.typesafe.config.ConfigFactory
    
    
    /** Factory for the `Props` of the calculator worker and of its supervisor. */
    object CalcFuctions {
      /** Props of the worker actor that carries out the arithmetic. */
      def propsFuncs: Props = Props(new CalcFuctions)

      /** Props of the supervising parent actor. */
      def propsSuper: Props = Props(new CalculatorSupervisor)
    }

    /**
     * Worker actor that prints the outcome of every arithmetic request it receives.
     * A `Div` with a zero divisor throws an `ArithmeticException` out of `receive`.
     */
    class CalcFuctions extends Actor {
      // Prints one result line; `outcome` has already been computed by the caller.
      private def report(x: Int, symbol: String, y: Int, outcome: Int): Unit =
        println(s"$x $symbol $y carried out by ${self} with result=$outcome")

      override def receive: Receive = {
        case Add(a, b) => report(a, "+", b, a + b)
        case Sub(a, b) => report(a, "-", b, a - b)
        case Mul(a, b) => report(a, "*", b, a * b)
        case Div(a, b) => report(a, "/", b, a / b)
      }

      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        val cause = reason.getMessage
        println(s"Restarting calculator: $cause")
        super.preRestart(reason, message)
      }
    }
    
    /**
     * Parent of the `CalcFuctions` worker: forwards every message to it and
     * resumes it when a calculation throws an `ArithmeticException`.
     */
    class CalculatorSupervisor extends Actor {
      val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")

      // Arithmetic failures resume the child; anything else is left to the default decider.
      def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
        case _: ArithmeticException => SupervisorStrategy.Resume
      }

      override def supervisorStrategy: SupervisorStrategy = {
        val directives = decider orElse SupervisorStrategy.defaultDecider
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(directives)
      }

      override def receive: Receive = {
        case anything => calcActor forward anything
      }

    }
    
    
    /** Boots backend cluster nodes of the router example. */
    object Calculator {
      /**
       * Starts a node with role "backend" on the given remoting `port` and
       * creates the "calculator" supervisor on it.
       */
      def create(port: Int): Unit = {
        val roleSetting = ConfigFactory.parseString("akka.cluster.roles = [backend]")
        val portSetting = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
        // Earlier layers win: role, then port, then the "adaptive" configuration.
        val config = roleSetting.withFallback(portSetting).withFallback(ConfigFactory.load("adaptive"))
        val calcSystem = ActorSystem("calcClusterSystem",config)
        calcSystem.actorOf(CalcFuctions.propsSuper,"calculator")
      }
    }

    frontend/FrontEnd.scala:

    package clusterrouter.frontend
    
    import akka.actor._
    import akka.routing.FromConfig
    import com.typesafe.config.ConfigFactory
    
    /** Factory for the frontend relay actor. */
    object CalcRouter {
      def props: Props = Props(new CalcRouter)
    }

    /**
     * Frontend actor that owns a router child named "calcRouter", whose type and
     * routees come from the deployment configuration, and forwards everything to it.
     */
    class CalcRouter extends Actor {

      // Routees are only looked up, never deployed by this router, so empty
      // routee Props are sufficient.
      val calcRouter = {
        val lookupOnly = FromConfig.props(Props.empty)
        context.actorOf(lookupOnly, name = "calcRouter")
      }

      override def receive: Receive = {
        case anything => calcRouter.forward(anything)
      }
    }
    
    /** Boots the frontend node of the router example and exposes its entry actor. */
    object FrontEnd {
      // Unset (null) until `create` has been called.
      private var router: ActorRef = _

      /** Starts an actor system configured by "hashing" and creates the "frontend" actor in it. */
      def create: Unit = {
        val hashingConfig = ConfigFactory.load("hashing")
        val calcSystem = ActorSystem("calcClusterSystem", hashingConfig)
        router = calcSystem.actorOf(CalcRouter.props, "frontend")
      }

      /** The actor created by `create`; null if `create` has not run yet. */
      def getRouter: ActorRef = router
    }

    loadBalancerDemo.scala:

    package clusterrouter.demo
    import clusterrouter.messages.Messages._
    import clusterrouter.backend.Calculator
    import clusterrouter.frontend.FrontEnd
    
    /** Demo of the consistent-hashing group router: six backends, one frontend. */
    object LoadBalancerDemo extends App {

      Calculator.create(2551)                        // seed-node
      (1 to 5).foreach(_ => Calculator.create(0))    // five more backend nodes

      Thread.sleep(2000)

      FrontEnd.create

      Thread.sleep(2000)

      val router = FrontEnd.getRouter

      // One batch of requests; the last one divides by zero on purpose.
      private def sendBatch(): Unit =
        List(Add(10,3), Mul(3,7), Div(8,2), Sub(45, 3), Div(8,0)).foreach(router ! _)

      // The same batch is sent twice, two seconds apart.
      sendBatch()
      Thread.sleep(2000)
      sendBatch()

    }

    adaptiveRouterDemo.scala:

    package clusterrouter.demo
    import clusterrouter.messages.Messages._
    import clusterrouter.backend.Calculator
    import clusterrouter.frontend.CalcRouter
    import com.typesafe.config.ConfigFactory
    import scala.util.Random
    import scala.concurrent.duration._
    import akka.actor._
    import akka.cluster._
    
    
    /**
     * Periodically sends a random math request to a `CalcRouter` child.
     * Every message this actor receives (normally the scheduler tick) triggers
     * one request.
     */
    class RouterRunner extends Actor {
      import context.dispatcher

      // Companion objects of the request case classes, used as constructors.
      val jobs = List(Add,Sub,Mul,Div)

      val calcRouter = context.actorOf(CalcRouter.props,"adaptive")

      // The handle is kept so that the timer can be cancelled in postStop;
      // otherwise it would keep sending ticks after this actor has stopped.
      private val ticker: Cancellable =
        context.system.scheduler.schedule(3.seconds, 3.seconds, self, "DoRandomMath")

      override def postStop(): Unit = {
        ticker.cancel()
        super.postStop()
      }

      override def receive: Receive = {
        case  _ => calcRouter ! anyMathJob
      }

      /** A random operation with both operands in the range 0..99. */
      def anyMathJob: MathOps =
        jobs(Random.nextInt(jobs.size))(Random.nextInt(100), Random.nextInt(100))
    }
    
    /** Demo of the metrics based adaptive router: two backends and one frontend. */
    object AdaptiveRouterDemo extends App {

      // Port 2551 is the seed node; the second backend uses port 0.
      List(2551, 0).foreach(Calculator.create)

      Thread.sleep(2000)

      val config = ConfigFactory
        .parseString("akka.cluster.roles = [frontend]")
        .withFallback(ConfigFactory.load("adaptive"))

      val calcSystem = ActorSystem("calcClusterSystem",config)

      // The request generator is created only once this node has been marked Up.
      Cluster(calcSystem).registerOnMemberUp {
        calcSystem.actorOf(Props[RouterRunner],"frontend")
      }

    }

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    JVM 常量池、运行时常量池、字符串常量池
    JVM Direct Memory
    JVM 方法区
    JVM GC Roots
    jvm 堆
    jvm slot复用
    JVM 虚拟机栈
    JVM 程序计数器
    java打印树形目录结构
    java 通过反射获取数组
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/7110661.html
Copyright © 2011-2022 走看看