zoukankan      html  css  js  c++  java
  • MapReduce编程基础

     MapReduce编程基础

    1. WordCount示例及MapReduce程序框架

    2.  MapReduce程序执行流程

    3.  深入学习MapReduce编程(1)

    4. 参考资料及代码下载 

    <1>. WordCount示例及MapReduce程序框架 

    首先通过一个简单的程序来实际运行一个MapReduce程序,然后通过这个程序我们来哦那个结一下MapReduce编程模型。

    下载源程序:/Files/xuqiang/WordCount.rar,将该程序打包成wordcount.jar下面的命令,随便写一个文本文件,这里是WordCountMrtrial,并上传到hdfs上,这里的路径是/tmp/WordCountMrtrial,运行下面的命令:

     xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

     如果该任务运行完成之后,将在hdfs的/tmp/result目录下生成类似于这样的结果:

     gentleman 11

    get 12
    give 8
    go 6
    good 9
    government 16

    运行一个程序的基本上就是这样一个过程,我们来看看具体程序:

    main函数中首先生成一个Job对象, Job job = new Job(conf, "word count");然后设置job的MapperClass,ReducerClass,设置输入文件路径 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));设置输出文件路径:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程序运行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中仅仅是启动了一个job,然后设置该job相关的参数,具体实现MapReduce是mapper类和reducer类。

     TokenizerMapper类中map函数将一行分割成<K2, V2>,然后IntSumReducer的reduce将<K2, list<V2>>转换成最终结果<K3, V3>。

     

    通过这个示例基本上也能总结出简单的MapReduce编程的模型:一个Mapper类,一个Reducer类,一个Driver类。

     

    <2>. MapReduce程序执行流程 

     这里所描述的执行流程更加注重是从程序的角度去理解,更加全面的流程可参考[这里]。

     

    首先用户指定待处理的文件,在WordCount就是文件WordCountMrtrial,这是hadoop根据设定的 InputDataFormat来将输入文件分割成一个record(key/value对),然后将这些record传递给map函数,在 WordCount示例中,对应的record就是<line_number行号, line_content该行内容>;

    然后map函数根据输入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

    如果map过程完成之后,hadoop将这些生成的<K2, V2>按照K2进行分组,形成<K2,list(V2) >,之后传递给reduce函数,在该函数中最终得到程序的输出结果<K3, V3>。

    <3>. 深入学习MapReduce编程(1)

    3.1 hadoop data types

    由于在hadoop需要将key/value对序列化,然后通过网络network发送到集群中的其他机器上,所以说hadoop中的类型需要能够序列化。

    具体而言,自定义的类型,如果一个类class实现了Writable interface的话,那么这个可以作为value类型,如果一个class实现了WritableComparable<T> interface的话,那么这个class可以作为value类型或者是key类型。 

    hadoop本身已经实现了一些预定义的类型predefined classes,并且这些类型实现了WritableComparable<T>接口。

    3.2 Mapper

    如果一个类想要成为一个mapper,那么该类需要实现Mapper接口,同时继承自MapReduceBase。在MapReduceBase类中,两个方法是特别需要注意的:

    void configure( JobConf job):这个方法是在任务被运行之前调用 

    void close():在任务运行完成之后调用

    剩下的工作就是编写map方法,原型如下:

    void map(Object key, Text value, Context context

                        ) throws IOException, InterruptedException;

     这个方法根据<K1, V1>生成<K2, V2>,然后通过context输出。

    同样的在hadoop中预先定义了如下的Mapper:

     

    3.3 Reducer

    如果一个类想要成为Reducer的话,需要首先实现Reducer接口,然后需要继承自MapReduceBase。

    当reducer接收从mapper传递而来的key/value对,然后根据key来排序,分组,最终生成<K2, list<V2>> ,然后reducer根据<K2, list<V2>>生成<K3, V3>.

    同样在hadoop中预定义了一些Reducer:

     

    3.4 Partitioner

     Partitioner的作用主要是将mapper运行的结果“导向directing”到reducer。如果一个类想要成为Partitioner,那么需要实现Partitioner接口,该接口继承自JobConfigurable,定义如下:

    复制代码
    public interface Partitioner<K2, V2> extends JobConfigurable {
      /** 
       * Get the paritition number for a given key (hence record) given the total 
       * number of partitions i.e. number of reduce-tasks for the job.
       *   
       * <p>Typically a hash function on a all or a subset of the key.</p>
       *
       * @param key the key to be paritioned.
       * @param value the entry value.
       * @param numPartitions the total number of partitions.
       * @return the partition number for the <code>key</code>.
       */
      int getPartition(K2 key, V2 value, int numPartitions);
    复制代码

    hadoop将根据方法getPartition的返回值确定将mapper的值发送到那个reducer上。返回值相同的key/value对将被“导向“至同一个reducer。

    3.5 Input Data Format and Output Data Format

    3.5.1 Input Data Format 

    上面我们的假设是MapReduce程序的输入是key/value对,也就是<K1, V1>,但是实际上一般情况下MapReduce程序的输入是big file的形式,那么如何将这个文件转换成<K1, V1>,即file -> <K1, V1>。这就需要使用InputFormat接口了。 

    下面是几个常用InputFormat的实现类:

     

    当然除了使用hadoop预先定义的InputDataFormat之外,还可以自定义,这是需要实现InputFormat接口。该接口仅仅包含两个方法:

     InputSplit[] getSplits(JobConf job, int numSplits) throws  IOException;该接口实现将大文件分割成小块split。
      RecordReader<K, V> getRecordReader(InputSplit split,
                                         JobConf job, 

                                         Reporter reporter) throws IOException; 

    该方法输入分割成的split,然后返回RecordReader,通过RecordReader来遍历该split内的record。 

    3.5.2 Output Data Format

    每个reducer将自己的输出写入到结果文件中,这是使用output data format来配置输出的文件的格式。hadoop预先实现了:

     

    3.6 Streaming in Hadoop

    3.6.1 执行流程

    我们知道在linux中存在所谓的“流”的概念,也就是说我们可以使用下面的命令:

    cat input.txt | RandomSample.py 10 >sampled_output.txt 

    同样在hadoop中我们也可以使用类似的命令,显然这样能够在很大程度上加快程序的开发进程。下面来看看hadoop中流执行的过程:

     

    hadoop streaming从标砖输入STDIN读取数据,默认的情况下使用 来分割每行,如果不存在 的话,那么这时正行的内容将被看作是key,而此时的value内容为空;

    然后调用mapper程序,输出<K2, V2>;

    之后,调用Partitioner来将<K2, V2>输出到对应的reducer上;

    reducer根据输入的<K2, list(V2)> 得到最终结果<K3, V3>并输出到STDOUT上。 

    3.6.2 简单示例程序 

    下面我们假设需要做这样一个工作,输入一个文件,文件中每行是一个数字,然后得到该文件中的数字的最大值(当然这里可以使用streaming中自带的Aggregate)。 首先我们编写一个python文件(如果对python不是很熟悉,看看[这里]):

    3.6.2.1 准备数据

     xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

     xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

    上传到hdfs上:

     xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/
    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

    3.6.2.2 编写mapper multifetch.py

    复制代码
    #!/usr/bin/env python

    import sys, urllib, re

    title_re = re.compile("<title>(.*?)</title>",
            re.MULTILINE | re.DOTALL | re.IGNORECASE)

    for line in sys.stdin:
        # We assume that we are fed a series of URLs, one per line
        url = line.strip()
        # Fetch the content and output the title (pairs are tab-delimited)
        match = title_re.search(urllib.urlopen(url).read())
        if match:
            print url, " ", match.group(1).strip()
    复制代码

    该文件的主要作用是给定一个url,然后输出该url代表的html页面的title部分。

    在本地测试一下该程序:

    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls
    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py 

     xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 将输出:

     http://www.cs.brandeis.edu Computer Science Department | Brandeis University

    http://www.nytimes.com The New York Times - Breaking News, World News &amp; Multimedia

    3.6.2.3 编写reducer reducer.py

    编写reducer.py文件 

    复制代码
     #!/usr/bin/env python
    from operator import itemgetter
    import sys

    for line in sys.stdin:
        line = line.strip()
        print line
    复制代码

    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py  

    现在我们的mapper和reducer已经准备好了,那么首先在本地上运行测试一下程序的功能,下面的命令模拟在hadoop上运行的过程:

    首先mapper从stdin读取数据,这里是一行;

    然后读取该行的内容作为一个url,然后得到该url代表的html的title的内容,输出<url, url-title-content>;

    调用sort命令将mapper输出排序;

    将排序完成的结果交给reducer,这里的reducer仅仅是将结果输出。 

    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py | sort | ./reducer.py 
    http://www.cs.brandeis.edu     Computer Science Department | Brandeis University
    http://www.nytimes.com     The New York Times - Breaking News, World News &amp; Multimedia  

    显然程序能够正确 

    3.6.2.4 在hadoop streaming上运行

    xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop jar ./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar 
    > -mapper /home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py 
    > -reducer /home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py 
    > -input urls/* 
    -output titles 

     程序运行完成之后,查看运行结果:

     xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

    <4>. 参考资料及代码下载

    http://pages.cs.brandeis.edu/~cs147a/lab/hadoop-example/ 

    http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Hadoop+Streaming 

    <Hadoop In Action> 

    出处:http://www.cnblogs.com/xuqiang/archive/2011/06/05/2071935.html

  • 相关阅读:
    Java 并发专题 :FutureTask 实现预加载数据 在线看电子书、浏览器浏览网页等
    Excel里的单元格提行
    Ubuntu各版本的历史发行界面
    CentOS常用命令、快照、克隆大揭秘
    虚拟网络编辑器的知识和出现的一些问题(没有VMnet0或VMnet8)
    CentOS图形界面下如何安装Eclipse和使用maven
    Hadoop MapReduce编程 API入门系列之mr编程快捷键活用技巧详解(四)
    Hadoop MapReduce概念学习系列之mr程序详谈(二十三)
    Hadoop MapReduce概念学习系列之mr的Shuffle(二十二)
    Hadoop MapReduce概念学习系列之JobTracker、ResourceManager、Task Tracker、NodeManager(二十一)
  • 原文地址:https://www.cnblogs.com/GarfieldEr007/p/4926282.html
Copyright © 2011-2022 走看看