zoukankan      html  css  js  c++  java
  • [Hadoop]-从数据去重认识MapReduce

      这学期刚好开了一门大数据的课,就是完完全全简简单单的介绍的那种,然后就接触到这里面最被人熟知的Hadoop了。看了官网的教程【吐槽一下,果然英语还是很重要!】,嗯啊,一知半解地搭建了本地和伪分布式的,然后是在没弄懂,求助了Google,搞来了一台机子,嗯,搭了个分布式的。其实是作业要求啦,觉得自己平时用单机的完全够了啦~

      然后被要求去做个WordCount和数据去重的小例子,嗯啊,我就抱着半桶水的Java知识就出发走向“大数据“【其实很小】了。

      立马求助官网【官网就是好,虽然看的慢,英语技术两不误!】,看了给出来的WordCount的例子,自己就写了一下数据去重的小例子,还好成功了,顺便对MapReduce了解了更多。下面说一下自己的认识和实现的思路。

      首先整个的流程大致是这样的

      1.输入数据InputData在被Map()处理之前会先由InputFormat调用getRecordReader()生成RecordReader,RecordReader再调用creatKey()和creatValue()生成可供Map使用的<key,value>对。其中有很多格式可继承于InputFormat,如我们最常用TextInputFor就是继承于FileInputFormat,将每一行数据都生成一个记录。

      2.到Map呢就是发挥数据价值的时候了。想想这些数据都能拿来干嘛,想干嘛,再coding你想要做的一切一切吧。

      3.Map输出的<key,value>对在被送到Reduce之前呢,会被先送到Shuffle处理一下成为<key,value-list>的样子,Reduce最喜欢这样的了。怎么变成这样呢?嗯,就是将相同的Key数据合并在一起了,还可以指定Job.setCombinerClass(class)来指定组合的方式。还有乱糟糟的Reduce也是不喜欢的,所以在它们组合之后还要再sort一下才行,如果遇到等值的数据呢,你又想自己来定怎么排序,那就指定Job.setGroupingComparaterClass(class)来‘二次排序’吧。

      4.又是一个发挥想象力的过程了。也是要好好想想Reduce能帮你干什么呢,如果觉得没必要的话,我们也可以不指定Reduce的,让Map处理后的数据就直接输出好了。在Map和Reduce阶段都可以借助Counter来获得一些统计信息哦。

      5.快到尾声了,记得每一种InputFormat都有一种OutputFormat和它对应的,最常用的还是Text类型的。

      6.上面我们提到了Job这个东西,其实一个Job可以理解分为Map和Reduce两个过程。所以我们既可以定义Map和Reduce,也是给Job设定各种各样的配置。最简单的设置会在后面程序的注释里给出。

      数据去重实现的思路:

      根据上面的流程分析,既然数据是一行一行split之后再传进去Map的,而Map的输出结果是会经过Shuffle合并相同key之后再给Reduce的,那我们将Map输入的value变成Map输出的key就ok了,这里就不用管输出的value-list是什么东西了,重复的就combine了,达到需要。

      这是代码的具体实现:

      

    /*
        运行环境
        CentOS7
        OpenJDK-1.7.0.91
        Hadoop2.7.1
        !单机节点测试!
    */
    
    //导入所需的包
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class Single
    {
    
        //map将输入中的value复制到输出数据的key上,并直接输出
        /*
            四个参数的意思分别是:
            Object:输入到Map中的key的类型
            Text:输入到Map中的value的类型
            Text:输出到Reduce中的key的类型
            Text:输出到Reduce中的value的类型
        */
        public static class Map extends Mapper<Object, Text, Text, Text>
        {
            //从输入中得到的每行的数据的类型
            private static Text line = new Text();
    
            //实现map函数
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException
            {
                //获取并输出每一次的处理过程
                line = value;
                System.out.println("The process of the Map:" + key);
                context.write(line, new Text(""));
            }
        }
    
        //reduce将输入中的key复制到输出数据的key上,并直接输出
        public static class Reduce extends Reducer<Text, Text, Text, Text>
        {
    
            //实现reduce函数
            public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
            {
                //获取并输出每一次的处理过程
                System.out.println("The process of the Reduce:" + key);
                context.write(key, new Text(""));
    
            }
    
        }
    
        public static void main(String[] args) throws Exception
        {
    
            //设置配置类
            Configuration conf = new Configuration();
    
            //是从命令行里获取输入数据和输出数据的路径,所以这里要获取和判断一下
            String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            if(pathArgs.length != 2)
            {
                System.err.println("Please set the path of <InputData> & <OutputData> in the command!");
                System.exit(2);
            }
    
            //Job job = new Job(conf, "Date-Single");
            //照着葫芦画不出来,就Google一下解决方法
            Job job = Job.getInstance();
            job.setJobName("single");
            job.setJarByClass(Single.class);
            //设置Map、Combine和Reduce处理类
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            //设置输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //设置输入和输出目录
            FileInputFormat.addInputPath(job, new Path(pathArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(pathArgs[1]));
            //这里是根据是否等待job完成之后再返回结果并退出程序
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }

      后记:

      在这个学习过程中,很多东西都是”不求甚解“,对于它们只有一些很浅显的理解,如有错误之处,劳烦告知,谢谢。

      本文由AnnsShadoW发表于:http://www.cnblogs.com/annsshadow/p/5006317.html 

  • 相关阅读:
    js 仿 asp中的 asc 和 chr 函数的代码
    escape,encodeURI,encodeURIComponent
    从项目从SVN上check下来,用idea打开后,idea没有SVN的工具栏解决方法
    idea中导入SVN的项目时,连接失败,报“Cannot run program "svn"
    spring基础----事件(Applicaition Event)
    idea在导入项目时遇到的问题
    Spring基础---bean的初始化和销毁
    spring基础----EL和资源调用
    spring基础----Bean的Scope
    面试题-------------js三种信息框
  • 原文地址:https://www.cnblogs.com/annsshadow/p/5006317.html
Copyright © 2011-2022 走看看