zoukankan      html  css  js  c++  java
  • MapReduce本地运行模式wordcount实例(附:MapReduce原理简析)

    1.      环境配置

    a)        配置系统环境变量HADOOP_HOME

     

     

     

    b)        hadoop.dll文件放到c:/windows/System32目录下

     

     

    c)        hadoop-2.6.0sharehadoopcommonsources目录下hadoop-common-2.6.0-sources.jar文件中找到orgapachehadoopio ativeioNativeIO.java文件,复制到对应的Eclipseproject NativeIO.java文件还要在原来的包名下

    d)        修改此文件的557行,替换为return true


    e)        在主机中配置虚拟机的IP和用户名



    f)         以管理员身份运行eclipse

     

     

    2.      代码(以wordcount为例)

    a)        MapReduceMapReduce两部分,加上测试,一共三部分


    b)        Map里主要解决文件分割的问题;

    package com.hadoop.hdfs.api.test.mr.wc;

     

    import java.io.IOException;

     

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Mapper;

    /*

     * KEYIN, VALUEIN, 输入的key-value数据类型

     *

     * KEYOUT, VALUEOUT 输出的key-value数据类型

     *

    */

    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

         

    /*   key:输入的key值,偏移量

          value:输入的value,一行的内容

    */

          @Override

          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)

                        throws IOException, InterruptedException {

                

                 //获取一行的内容

                 String linestr=value.toString();

     

                 String[] words=linestr.split(" ");//正则表达式怎么实现这些东西?

                 for (String word : words) {

                        //输出写

                        context.write(new Text(word), new LongWritable(1));

                 }

                

          }

    }

     

     

    c)        接收map的结果,然后整合输出

    package com.hadoop.hdfs.api.test.mr.wc;

     

    import java.io.IOException;

    import java.util.Iterator;

     

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Reducer;

     

    /*

     * org.apache.hadoop.mapreduceHadoop2api

    */

    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

     

          @Override

          protected void reduce(Text key, Iterable<LongWritable> values,

                        Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

     

                        Iterator<LongWritable>iter=values.iterator();

                        long sum=0;

                        while(iter.hasNext()){

                               sum+=iter.next().get();

                        }

                        context.write(key, new LongWritable(sum));

          }

    }

    d)        运行文件里只需要配置好路径即可

    package com.hadoop.hdfs.api.test.mr.wc;

     

     

     

     

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

     

    public class WCRunner {

     

          public static void main(String[] args) {

                

                 Configuration conf =new Configuration();

    /*          conf.set("hadoop.tmp.dir", "j:/tmp/tmpData");*/

                 try {

                        Job job=Job.getInstance(conf);

                        //指定main方法所在的类

                        job.setJarByClass(WCRunner.class);

                        //指定mapreduce

                        job.setMapperClass(WCMapper.class);

                        job.setReducerClass(WCReducer.class);

                        //指定map的输出keyvalue的数据类型

                        job.setMapOutputKeyClass(Text.class);

                        job.setMapOutputValueClass(LongWritable.class);

                        //指定reduce的输出keyvalue的数据类型

                        job.setOutputKeyClass(Text.class);

                        job.setOutputValueClass(LongWritable.class);

                        //指定输入的文件目录,这里可以是文件,也可以是目录

                        FileInputFormat.setInputPaths(job, new Path(args[0]));

                        //指定输出的文件目录,这里只能是目录,不能是文件

                        FileOutputFormat.setOutputPath(job, new Path(args[1]));

                       

                        //执行job

                        job.waitForCompletion(true);

                       

                 } catch (Exception e) {

                        // TODO Auto-generated catch block

                        e.printStackTrace();

                 }

     

          }

     

    }

     

    注:数据类型

           Value Text类型 Key LongWritable类型

           特别声明:Text的包需要特别注意

           

     

    运行前,传一下路径

    Input是读文件的路径,里面的文件就是我们要读的

    Outputreduce生成文件存放的地方

    特别声明:input路径必须存在,output必须是不存在的

    特特特别声明:用户名一定要注意,不要有空格!!!

     

     

    MapReduce原理

    MapReduce分为两大部分,Map(抓取数据、数据分割)和Reduce(处理数据,数据整合,上传数据)。

     

    从单文件看MapReduce

    1.      HDFS上读取一个文件

    2.      为本地主机分配一个Map任务

    3.      Map作业从输入数据中抽取出键值对

    4.      每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。

    5.      上一阶段中解析出来的每一个键值对,调用一次map方法。如果有1000个键值对,就会调用1000map方法。每一次调用map方法会输出零个或者多个键值对。

    6.      为本地主机分配一个Reduce任务

    7.      Reduce任务读取Map任务产生的中间值并排序(因为Map任务产生的键值对可能映射到不同的分区中,当然本地只有一个分区,所以要排序),排序的目的是使相同键的键值对聚集在一起。

    8.      遍历排序后中间键值对,将具有相同键的键值对调用一次reduce方法,对每个不同的键分别调用一次reduce方法。

    9.      reduce函数产生的输出会添加到这个分区的输出文件中。

     

    从集群上看MapReduce

    1.      HDFS上的文件分块,如需要输入的文件为100MB200MB时,因为块大小为128MB,所以共分为三块,块一:100MB;块二:128MB;块三:72MB

    每个块对应一个Map,需要三个Map进程来处理

    2.      为集群上空闲的机器分配Map任务,被分配了Map作业的机器,开始读取对应分片的输入数据

    3.      与单文件过程类似

    4.      Map产生的中间键值对分为N个区保存在本地中,每个区对应一个Reduce任务,将这N个区的位置报告给集群中负责调度的机器,让其将位置信息转发给已分配好Reduce任务的机器。

    5.      Reduce任务的机器从刚获取的地址处,读取中间键值对,然后与单文件类似

    6.      所有执行完毕后,MapReduce输出放在了N个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这N个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。

     

     

     

    注:关于MapReduce之间的数据传输过程,MapReduce的核心Shuffle,现在知识有限。只知道它的作用,不知道为什么作用,希望过几天可以整理一下

  • 相关阅读:
    48. Rotate Image
    47. Permutations II
    46. Permutations
    45. Jump Game II
    44. Wildcard Matching
    43. Multiply Strings
    42. Trapping Rain Water
    41. First Missing Positive
    40. Combination Sum II
    39. Combination Sum
  • 原文地址:https://www.cnblogs.com/wxplmm/p/7253934.html
Copyright © 2011-2022 走看看