1. 启动类
object Application extends App{
val _system = ActorSystem("HelloAkka") //构建akka容器
val master:ActorRef = _system.actorOf(Props[MasterActor],name="master") //akka容器创建actor
println("master.path ==> "+master.path) //akka://HelloAkka/user/master
master ! "hi my name is spark, so happy"
master ! "hi my zsh"
master ! "xixi"
Thread.sleep(1000)
master ! new Result
Thread.sleep(500)
_system.terminate
}
2. MasterActor创建map,reduce,aggregate任务的actor
class MasterActor extends Actor{
val aggregateActor:ActorRef = context.actorOf(Props[AggregateActor],name="aggregate")
val reduceActor:ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)),name="reduce")
val mapActor:ActorRef = context.actorOf(Props(new MapActor(reduceActor)),name="map")
println("aggregateActor ==> "+aggregateActor.path) //akka://HelloAkka/user/master/aggregate (master的子actor)
println("mapActor ==> "+mapActor.path)
println("reduceActor ==> "+reduceActor.path)
override def receive: Receive = { // Receive用type重命名的PartialFunction
case msg:String => mapActor ! msg
case msg:Result => aggregateActor ! msg
case _ =>
}
}
3. map任务
class MapActor(var reduceActor: ActorRef)extends Actor{
val STOP_WORDS = List("is","a")
override def receive: Receive = {
case msg:String => reduceActor ! evlExpression(msg)
case _ =>
}
def evlExpression(line:String):MapData = {
val dataList = new ArrayBuffer[Word] // scala可变数组
val parser:StringTokenizer = new StringTokenizer(line)
while(parser.hasMoreTokens){
val str: String = parser.nextToken()
if(!STOP_WORDS.contains(str)){
dataList += (new Word(str,1))
}
}
new MapData(dataList)
}
4. reduce任务
class ReduceActor(var aggregateActor: ActorRef) extends Actor{
override def receive: Receive = {
case msg: MapData => aggregateActor ! reduce(msg.dataList)
case _ =>
}
def reduce(dataList:ArrayBuffer[Word]) : ReduceData ={
val map = new HashMap[String,Int]
for(w:Word <- dataList){
val str: String = w.word
map += (str -> map.getOrElse(str,1))
}
new ReduceData(map)
}
}
5. aggregate任务
class AggregateActor extends Actor{
var finalMap = new HashMap[String,Int]
override def receive: Receive = {
case msg:ReduceData => sum(msg.raduceMap)
case msg:Result => println(finalMap)
}
def sum(map:HashMap[String,Int]){ //多个reduceactor会向aggregateactor发送整理好的map
for(tuple <- map){
val c = finalMap.getOrElse(tuple._1,0)+tuple._2
finalMap += (tuple._1 -> c)
}
}
}
6. 用到的实体类
class Word(val word:String,val count:Int)
case class Result();
class MapData(val dataList:ArrayBuffer[Word])
class ReduceData(val raduceMap:HashMap[String,Int])