zoukankan      html  css  js  c++  java
  • MapReduce本地运行模式wordcount实例(附:MapReduce原理简析)

    1.      环境配置

    a)        配置系统环境变量HADOOP_HOME

     

     

     

    b)        hadoop.dll文件放到c:/windows/System32目录下

     

     

    c)        hadoop-2.6.0sharehadoopcommonsources目录下hadoop-common-2.6.0-sources.jar文件中找到orgapachehadoopio ativeioNativeIO.java文件,复制到对应的Eclipseproject NativeIO.java文件还要在原来的包名下

    d)        修改此文件的557行,替换为return true


    e)        在主机中配置虚拟机的IP和用户名



    f)         以管理员身份运行eclipse

     

     

    2.      代码(以wordcount为例)

    a)        MapReduceMapReduce两部分,加上测试,一共三部分


    b)        Map里主要解决文件分割的问题;

    package com.hadoop.hdfs.api.test.mr.wc;

     

    import java.io.IOException;

     

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Mapper;

    /*

     * KEYIN, VALUEIN, 输入的key-value数据类型

     *

     * KEYOUT, VALUEOUT 输出的key-value数据类型

     *

    */

    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

         

    /*   key:输入的key值,偏移量

          value:输入的value,一行的内容

    */

          @Override

          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)

                        throws IOException, InterruptedException {

                

                 //获取一行的内容

                 String linestr=value.toString();

     

                 String[] words=linestr.split(" ");//正则表达式怎么实现这些东西?

                 for (String word : words) {

                        //输出写

                        context.write(new Text(word), new LongWritable(1));

                 }

                

          }

    }

     

     

    c)        接收map的结果,然后整合输出

    package com.hadoop.hdfs.api.test.mr.wc;

     

    import java.io.IOException;

    import java.util.Iterator;

     

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Reducer;

     

    /*

     * org.apache.hadoop.mapreduceHadoop2api

    */

    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

     

          @Override

          protected void reduce(Text key, Iterable<LongWritable> values,

                        Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

     

                        Iterator<LongWritable>iter=values.iterator();

                        long sum=0;

                        while(iter.hasNext()){

                               sum+=iter.next().get();

                        }

                        context.write(key, new LongWritable(sum));

          }

    }

    d)        运行文件里只需要配置好路径即可

    package com.hadoop.hdfs.api.test.mr.wc;

     

     

     

     

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

     

    public class WCRunner {

     

          public static void main(String[] args) {

                

                 Configuration conf =new Configuration();

    /*          conf.set("hadoop.tmp.dir", "j:/tmp/tmpData");*/

                 try {

                        Job job=Job.getInstance(conf);

                        //指定main方法所在的类

                        job.setJarByClass(WCRunner.class);

                        //指定mapreduce

                        job.setMapperClass(WCMapper.class);

                        job.setReducerClass(WCReducer.class);

                        //指定map的输出keyvalue的数据类型

                        job.setMapOutputKeyClass(Text.class);

                        job.setMapOutputValueClass(LongWritable.class);

                        //指定reduce的输出keyvalue的数据类型

                        job.setOutputKeyClass(Text.class);

                        job.setOutputValueClass(LongWritable.class);

                        //指定输入的文件目录,这里可以是文件,也可以是目录

                        FileInputFormat.setInputPaths(job, new Path(args[0]));

                        //指定输出的文件目录,这里只能是目录,不能是文件

                        FileOutputFormat.setOutputPath(job, new Path(args[1]));

                       

                        //执行job

                        job.waitForCompletion(true);

                       

                 } catch (Exception e) {

                        // TODO Auto-generated catch block

                        e.printStackTrace();

                 }

     

          }

     

    }

     

    注:数据类型

           Value Text类型 Key LongWritable类型

           特别声明:Text的包需要特别注意

           

     

    运行前,传一下路径

    Input是读文件的路径,里面的文件就是我们要读的

    Outputreduce生成文件存放的地方

    特别声明:input路径必须存在,output必须是不存在的

    特特特别声明:用户名一定要注意,不要有空格!!!

     

     

    MapReduce原理

    MapReduce分为两大部分,Map(抓取数据、数据分割)和Reduce(处理数据,数据整合,上传数据)。

     

    从单文件看MapReduce

    1.      HDFS上读取一个文件

    2.      为本地主机分配一个Map任务

    3.      Map作业从输入数据中抽取出键值对

    4.      每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。

    5.      上一阶段中解析出来的每一个键值对,调用一次map方法。如果有1000个键值对,就会调用1000map方法。每一次调用map方法会输出零个或者多个键值对。

    6.      为本地主机分配一个Reduce任务

    7.      Reduce任务读取Map任务产生的中间值并排序(因为Map任务产生的键值对可能映射到不同的分区中,当然本地只有一个分区,所以要排序),排序的目的是使相同键的键值对聚集在一起。

    8.      遍历排序后中间键值对,将具有相同键的键值对调用一次reduce方法,对每个不同的键分别调用一次reduce方法。

    9.      reduce函数产生的输出会添加到这个分区的输出文件中。

     

    从集群上看MapReduce

    1.      HDFS上的文件分块,如需要输入的文件为100MB200MB时,因为块大小为128MB,所以共分为三块,块一:100MB;块二:128MB;块三:72MB

    每个块对应一个Map,需要三个Map进程来处理

    2.      为集群上空闲的机器分配Map任务,被分配了Map作业的机器,开始读取对应分片的输入数据

    3.      与单文件过程类似

    4.      Map产生的中间键值对分为N个区保存在本地中,每个区对应一个Reduce任务,将这N个区的位置报告给集群中负责调度的机器,让其将位置信息转发给已分配好Reduce任务的机器。

    5.      Reduce任务的机器从刚获取的地址处,读取中间键值对,然后与单文件类似

    6.      所有执行完毕后,MapReduce输出放在了N个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这N个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。

     

     

     

    注:关于MapReduce之间的数据传输过程,MapReduce的核心Shuffle,现在知识有限。只知道它的作用,不知道为什么作用,希望过几天可以整理一下

  • 相关阅读:
    关于路径的小知识点
    转发与重定向
    一种反复的读写文件的方法
    文字排版reportlab
    Qgis中插件的安装位置
    spyder打开文件假死解决
    地图跳跃-超级码力
    尾部的零
    一探torch.nn究竟“What is torch.nn really?”
    KAZE特征和各向异性扩散滤波
  • 原文地址:https://www.cnblogs.com/wxplmm/p/7253934.html
Copyright © 2011-2022 走看看