zoukankan      html  css  js  c++  java
  • MapReduce 入门与 WordCount 讲解

    1. MapReduce的思想核心是“分而治之”

    Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

    Reduce负责“合”,即对map阶段的结果进行全局汇总。

    MapReduce运行在yarn集群

    1. ResourceManager
    2. NodeManager

    Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。

    Map: (k1; v1) → [(k2; v2)]
    Reduce: (k2; [v2]) → [(k3; v3)]
    

    2. MapReduce 编程规范

    MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle阶段4个步骤,Reduce阶段分为2个步骤

    Map 阶段 2 个步骤

    1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
    2. 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果

    Shuffe 阶段 4 个步骤

    1. 分区 Partitioner 对输出的 Key-Value 对进行分区
    2. 排序 Sort 对不同分区的数据按照相同的 Key 排序
    3. 规约 Combiner (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
    4. 分组 parse 对数据进行分组, 相同 Key 的 Value 放入一个集合中

    Reduce 阶段 2 个步骤

    1. 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
    2. 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据

    3. WordCount 实例

    step 1:Mapper类

    /**
     * @Author hwj
     * @Date 2020/8/7 12:54
     * @Desc: 将 k1 v1 转换为 k2 v2
     **/
    /*
    k1 行偏移量   LongWritable
    v1 行文本     Text
    k2 单词文本   Text
    v2 个数       LongWritable
     */
    /*
    步骤:重写提供的map方法
    1. 拆开文本数据
    2. 获取 k2,v2
    3. 将 k2,v2 写入上下文中
     */
    public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text text = new Text();
            LongWritable longWritable = new LongWritable();
            // 拿到一行数据转换为 String
            // 将这一行切分成各个单词
            // 遍历数组,输出 k2,v2,写入到上下文中
            String line = value.toString();
            String[] words = line.split(" ");
            for(String word: words){
                text.set(word);
                longWritable.set(1);
                context.write(text,longWritable);
            }
        }
    }
    

    step2 reducer 类

    /**
     * @Author hwj
     * @Date 2020/8/7 16:11
     * @Desc: 将 k2 v2 转化为 k3,v3
     **/
    /*
    k2 单词文本            Text
    v2 <迭代器> 个数       LongWritable    <1,1,1,1>
    k3 单词文本             Text
    v3 每个单词累积个数  LongWritable       4
     */
    public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
        /*
    步骤:重写提供的 reduce 方法
    1. 拆开文本数据
    2. 获取 k2,v2
    3. 将 k2,v2 写入上下文中
     */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count=0;
            for(LongWritable value:values){
                count=count+value.get();
            }
            context.write(key,new LongWritable(count));
        }
    }
    

    step 3 主函数 Main

    /**
     * @Author hwj
     * @Date 2020/8/7 16:26
     * @Desc: 将业务逻辑相关的信息:哪个是 Mapper,哪个是 reducer,要处理的数据在哪,输出的数据放哪) 描述成一个 job对象
     *
     **/
    public class WordCountMain {
        // 将这个描述好的对象提交给集群去运行
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //  Configuration 封装了对应客户端或服务器的配置
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(WordCountMain.class);
            // 指定Map阶段的处理方式
            job.setMapperClass(WordCountMapper.class);
            // 指定 reduce 阶段的处理方式
            job.setReducerClass(WordCountReducer.class);
            // 指定 Map 阶段键值对输出的数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 指定 reduce 阶段输出到文件的键值对类型
    
    //        FileInputFormat.setInputPaths(job,new Path("file:///G:\input"));
            FileInputFormat.setInputPaths(job,"hdfs://node01:8020/wordcount");
            FileOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount/output"));
            // 向 yarn 集群提交这个 job
            boolean res=job.waitForCompletion(true);
            System.exit(res?0:1);
        }
    }
    
    • 若是输入输出路径均在本地,则直接运行程序。
    • 这里建议现在本地运行,查看代码是否有错。
    • 输出路径一定要不存在,已存在会报错
    • 下面是当输入输出路径在hdfs文件系统中的后续步骤

    step4 打包 jar,并上传

    • 在 maven 区 clean-->package
    • 上传到node01节点的相应路径下

    step5 执行程序,获得统计词频的输出

    在jar包存放目录下执行
    hadoop jar mapreduce_wordcount-1.0-SNAPSHOT.jar pers.hwj.WordCountMain
    其中,pers.hwj.WordCountMain,是主函数的路径(主函数右键 Copy Reference

    问题及回答

    mapreduce 需要注意什么

    1. 文件输入,若文件都是小文件,可以先做小文件的合并
    2. 若设置分区,则主函数里面要配置 reducetask 的个数
    3. 注意输入输出的k,v相应的数据类型要考虑清楚

    源码 : https://github.com/aliyundata/HadoopProject/tree/master/mapreduce_wordcount

    初晨暖阳,夜落星河。 少年披梦,远方有歌。 红黄之上,春夏晚风。 闲肆游走,人群熙攘。
  • 相关阅读:
    vue自定义指令
    ZOJ Problem Set–2104 Let the Balloon Rise
    ZOJ Problem Set 3202 Secondprice Auction
    ZOJ Problem Set–1879 Jolly Jumpers
    ZOJ Problem Set–2405 Specialized FourDigit Numbers
    ZOJ Problem Set–1874 Primary Arithmetic
    ZOJ Problem Set–1970 All in All
    ZOJ Problem Set–1828 Fibonacci Numbers
    要怎么样调整状态呢
    ZOJ Problem Set–1951 Goldbach's Conjecture
  • 原文地址:https://www.cnblogs.com/alidata/p/13448603.html
Copyright © 2011-2022 走看看