zoukankan      html  css  js  c++  java
  • Scala零基础教学【90-101】Akka 实战-代码实现

    第90讲:基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

    akka在业界使用非常广泛

    spark背后就是由akka驱动的

    要写消息驱动的编程模型都首推akka

    下面将用30讲讲解akka

    本讲主要讲两部分内容:

    1.akka的重大意义

    2.akka与scala的actor

    Spark源码中使用akka使用鉴赏:

    在spark中有200行左右代码封装了akka的使用

    spark是分布式的计算框架,有master和slave主从节点通信时都是使用akka。

    客户端提交程序时也是使用akka。所以如果要掌握spark必须要理解和掌握akka。

    第91讲:Akka第一个案例动手实战架构设计

    The quick brown fox tried to jump over the lazy dog and fell on the dog
    A dog is a man's best friend

    Map Task -> Reduce Task --|
                                                |-> Aggregate Task
    Map Task -> Reduce Task --|

    Master Actor
    |
    --------------------|---------------------
    |                              |                    |
    Map Actor Reduce Actor Aggregate Actor

    Master Actor给MapActor发一个字符串
    Map Actor根据规则对单词计数,计数完成后把结果传递给MasterActor
    MasterActor把MapData以消息发给reduceActor,
    reduceActor会reduceByKey,把相同单词(key相同)计数相加。
    计算完后再把数据传回给MasterActor。
    如果有多条字符串就会有多组reduce结果。
    MasterActor再把结果发给AggregateActor,进行最后统计
    MasterActor要获得结果需要给AggregateActor发一个空消息,
    AggregateActor收到消息就会把所有统计结果发给MasterActor
    这就是mapReduce计算模型。
    与hadoop的mapreduce不同的是这是基于actor的。
    MapActor对map产生的结果进行本地化统计,
    AggregateActor才相当于hadoop的reducer。
    后面先通过java使用akka。

    第92讲:Akka第一个案例动手实战开发环境的搭建

    第93讲:Akka第一个案例动手实战开发消息实体类

    MapData:

    package akka.dt.app.java.messages;
    
    import java.util.List;
    
    /**
     * sunrunzhi
     * 2018/11/12 9:46
     * 用来让MapActor处理数据后存储在MapData实体中
     * 然后方便将结果交给ReduceActor
     */
    public class MapData {
        private List<WordCount> dataList;
        public List<WordCount> getDataList(){return dataList;}
        public MapData(List<WordCount> dataList){
            this.dataList=dataList;
        }
    
    }
    

    ReduceData:

    package akka.dt.app.java.messages;
    
    import java.util.HashMap;
    
    /**
     * sunrunzhi
     * 2018/11/12 13:35
     */
    public class ReduceData {
        private HashMap<String,Integer> reduceDataList;
        public HashMap<String,Integer> getReduceDataList(){
            return reduceDataList;
        }
        public ReduceData(HashMap<String,Integer> reduceDataList){
            this.reduceDataList=reduceDataList;
        }
    }
    

    Result:

    package akka.dt.app.java.messages;
    
    /**
     * sunrunzhi
     * 2018/11/12 9:58
     */
    public class Result {
        /*传入的字符串先交给MapActor进行切分,然后交给ReduceActor进行本地统计,
          最后交给AggregateActor进行全局的统计,
          想要获得这个结果,通过MasterActor发一个消息Result,Result本身为空,不需要有任何内容。
          这个消息交给MasterActor,MasterActor收到消息时,如果消息是result类型的话转过来会告诉AggregateActor,
          再转发给AggregateActor。*/
    }
    

    WordCount:

    package akka.dt.app.java.messages;
    
    /**
     * sunrunzhi
     * 2018/11/12 9:52
     * WordCount-javaBean
     */
    public class WordCount {
        private String word;
        private Integer count;
        public WordCount(String inWord,Integer inCount){
            word=inWord;
            count=inCount;
        }
    
        public String getWord(){return word;}
        public Integer getCount(){return count;}
    
    }
    

    HelloAkka:

    package akka.dt.app.java.messages;
    
    
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.dt.app.java.actors.MasterActor;
    
    /**
     * sunrunzhi
     * 2018/11/9 20:14
     */
    public class HelloAkka {
    
        public static void main(String[] args)throws Exception{
            ActorSystem _system=ActorSystem.create("HelloAkka");
            ActorRef master=_system.actorOf(new Props(MasterActor.class),"master");
            master.tell("Hi,My name is Rocky. I'm so so so so happy to me here.");
            master.tell("Today,I'm going to read a news article for so you.");
            master.tell("I hope I hope you'll like it.");
    
            Thread.sleep(500);
            master.tell(new Result());
            Thread.sleep(500);
            _system.shutdown();
    
        }
    }
    

    第94讲:Akka第一个案例动手实战MapActor、ReduceActor、AggregateActor代码详解

    AggregateActor:

    package akka.dt.app.java.actors;
    
    import akka.actor.UntypedActor;
    import akka.dt.app.java.messages.ReduceData;
    import akka.dt.app.java.messages.Result;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * sunrunzhi
     * 2018/11/9 20:18
     */
    public class AggregateActor extends UntypedActor{
        private Map<String,Integer> finalReducedMap=new HashMap<String, Integer>();
        @Override
        public void onReceive(Object message)throws Exception{
            //AggregateActor收到消息有两种,一种是ReduceData类型,一种是Result类型
            if(message instanceof ReduceData){
                ReduceData reduceData=(ReduceData)message;
                aggregateInMemoryReduce(reduceData.getReduceDataList());
            }else if(message instanceof Result){
                System.out.println(finalReducedMap.toString());
            }else{
                unhandled(message);
            }
        }
    
        private void aggregateInMemoryReduce(Map<String,Integer> reduceList){
            Integer count=null;
            for (String key:reduceList.keySet()){
                if(finalReducedMap.containsKey(key)){
                    count=reduceList.get(key)+finalReducedMap.get(key);
                    finalReducedMap.put(key,count);
                }else{
                    finalReducedMap.put(key,reduceList.get(key));
                }
            }
        }
    
    
    
    }
    

    MapActor:

    package akka.dt.app.java.actors;
    
    import akka.actor.ActorRef;
    import akka.actor.UntypedActor;
    import akka.dt.app.java.messages.MapData;
    import akka.dt.app.java.messages.WordCount;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.StringTokenizer;
    
    /**
     * sunrunzhi
     * 2018/11/9 20:17
     */
    public class MapActor extends UntypedActor {
        String[] STOP_WORDS = {"a", "is"};
        private ActorRef reduceActor = null;
        private List<String> STOP_WORDS_LIST = Arrays.asList(STOP_WORDS);
    
        public MapActor(ActorRef inReduceActor) {
            reduceActor = inReduceActor;
        }
    
        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof String) {
                String work = (String) message;
                //map the words in the sentence
                MapData data = evaluateExpression(work);//产生MapData实体
                // send the result to ReduceActor
                reduceActor.tell(data);
            } else {
                unhandled(message);
            }
        }
    
        private MapData evaluateExpression(String line) {
            List<WordCount> dataList = new ArrayList<WordCount>();
            StringTokenizer parser = new StringTokenizer(line);
            while (parser.hasMoreTokens()){
                String word =parser.nextToken().toLowerCase();
                if (!STOP_WORDS_LIST.contains(word)) {
                    dataList.add(new WordCount(word, Integer.valueOf(1)));
                }
            }
            return new MapData(dataList);
        }
    }
    

      

    ReduceActor:

    package akka.dt.app.java.actors;
    
    import akka.actor.ActorRef;
    import akka.actor.UntypedActor;
    import akka.dt.app.java.messages.MapData;
    import akka.dt.app.java.messages.ReduceData;
    import akka.dt.app.java.messages.WordCount;
    import com.sun.javafx.collections.MappingChange;
    
    import java.util.HashMap;
    import java.util.List;
    
    /**
     * sunrunzhi
     * 2018/11/9 20:17
     */
    public class ReduceActor extends UntypedActor{
            private ActorRef aggregateActor =null;
            public ReduceActor(ActorRef inAggregateActor) {
                aggregateActor=inAggregateActor;
            }
    
            @Override
            public void onReceive(Object message) throws Exception{
                //AggregateActor收到的消息有两种,一种是ReduceData类型,一种是Result类型
                if(message instanceof MapData){
                    MapData mapData =(MapData)message;
                    //reduce the incoming data
                    ReduceData reduceData=reduce(mapData.getDataList());
                    //forward the result to aggregate actor
                    aggregateActor.tell(reduceData);
                }else {
                    unhandled(message);
                }
            }
    
            private ReduceData reduce(List<WordCount> dataList){
                HashMap<String,Integer> reducedMap=new HashMap<String, Integer>();
                for(WordCount wordCount:dataList){
                    if(reducedMap.containsKey(wordCount.getWord())){
                        Integer value=(Integer)reducedMap.get(wordCount.getWord());
                        value++;
                        reducedMap.put(wordCount.getWord(),value);
                    }else{
                        reducedMap.put(wordCount.getWord(),Integer.valueOf(1));
                    }
                }
                return new ReduceData(reducedMap);
            }
    }
    
    MasterActor:
    package akka.dt.app.java.actors;
    
    import akka.actor.*;
    import akka.dt.app.java.messages.Result;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * sunrunzhi
     * 2018/11/9 20:18
     */
    public class MasterActor extends UntypedActor {
        private ActorRef aggregaterActor = getContext().actorOf(
                new Props(AggregateActor.class), "aggregate");
    
        private ActorRef reduceActor=getContext().actorOf(
                new Props(new UntypedActorFactory() {
                    public UntypedActor create() {
                        return new ReduceActor(aggregaterActor);
                    }
                }),"reduce");
    
        private ActorRef mapActor=getContext().actorOf(
                new Props(new UntypedActorFactory(){
                    public UntypedActor create(){
                        return new MapActor(reduceActor);
                    }
                }),"map");
    
    
        @Override
        public void onReceive(Object message) throws Exception{
            if (message instanceof String) {
                mapActor.tell(message);
            } else if (message instanceof Result) {
                aggregaterActor.tell(message);
            } else{
                unhandled(message);
            }
        }
    
    
    }
    

    第95讲:Akka第一个案例动手实战MasterActor代码详解 

    第96讲:Akka第一个案例动手实战main方法实现中ActorSystem等代码详解

    第97讲:使用SBT开发Akka第一个案例环境搭建详解

     SBT import失败:

     一步一个坑。凑人~~~

    之前调整的项目有报莫名其妙的BUG。

    第一个BUG。

    解决方案:标识红色处打上对号。

     总有些奇奇怪怪的BUG,莫名其妙的又没有了........

    第98讲:使用SBT开发时动手解决rt.jar中CharSequence is broken等问题

    第99讲:手动Artifacts打包并运行SBT开发Akka第一个案例

    第100讲:使用SBT开发Akka第一个案例源码解析消息、main入口、MasterActor

    第101讲:使用SBT开发Akka第一个案例源码解析MapActor、ReduceActor、AggregateActor

  • 相关阅读:
    第一次冲刺站立会议03
    第二次冲刺计划会议
    梦断代码阅读笔记02
    学习进度12
    个人项目——找水王
    学习进度11
    梦断代码阅读笔记01
    学习进度10
    学习进度09
    第一次冲刺个人博客10
  • 原文地址:https://www.cnblogs.com/sunrunzhi/p/9931345.html
Copyright © 2011-2022 走看看