zoukankan      html  css  js  c++  java
  • MapReduce 入门与 WordCount 讲解

    1. MapReduce的思想核心是“分而治之”

    Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

    Reduce负责“合”,即对map阶段的结果进行全局汇总。

    MapReduce运行在yarn集群

    1. ResourceManager
    2. NodeManager

    Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。

    Map: (k1; v1) → [(k2; v2)]
    Reduce: (k2; [v2]) → [(k3; v3)]
    

    2. MapReduce 编程规范

    MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle阶段4个步骤,Reduce阶段分为2个步骤

    Map 阶段 2 个步骤

    1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
    2. 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果

    Shuffe 阶段 4 个步骤

    1. 分区 Partitioner 对输出的 Key-Value 对进行分区
    2. 排序 Sort 对不同分区的数据按照相同的 Key 排序
    3. 规约 Combiner (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
    4. 分组 parse 对数据进行分组, 相同 Key 的 Value 放入一个集合中

    Reduce 阶段 2 个步骤

    1. 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
    2. 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据

    3. WordCount 实例

    step 1:Mapper类

    /**
     * @Author hwj
     * @Date 2020/8/7 12:54
     * @Desc: 将 k1 v1 转换为 k2 v2
     **/
    /*
    k1 行偏移量   LongWritable
    v1 行文本     Text
    k2 单词文本   Text
    v2 个数       LongWritable
     */
    /*
    步骤:重写提供的map方法
    1. 拆开文本数据
    2. 获取 k2,v2
    3. 将 k2,v2 写入上下文中
     */
    public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text text = new Text();
            LongWritable longWritable = new LongWritable();
            // 拿到一行数据转换为 String
            // 将这一行切分成各个单词
            // 遍历数组,输出 k2,v2,写入到上下文中
            String line = value.toString();
            String[] words = line.split(" ");
            for(String word: words){
                text.set(word);
                longWritable.set(1);
                context.write(text,longWritable);
            }
        }
    }
    

    step2 reducer 类

    /**
     * @Author hwj
     * @Date 2020/8/7 16:11
     * @Desc: 将 k2 v2 转化为 k3,v3
     **/
    /*
    k2 单词文本            Text
    v2 <迭代器> 个数       LongWritable    <1,1,1,1>
    k3 单词文本             Text
    v3 每个单词累积个数  LongWritable       4
     */
    public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
        /*
    步骤:重写提供的 reduce 方法
    1. 拆开文本数据
    2. 获取 k2,v2
    3. 将 k2,v2 写入上下文中
     */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count=0;
            for(LongWritable value:values){
                count=count+value.get();
            }
            context.write(key,new LongWritable(count));
        }
    }
    

    step 3 主函数 Main

    /**
     * @Author hwj
     * @Date 2020/8/7 16:26
     * @Desc: 将业务逻辑相关的信息:哪个是 Mapper,哪个是 reducer,要处理的数据在哪,输出的数据放哪) 描述成一个 job对象
     *
     **/
    public class WordCountMain {
        // 将这个描述好的对象提交给集群去运行
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //  Configuration 封装了对应客户端或服务器的配置
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(WordCountMain.class);
            // 指定Map阶段的处理方式
            job.setMapperClass(WordCountMapper.class);
            // 指定 reduce 阶段的处理方式
            job.setReducerClass(WordCountReducer.class);
            // 指定 Map 阶段键值对输出的数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 指定 reduce 阶段输出到文件的键值对类型
    
    //        FileInputFormat.setInputPaths(job,new Path("file:///G:\input"));
            FileInputFormat.setInputPaths(job,"hdfs://node01:8020/wordcount");
            FileOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount/output"));
            // 向 yarn 集群提交这个 job
            boolean res=job.waitForCompletion(true);
            System.exit(res?0:1);
        }
    }
    
    • 若是输入输出路径均在本地,则直接运行程序。
    • 这里建议现在本地运行,查看代码是否有错。
    • 输出路径一定要不存在,已存在会报错
    • 下面是当输入输出路径在hdfs文件系统中的后续步骤

    step4 打包 jar,并上传

    • 在 maven 区 clean-->package
    • 上传到node01节点的相应路径下

    step5 执行程序,获得统计词频的输出

    在jar包存放目录下执行
    hadoop jar mapreduce_wordcount-1.0-SNAPSHOT.jar pers.hwj.WordCountMain
    其中,pers.hwj.WordCountMain,是主函数的路径(主函数右键 Copy Reference

    问题及回答

    mapreduce 需要注意什么

    1. 文件输入,若文件都是小文件,可以先做小文件的合并
    2. 若设置分区,则主函数里面要配置 reducetask 的个数
    3. 注意输入输出的k,v相应的数据类型要考虑清楚

    源码 : https://github.com/aliyundata/HadoopProject/tree/master/mapreduce_wordcount

    初晨暖阳,夜落星河。 少年披梦,远方有歌。 红黄之上,春夏晚风。 闲肆游走,人群熙攘。
  • 相关阅读:
    C# 提取方法重构
    防抖和节流
    利用Object.keys快速循环对象
    MVVM深入理解---小白速会
    异步组件使用详解
    动态组件使用详解
    Vue.$nextTick详解
    深入理解vue .sync修饰符
    vue计算属性---快速上手
    grid-layout 网格布局--快速上手
  • 原文地址:https://www.cnblogs.com/alidata/p/13448603.html
Copyright © 2011-2022 走看看