zoukankan      html  css  js  c++  java
  • MapReduce的原理及执行过程

    MapReduce简介

    1. MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
    2. MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。

    MapReduce执行流程

     MapReduce原理

     

     MapReduce的执行步骤:

    1、Map任务处理

      1.1 读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数。                <0,hello you>   <10,hello me>                    

      1.2 覆盖map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。          <hello,1> <you,1> <hello,1> <me,1>

      1.3 对1.2输出的<k,v>进行分区。默认分为一个区。详见《Partitioner

      1.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同key的value放到一个集合中。 排序后:<hello,1> <hello,1> <me,1> <you,1>  分组后:<hello,{1,1}><me,{1}><you,{1}>

      1.5 (可选)对分组后的数据进行归约。详见《Combiner

    2、Reduce任务处理

      2.1 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。(shuffle)详见《shuffle过程分析

      2.2 对多个map的输出进行合并、排序。覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑, <hello,2> <me,1> <you,1>

        处理后,产生新的<k,v>输出。

      2.3 对reduce输出的<k,v>写到HDFS中。

    Java代码实现

    注:要导入org.apache.hadoop.fs.FileUtil.java。

    1、先创建一个hello文件,上传到HDFS中

    2、然后再编写代码,实现文件中的单词个数统计(代码中被注释掉的代码,是可以省略的,不省略也行)

    复制代码
      1 package mapreduce;
      2 
      3 import java.net.URI;
      4 import org.apache.hadoop.conf.Configuration;
      5 import org.apache.hadoop.fs.FileSystem;
      6 import org.apache.hadoop.fs.Path;
      7 import org.apache.hadoop.io.LongWritable;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Job;
     10 import org.apache.hadoop.mapreduce.Mapper;
     11 import org.apache.hadoop.mapreduce.Reducer;
     12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     16 
     17 public class WordCountApp {
     18     static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
     19     static final String OUT_PATH = "hdfs://chaoren:9000/out";
     20 
     21     public static void main(String[] args) throws Exception {
     22         Configuration conf = new Configuration();
     23         FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
     24         Path outPath = new Path(OUT_PATH);
     25         if (fileSystem.exists(outPath)) {
     26             fileSystem.delete(outPath, true);
     27         }
     28 
     29         Job job = new Job(conf, WordCountApp.class.getSimpleName());
     30 
     31         // 1.1指定读取的文件位于哪里
     32         FileInputFormat.setInputPaths(job, INPUT_PATH);
     33         // 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对
     34         //job.setInputFormatClass(TextInputFormat.class);
     35 
     36         // 1.2指定自定义的map类
     37         job.setMapperClass(MyMapper.class);
     38         // map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
     39         //job.setOutputKeyClass(Text.class);
     40         //job.setOutputValueClass(LongWritable.class);
     41 
     42         // 1.3分区
     43         //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
     44         // 有一个reduce任务运行
     45         //job.setNumReduceTasks(1);
     46 
     47         // 1.4排序、分组
     48 
     49         // 1.5归约
     50 
     51         // 2.2指定自定义reduce类
     52         job.setReducerClass(MyReducer.class);
     53         // 指定reduce的输出类型
     54         job.setOutputKeyClass(Text.class);
     55         job.setOutputValueClass(LongWritable.class);
     56 
     57         // 2.3指定写出到哪里
     58         FileOutputFormat.setOutputPath(job, outPath);
     59         // 指定输出文件的格式化类
     60         //job.setOutputFormatClass(TextOutputFormat.class);
     61 
     62         // 把job提交给jobtracker运行
     63         job.waitForCompletion(true);
     64     }
     65 
     66     /**
     67      * 
     68      * KEYIN     即K1     表示行的偏移量 
     69      * VALUEIN     即V1     表示行文本内容 
     70      * KEYOUT     即K2     表示行中出现的单词 
     71      * VALUEOUT 即V2        表示行中出现的单词的次数,固定值1
     72      * 
     73      */
     74     static class MyMapper extends
     75             Mapper<LongWritable, Text, Text, LongWritable> {
     76         protected void map(LongWritable k1, Text v1, Context context)
     77                 throws java.io.IOException, InterruptedException {
     78             String[] splited = v1.toString().split("	");
     79             for (String word : splited) {
     80                 context.write(new Text(word), new LongWritable(1));
     81             }
     82         };
     83     }
     84 
     85     /**
     86      * KEYIN     即K2     表示行中出现的单词 
     87      * VALUEIN     即V2     表示出现的单词的次数 
     88      * KEYOUT     即K3     表示行中出现的不同单词
     89      * VALUEOUT 即V3     表示行中出现的不同单词的总次数
     90      */
     91     static class MyReducer extends
     92             Reducer<Text, LongWritable, Text, LongWritable> {
     93         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
     94                 Context ctx) throws java.io.IOException,
     95                 InterruptedException {
     96             long times = 0L;
     97             for (LongWritable count : v2s) {
     98                 times += count.get();
     99             }
    100             ctx.write(k2, new LongWritable(times));
    101         };
    102     }
    103 }
    复制代码

    3、运行成功后,可以在Linux中查看操作的结果

    转载自:https://www.cnblogs.com/ahu-lichang/p/6645074.html

  • 相关阅读:
    Java vs Python
    Compiled Language vs Scripting Language
    445. Add Two Numbers II
    213. House Robber II
    198. House Robber
    276. Paint Fence
    77. Combinations
    54. Spiral Matrix
    82. Remove Duplicates from Sorted List II
    80. Remove Duplicates from Sorted Array II
  • 原文地址:https://www.cnblogs.com/lyy-blog/p/8527343.html
Copyright © 2011-2022 走看看