zoukankan      html  css  js  c++  java
  • MapReduce的原理及执行过程

    MapReduce简介

    1. MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
    2. MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。

    MapReduce执行流程

     MapReduce原理

     

     MapReduce的执行步骤:

    1、Map任务处理

      1.1 读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数。                <0,hello you>   <10,hello me>                    

      1.2 覆盖map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。          <hello,1> <you,1> <hello,1> <me,1>

      1.3 对1.2输出的<k,v>进行分区。默认分为一个区。详见《Partitioner

      1.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同key的value放到一个集合中。 排序后:<hello,1> <hello,1> <me,1> <you,1>  分组后:<hello,{1,1}><me,{1}><you,{1}>

      1.5 (可选)对分组后的数据进行归约。详见《Combiner

    2、Reduce任务处理

      2.1 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。(shuffle)详见《shuffle过程分析

      2.2 对多个map的输出进行合并、排序。覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑, <hello,2> <me,1> <you,1>

        处理后,产生新的<k,v>输出。

      2.3 对reduce输出的<k,v>写到HDFS中。

    Java代码实现

    注:要导入org.apache.hadoop.fs.FileUtil.java。

    1、先创建一个hello文件,上传到HDFS中

    2、然后再编写代码,实现文件中的单词个数统计(代码中被注释掉的代码,是可以省略的,不省略也行)

      1 package mapreduce;
      2 
      3 import java.net.URI;
      4 import org.apache.hadoop.conf.Configuration;
      5 import org.apache.hadoop.fs.FileSystem;
      6 import org.apache.hadoop.fs.Path;
      7 import org.apache.hadoop.io.LongWritable;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Job;
     10 import org.apache.hadoop.mapreduce.Mapper;
     11 import org.apache.hadoop.mapreduce.Reducer;
     12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     16 
     17 public class WordCountApp {
     18     static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
     19     static final String OUT_PATH = "hdfs://chaoren:9000/out";
     20 
     21     public static void main(String[] args) throws Exception {
     22         Configuration conf = new Configuration();
     23         FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
     24         Path outPath = new Path(OUT_PATH);
     25         if (fileSystem.exists(outPath)) {
     26             fileSystem.delete(outPath, true);
     27         }
     28 
     29         Job job = new Job(conf, WordCountApp.class.getSimpleName());
     30 
     31         // 1.1指定读取的文件位于哪里
     32         FileInputFormat.setInputPaths(job, INPUT_PATH);
     33         // 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对
     34         //job.setInputFormatClass(TextInputFormat.class);
     35 
     36         // 1.2指定自定义的map类
     37         job.setMapperClass(MyMapper.class);
     38         // map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
     39         //job.setOutputKeyClass(Text.class);
     40         //job.setOutputValueClass(LongWritable.class);
     41 
     42         // 1.3分区
     43         //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
     44         // 有一个reduce任务运行
     45         //job.setNumReduceTasks(1);
     46 
     47         // 1.4排序、分组
     48 
     49         // 1.5归约
     50 
     51         // 2.2指定自定义reduce类
     52         job.setReducerClass(MyReducer.class);
     53         // 指定reduce的输出类型
     54         job.setOutputKeyClass(Text.class);
     55         job.setOutputValueClass(LongWritable.class);
     56 
     57         // 2.3指定写出到哪里
     58         FileOutputFormat.setOutputPath(job, outPath);
     59         // 指定输出文件的格式化类
     60         //job.setOutputFormatClass(TextOutputFormat.class);
     61 
     62         // 把job提交给jobtracker运行
     63         job.waitForCompletion(true);
     64     }
     65 
     66     /**
     67      * 
     68      * KEYIN     即K1     表示行的偏移量 
     69      * VALUEIN     即V1     表示行文本内容 
     70      * KEYOUT     即K2     表示行中出现的单词 
     71      * VALUEOUT 即V2        表示行中出现的单词的次数,固定值1
     72      * 
     73      */
     74     static class MyMapper extends
     75             Mapper<LongWritable, Text, Text, LongWritable> {
     76         protected void map(LongWritable k1, Text v1, Context context)
     77                 throws java.io.IOException, InterruptedException {
     78             String[] splited = v1.toString().split("	");
     79             for (String word : splited) {
     80                 context.write(new Text(word), new LongWritable(1));
     81             }
     82         };
     83     }
     84 
     85     /**
     86      * KEYIN     即K2     表示行中出现的单词 
     87      * VALUEIN     即V2     表示出现的单词的次数 
     88      * KEYOUT     即K3     表示行中出现的不同单词
     89      * VALUEOUT 即V3     表示行中出现的不同单词的总次数
     90      */
     91     static class MyReducer extends
     92             Reducer<Text, LongWritable, Text, LongWritable> {
     93         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
     94                 Context ctx) throws java.io.IOException,
     95                 InterruptedException {
     96             long times = 0L;
     97             for (LongWritable count : v2s) {
     98                 times += count.get();
     99             }
    100             ctx.write(k2, new LongWritable(times));
    101         };
    102     }
    103 }

    3、运行成功后,可以在Linux中查看操作的结果

  • 相关阅读:
    Git中从远程的分支获取最新的版本到本地方式
    vector map迭代器失效解决方案
    git 远程库 创建私钥
    centos type.h 编译错误问题
    关于/usr/bin/ld: cannot find -lcrypto 的错误
    线程存储(Thread Specific Data)
    bgcolor RGB 和16进制之间的转换,16进制转RGB,源码
    html 网页代码大全,总结,使用
    使用iframe的好处与坏处详细比拼
    java向MySQL插入当前时间的四种方式和java时间日期格式化的几种方法(案例说明)
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/6645074.html
Copyright © 2011-2022 走看看