zoukankan      html  css  js  c++  java
  • Akka(一)

    1. 启动类

    object Application extends App{
    
      val _system = ActorSystem("HelloAkka")  //构建akka容器
      val master:ActorRef = _system.actorOf(Props[MasterActor],name="master")   //akka容器创建actor
    
      println("master.path ==>	"+master.path)   //akka://HelloAkka/user/master
    
      master ! "hi my name is spark, so happy"
      master ! "hi my zsh"
      master ! "xixi"
      Thread.sleep(1000)
      master ! new Result
    
      Thread.sleep(500)
      _system.terminate
    }
    

    2. MasterActor创建map,reduce,aggregate任务的actor

    class MasterActor extends Actor{
      val aggregateActor:ActorRef = context.actorOf(Props[AggregateActor],name="aggregate")
      val reduceActor:ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)),name="reduce")
      val mapActor:ActorRef = context.actorOf(Props(new MapActor(reduceActor)),name="map")
    
      println("aggregateActor ==>	"+aggregateActor.path)  //akka://HelloAkka/user/master/aggregate  (master的子actor)
      println("mapActor ==>	"+mapActor.path)
      println("reduceActor ==>	"+reduceActor.path)
    
      override def receive: Receive = {    // Receive用type重命名的PartialFunction
        case msg:String => mapActor ! msg
        case msg:Result => aggregateActor ! msg
        case _ =>
      }
    }
    
    

    3. map任务

    class MapActor(var reduceActor: ActorRef)extends Actor{
      val STOP_WORDS = List("is","a")
      override def receive: Receive = {
        case msg:String => reduceActor ! evlExpression(msg)
        case _ =>
      }
    
      def evlExpression(line:String):MapData = {
        val dataList = new ArrayBuffer[Word]   // scala可变数组
        val parser:StringTokenizer = new StringTokenizer(line)
        while(parser.hasMoreTokens){
          val str: String = parser.nextToken()
          if(!STOP_WORDS.contains(str)){
            dataList += (new Word(str,1))
          }
        }
        new MapData(dataList)
      }
    

    4. reduce任务

    class ReduceActor(var aggregateActor: ActorRef) extends Actor{
      override def receive: Receive = {
        case msg: MapData => aggregateActor ! reduce(msg.dataList)
        case _ =>
      }
    
      def reduce(dataList:ArrayBuffer[Word]) : ReduceData ={
        val map = new HashMap[String,Int]
        for(w:Word <- dataList){
          val str: String = w.word
          map += (str -> map.getOrElse(str,1))
        }
        new ReduceData(map)
      }
    }
    
    

    5. aggregate任务

    class AggregateActor extends Actor{
    
      var finalMap = new HashMap[String,Int]
    
      override def receive: Receive = {
        case msg:ReduceData => sum(msg.raduceMap)
        case msg:Result => println(finalMap)
      }
      def sum(map:HashMap[String,Int]){  //多个reduceactor会向aggregateactor发送整理好的map
        for(tuple <- map){
          val c = finalMap.getOrElse(tuple._1,0)+tuple._2
          finalMap += (tuple._1 -> c)
        }
      }
    }
    

    6. 用到的实体类

    class Word(val word:String,val count:Int)
    
    case class Result();
    
    class MapData(val dataList:ArrayBuffer[Word])
    
    class ReduceData(val raduceMap:HashMap[String,Int])
    
    
  • 相关阅读:
    20200804 千锤百炼软工人第三十天
    20200803 千锤百炼软工人第二十九天
    20200802 千锤百炼软工人第二十八天
    小谢第51问:从输入url到浏览器显示页面发生了什么
    小谢第50问:vuex的五个属性-使用-介绍
    小谢第49问:URL都由什么组成
    小谢第48问:js跳转页面与打开新窗口的方法
    小谢第47问:vue项目中,assets和static的区别
    小谢第46问:js事件机制
    小谢第45问:Ajax 是什么? 如何创建一个 Ajax
  • 原文地址:https://www.cnblogs.com/72808ljup/p/5606461.html
Copyright © 2011-2022 走看看