zoukankan      html  css  js  c++  java
  • akka设计模式系列-Aggregate模式

      所谓的Aggregate模式,其实就是聚合模式,跟masterWorker模式有点类似,但其出发点不同。masterWorker模式是指master向worker发送命令,worker完成某种业务逻辑。而聚合模式则刚好相反,由各个worker完成某种业务逻辑后,把结果汇总发给某个actor,这个actor不一定是masterActor。

    class AggregateMasterActor extends Actor{
      override def receive: Receive = {
        case cmd: AggregateCommand.Aggregate =>
          // 将此次汇总结果汇报给from,为了简化,此处用self替代
          val from = self
          val backendActor = context.actorOf(Props(new AggregateMasterBackendActor(from,cmd.parallel)),s"AggregateMasterBackendActor-${cmd.at}")
          backendActor ! cmd
        case AggregateBackendEvent.WorkDone(sum) =>
          val from = sender()
          println(s"AggregateMasterActor [${self.path.name}] 收到 ${from.path.name} 汇总结果 $sum")
      }
    }
    class AggregateMasterBackendActor(replyTo:ActorRef,parallel:Int) extends Actor{
      var counter = 0
      var sum = 0L
      override def receive: Receive = {
        case AggregateCommand.Aggregate(_) =>
          println(s"AggregateMasterBackendActor [${self.path.name}] 开始工作,parallel $parallel,工作结果汇总给 ${replyTo.path.name}")
          1 to parallel foreach { i =>
            val worker = context.actorOf(Props(new AggregateWorker(self)),s"AggregateWorker-$i")
            worker ! AggregateBackendCommand.Aggregate(i,parallel)
          }
        case AggregateWorkerEvent.WorkDone(result) =>
          counter += 1
          sum += result
          if(counter == parallel){
            replyTo ! AggregateBackendEvent.WorkDone(sum)
            context.stop(self)
            println(s"AggregateMasterBackendActor [${self.path.name}] 工作结束退出")
          }
      }
    }
    class AggregateWorker(replyTo:ActorRef) extends Actor{
      def calcResult(index:Int,parallel:Int):Long = index * parallel
      override def receive: Receive = {
        case AggregateBackendCommand.Aggregate(index,parallel) =>
          println(s"AggregateWorker [${self.path.name}] 开始工作 index=$index,工作汇总给 ${replyTo.path.name}")
          val result = calcResult(index,parallel)
          replyTo ! AggregateWorkerEvent.WorkDone(result)
          println(s"AggregateWorker [${self.path.name}] 工作结束退出")
          context.stop(self)
      }
    }
    
    object AggregatePattern {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("AggregatePattern",ConfigFactory.load())
        val aggregateMasterActor =  system.actorOf(Props(new AggregateMasterActor),"AggregateMasterActor")
        aggregateMasterActor ! AggregateCommand.Aggregate(3)
      }
    }
    

     输出:

    AggregateMasterBackendActor [AggregateMasterBackendActor-1531383454073] 开始工作,parallel 3,工作结果汇总给 AggregateMasterActor
    AggregateWorker [AggregateWorker-1] 开始工作 index=1,工作汇总给 AggregateMasterBackendActor-1531383454073
    AggregateWorker [AggregateWorker-2] 开始工作 index=2,工作汇总给 AggregateMasterBackendActor-1531383454073
    AggregateWorker [AggregateWorker-3] 开始工作 index=3,工作汇总给 AggregateMasterBackendActor-1531383454073
    AggregateWorker [AggregateWorker-1] 工作结束退出
    AggregateWorker [AggregateWorker-3] 工作结束退出
    AggregateWorker [AggregateWorker-2] 工作结束退出
    AggregateMasterBackendActor [AggregateMasterBackendActor-1531383454073] 工作结束退出
    AggregateMasterActor [AggregateMasterActor] 收到 AggregateMasterBackendActor-1531383454073 汇总结果 18
    

      从代码来看该设计模式也比较简单,就是由Master创建以临时的子actor,此处命名为MasterBackend,将汇报对象的actorRef以构造函数的形式传递给MasterBackend,此处为了简单用self替代;MasterBackend根据并行参数,创建对应个数的workerActor,并把本身的actorRef以构造函数的形式传递给workerActor,workerActor执行具体的业务逻辑,并将汇总结果,发送给replyTo(也就是MasterBackend);MasterBackend收到workerActor的汇总结果,根据并行参数,判断所有子actor是否执行结束,若执行结束,此次计算完成,将汇总后的结果,发送给replyTo(也就是MasterActor)。

      上面这种设计模式有一个明显的好处,就是Master可以迅速创建大量的聚合工作而不阻塞,因为它收到命令后,只是简单的创建MasterBackend,工作交给它去执行,这个过程非常快。如果某个聚合工作比较慢,并不会影响其他任务。

      之所以说这个设计模式非常重要,是因为在spark/storm等大多分布式框架中都有它的影子。他们都选择将功能进行拆解,专门的节点或actor分别负责任务的接收、创建、执行、汇总这些工作,工作之间互不影响。如果能够深刻的理解这种设计模式,你将会设计出一个架构分层合理、互相解耦的高质量应用系统。

  • 相关阅读:
    (转载)Centos7 install Openstack Juno (RDO)
    (转载)vmware esxi 6.0 开启嵌套虚拟化
    Delphi XE5 android toast
    delphi中Message消息的使用方法
    delphi中Time消息的使用方法
    Delphi中Interface接口的使用方法
    SystemParametersinfo 用法
    Delphi XE5 android openurl(转)
    Delphi XE5开发Android程序使用自定义字体文件.
    获取 TUniConnection.SpecificOptions默认值和下拉框列表值
  • 原文地址:https://www.cnblogs.com/gabry/p/9300259.html
Copyright © 2011-2022 走看看