zoukankan      html  css  js  c++  java
  • Akka实现WordCount(Scala)

    Akka实现WordCount(Scala):

    架构图:

    项目结构:

     

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.citi.sky</groupId>
    	<artifactId>AkkaPJ</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>AkkaPJ</name>
    	<url>http://maven.apache.org</url>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>
    
    	<dependencies>
    	
    	
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>4.12</version>
    			<scope>test</scope>
    		</dependency>
    	
    	  <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.6</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-compiler</artifactId>
                <version>2.11.6</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-reflect</artifactId>
                <version>2.11.6</version>
            </dependency>
    	
    	
    		<dependency>
    			<groupId>com.typesafe.akka</groupId>
    			<artifactId>akka-actor_2.11</artifactId>
    			<version>2.3.3</version>
    
    		</dependency>
    
    		<dependency>
    			<groupId>com.typesafe.akka</groupId>
    			<artifactId>akka-testkit_2.11</artifactId>
    			<version>2.3.6</version>
    			<scope>test</scope>
    		</dependency>
    		
    		<dependency>
    			<groupId>org.scalatest</groupId>
    			<artifactId>scalatest_2.11</artifactId>
    			<version>3.0.4</version>
    			<scope>test</scope>
    		</dependency>
    
    
    
    	</dependencies>
    	
    	  <build>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    	
    </project>
    

    消息:

    case class MapData (dataList: List[WordCount])
    
    case class ReduceData (reduceDataList: Map[String, Int])
    
    case class Result()
    
    case class WordCount (key: String, count: Int)
    

    Actors:

    MasterActor

    import akka.actor.Actor
    import akka.actor.Props
    import com.citi.dw.messages.Result
    
    class MasterActor extends Actor {
    
      private val aggregateActor = context.actorOf(Props(classOf[AggregateActor]), "aggregateActor")
      private val reduceActor = context.actorOf(Props(classOf[ReduceActor], aggregateActor), "reduceActor")
      private val mapActor = context.actorOf(Props(classOf[MapActor], reduceActor), "mapActor")
      
      
    
      def receive: Actor.Receive = {
        case msg: String => {
          mapActor ! msg
        }
        case msg: Result => {
          aggregateActor.forward(msg)
        }
    //    case msg: Map[String, Int] => 
        case _ => println("MasterActor receive wrong message.")
      }
    }
    

    MapActor:

    import akka.actor.Actor
    import com.citi.dw.messages.MapData
    import com.citi.dw.messages.WordCount
    import scala.collection.mutable.ListBuffer
    import akka.actor.ActorRef
    
    class MapActor(val reduceActor: ActorRef) extends Actor {
      def receive: Actor.Receive = {
        case msg: String => {
          val mapData = evaluateExpression(msg)
          reduceActor ! mapData
        }
        case _ => println("MapActor receive wrong message.")
      }
      
      private[this] def evaluateExpression(line: String): MapData = {
        val dataList = ListBuffer[WordCount]()
        line.split(" ").map(word => dataList += WordCount(word, 1))
        
    //    val wordArr = line.split(" ")
    //    for(word <- wordArr) {
    //       dataList += WordCount(word, 1)
    //    }
    //    println(dataList)
        MapData(dataList.toList)
      }
      
      
    }
    

    ReduceActor:

    import akka.actor.Actor
    import com.citi.dw.messages.MapData
    import com.citi.dw.messages.ReduceData
    import com.citi.dw.messages.WordCount
    import scala.collection.mutable.HashMap
    import akka.actor.ActorRef
    
    class ReduceActor(val aggregateActor: ActorRef) extends Actor {
    
      def receive: Actor.Receive = {
        case msg: MapData => {
          val reduceData = reduce(msg.dataList)
          aggregateActor ! reduceData
        }
        case _ => println("ReduceActor receive wrong message.")
      }
    
      private[this] def reduce(dataList: List[WordCount]): ReduceData = {
        val reduceMap = HashMap[String, Int]()
    
        for (wc <- dataList) {
          wc match {
            case WordCount(key, count) if reduceMap.contains(key) => {
              val localSumCount = reduceMap.get(key).get + count
              reduceMap += ((key, localSumCount))
              //          println(reduceMap)
            }
            case WordCount(key, count) => {
              reduceMap += ((key, 1))
              //          println(reduceMap)
            }
          }
    
        }
    
        ReduceData(reduceMap.toMap)
      }
    
    }
    

    AggregateActor:

    import akka.actor.Actor
    import com.citi.dw.messages.ReduceData
    import scala.collection.mutable.HashMap
    import com.citi.dw.messages.Result
    import akka.actor.ActorRef
    
    class AggregateActor extends Actor {
    
      private[this] var finalReduceMap = HashMap[String, Int]()
    
      def receive: Actor.Receive = {
        case msg: ReduceData => {
          aggregateAndReduce(msg.reduceDataList)
        }
        case msg: Result => {
    //      println(f"Result: ${finalReduceMap}")
    //      sender().tell(finalReduceMap.toMap, ActorRef.noSender)
          sender ! finalReduceMap.toMap
        }
        case _ => println("AggregateActor receive wrong message.")
      }
    
      private[this] def aggregateAndReduce(reduceList: Map[String, Int]) = {
    //   println(s"final: ${finalReduceMap}")
        for (key <- reduceList.keys) {
          if (finalReduceMap.contains(key)) {
         
            val count = finalReduceMap.get(key).get + reduceList.get(key).get
            finalReduceMap += ((key, count))
          } else {
            finalReduceMap += ((key, reduceList.get(key).get))
          }
        }
    
      }
    
    }
    

    主程序:

    import akka.actor.ActorSystem
    import akka.actor.Props
    import com.citi.dw.actors.MasterActor
    import com.citi.dw.messages.Result
    import akka.pattern.ask
    import scala.concurrent.duration._
    import akka.util.Timeout
    import scala.util._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Await
    
    object AkkaWordCount extends App {
    
      implicit val timeout = Timeout(5 seconds)
      val system = ActorSystem("WordCountAkka")
      val master = system.actorOf(Props(classOf[MasterActor]), "master")
    
      master ! "Hi! Hi!"
      master ! ("My name is Sky. I am so so so happy to be here ")
      master ! ("Today, I am going to introduce word count for Akka ")
      master ! ("I hope hope It is helpful to you ")
      master ! ("Thank you ")
    
      Thread.sleep(1000)
    
      val future = master ? Result()
    //  future.onComplete({
    //    case Success(x: String) => println(x)
    //    case Failure(t)         => println(t)
    //    case msg                => println("unknown message! " + msg)
    //  })
    
      val result = Await.result(future, timeout.duration).asInstanceOf[Map[String, Int]]
      result.map(m => println(m._1, m._2))
    
    
      system.shutdown()
    
    }
    

    运行结果:

    (for,1)
    (name,1)
    (count,1)
    (is,2)
    (am,2)
    (My,1)
    (going,1)
    (so,3)
    (introduce,1)
    (Sky.,1)
    (I,3)
    (to,3)
    (Hi!,2)
    (you,2)
    (here,1)
    (happy,1)
    (Thank,1)
    (hope,2)
    (Today,,1)
    (helpful,1)
    (Akka,1)
    (It,1)
    (be,1)
    (word,1)

     

  • 相关阅读:
    什么是动态链接库
    <<TCP/IP高效编程>>读书笔记
    C++ 函数
    我的vim配置
    FastReport4.6程序员手册_翻译
    DUnit研究初步
    ADO BUG之'无法为更新定位行....' 解决之道
    极限编程的集成测试工具Dunit
    总结
    项目管理检查清单项目启动
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/8324901.html
Copyright © 2011-2022 走看看