zoukankan      html  css  js  c++  java
  • 使用Python实现Hadoop MapReduce程序

    转自:使用Python实现Hadoop MapReduce程序

    英文原文:Writing an Hadoop MapReduce Program in Python

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程。文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴。 

    在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapReduce程序。

    尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++、Python等语言来实现 Hadoop程序。尽管Hadoop官方网站给的示例程序是使用Jython编写并打包成Jar文件,这样显然造成了不便,其实,不一定非要这样来实现,我们可以使用Python与Hadoop 关联进行编程,看看位于/src/examples/python/WordCount.py  的例子,你将了解到我在说什么。

    我们想要做什么?

    我们将编写一个简单的 MapReduce 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
    我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。

    先决条件

    编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)

    如何在Ubuntu Linux 上搭建hadoop的单节点模式和伪分布模式,请参阅博文Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)

    Python的MapReduce代码

    使用Python编写MapReduce代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!
    Map: mapper.py

    将下列的代码保存在/usr/local/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:
    注意:要确保这个脚本有足够权限(chmod +x mapper.py)。

    [python] view plaincopy
     
    1. #!/usr/bin/env python  
    2.   
    3. import sys  
    4.   
    5. # input comes from STDIN (standard input)  
    6. for line in sys.stdin:  
    7.     # remove leading and trailing whitespace  
    8.     line = line.strip()  
    9.     # split the line into words  
    10.     words = line.split()  
    11.     # increase counters  
    12.     for word in words:  
    13.         # write the results to STDOUT (standard output);  
    14.         # what we output here will be the input for the  
    15.         # Reduce step, i.e. the input for reducer.py  
    16.         #  
    17.         # tab-delimited; the trivial word count is 1  
    18.         print '%s %s' % (word, 1)  

    在这个脚本中,并不计算出单词出现的总数,它将输出 "<word> 1" 迅速地,尽管<word>可能会在输入中出现多次,计算是留给后来的Reduce步骤(或叫做程序)来实现。当然你可以改变下编码风格,完全尊重你的习惯。Reduce: reducer.py


    将代码存储在/usr/local/hadoop/reducer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。

    同样,要注意脚本权限:chmod +x reducer.py

    [python] view plaincopy
     
    1. #!/usr/bin/env python  
    2.   
    3. from operator import itemgetter  
    4. import sys  
    5.   
    6. current_word = None  
    7. current_count = 0  
    8. word = None  
    9.   
    10. # input comes from STDIN  
    11. for line in sys.stdin:  
    12.     # remove leading and trailing whitespace  
    13.     line = line.strip()  
    14.   
    15.     # parse the input we got from mapper.py  
    16.     word, count = line.split(' '1)  
    17.   
    18.     # convert count (currently a string) to int  
    19.     try:  
    20.         count = int(count)  
    21.     except ValueError:  
    22.         # count was not a number, so silently  
    23.         # ignore/discard this line  
    24.         continue  
    25.   
    26.     # this IF-switch only works because Hadoop sorts map output  
    27.     # by key (here: word) before it is passed to the reducer  
    28.     if current_word == word:  
    29.         current_count += count  
    30.     else:  
    31.         if current_word:  
    32.             # write result to STDOUT  
    33.             print '%s %s' % (current_word, current_count)  
    34.         current_count = count  
    35.         current_word = word  
    36.   
    37. # do not forget to output the last word if needed!  
    38. if current_word == word:  
    39.     print '%s %s' % (current_word, current_count)  

    测试你的代码(cat data | map | sort | reduce)

    我建议你在运行MapReduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果

    这里有一些建议,关于如何测试你的Map和Reduce的功能:

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py  
    2. foo      1  
    3. foo      1  
    4. quux     1  
    5. labs     1  
    6. foo      1  
    7. bar      1  
    8. quux     1  
    9. hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py  
    10. bar     1  
    11. foo     3  
    12. labs    1  
    13. quux    2  

    # using one of the ebooks as example input
    # (see below on where to get the ebooks)

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ cat book/book.txt |./mapper.pysubscribe      1  
    2. to   1  
    3. our      1  
    4. email    1  
    5. newsletter   1  
    6. to   1  
    7. hear     1  
    8. about    1  
    9. new      1  
    10. eBooks.      1  

    在Hadoop平台上运行Python脚本


    为了这个例子,我们将需要一本电子书,把它放在/usr/local/hadpoop/book/book.txt之下
     

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ ls -l book  
    2. 总用量 636  
    3. -rw-rw-r-- 1 derek derek 649669  3月 12 12:22 book.txt  


    复制本地数据到HDFS

    在我们运行MapReduce job 前,我们需要将本地的文件复制到HDFS中:

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -copyFromLocal /usr/local/hadoop/book book  
    2. hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -ls  
    3. Found 3 items  
    4. drwxr-xr-x   - hadoop supergroup          0 2013-03-12 15:56 /user/hadoop/book  

    执行 MapReduce job现在,一切准备就绪,我们将在运行Python MapReduce job 在Hadoop集群上。像我上面所说的,我们使用的是HadoopStreaming 帮助我们传递数据在Map和Reduce间并通过STDIN和STDOUT,进行标准化输入输出。

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar   
    2. -mapper /usr/local/hadoop/mapper.py   
    3. -reducer /usr/local/hadoop/reducer.py   
    4. -input book/*   
    5. -output book-output  

    在运行中,如果你想更改Hadoop的一些设置,如增加Reduce任务的数量,你可以使用“-jobconf”选项:

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar   
    2. -jobconf mapred.reduce.tasks=4  
    3.   
    4. -mapper /usr/local/hadoop/mapper.py   
    5. -reducer /usr/local/hadoop/reducer.py   
    6. -input book/*   
    7. -output book-output   

    如果上面两个运行出错,请参考下面一段代码。注意,重新运行,需要删除dfs中的output文件

    [plain] view plaincopy
     
    1. bin/hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar    
    2. -mapper task1/mapper.py    
    3. -file task1/mapper.py    
    4. -reducer task1/reducer.py    
    5. -file task1/reducer.py    
    6. -input url   
    7. -output url-output    
    8. -jobconf mapred.reduce.tasks=3   


    一个重要的备忘是关于Hadoop does not honor mapred.map.tasks 这个任务将会读取HDFS目录下的book并处理他们,将结果存储在独立的结果文件中,并存储在HDFS目录下的book-output目录。之前执行的结果如下:

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar -jobconf mapred.reduce.tasks=4 -mapper /usr/local/hadoop/mapper.py -reducer /usr/local/hadoop/reducer.py -input book/* -output book-output  
    2. 13/03/12 16:01:05 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.  
    3. packageJobJar: [/usr/local/hadoop/tmp/hadoop-unjar4835873410426602498/] [] /tmp/streamjob5047485520312501206.jar tmpDir=null  
    4. 13/03/12 16:01:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
    5. 13/03/12 16:01:06 WARN snappy.LoadSnappy: Snappy native library not loaded  
    6. 13/03/12 16:01:06 INFO mapred.FileInputFormat: Total input paths to process : 1  
    7. 13/03/12 16:01:06 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/tmp/mapred/local]  
    8. 13/03/12 16:01:06 INFO streaming.StreamJob: Running job: job_201303121448_0010  
    9. 13/03/12 16:01:06 INFO streaming.StreamJob: To kill this job, run:  
    10. 13/03/12 16:01:06 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201303121448_0010  
    11. 13/03/12 16:01:06 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201303121448_0010  
    12. 13/03/12 16:01:07 INFO streaming.StreamJob:  map 0%  reduce 0%  
    13. 13/03/12 16:01:10 INFO streaming.StreamJob:  map 100%  reduce 0%  
    14. 13/03/12 16:01:17 INFO streaming.StreamJob:  map 100%  reduce 8%  
    15. 13/03/12 16:01:18 INFO streaming.StreamJob:  map 100%  reduce 33%  
    16. 13/03/12 16:01:19 INFO streaming.StreamJob:  map 100%  reduce 50%  
    17. 13/03/12 16:01:26 INFO streaming.StreamJob:  map 100%  reduce 67%  
    18. 13/03/12 16:01:27 INFO streaming.StreamJob:  map 100%  reduce 83%  
    19. 13/03/12 16:01:28 INFO streaming.StreamJob:  map 100%  reduce 100%  
    20. 13/03/12 16:01:29 INFO streaming.StreamJob: Job complete: job_201303121448_0010  
    21. 13/03/12 16:01:29 INFO streaming.StreamJob: Output: book-output  
    22. hadoop@derekUbun:/usr/local/hadoop$  

    如你所见到的上面的输出结果,Hadoop 同时还提供了一个基本的WEB接口显示统计结果和信息。
    当Hadoop集群在执行时,你可以使用浏览器访问 http://localhost:50030/ :


    检查结果是否输出并存储在HDFS目录下的book-output中:

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -ls book-output  
    2. Found 6 items  
    3. -rw-r--r--   2 hadoop supergroup          0 2013-03-12 16:01 /user/hadoop/book-output/_SUCCESS  
    4. drwxr-xr-x   - hadoop supergroup          0 2013-03-12 16:01 /user/hadoop/book-output/_logs  
    5. -rw-r--r--   2 hadoop supergroup         33 2013-03-12 16:01 /user/hadoop/book-output/part-00000  
    6. -rw-r--r--   2 hadoop supergroup         60 2013-03-12 16:01 /user/hadoop/book-output/part-00001  
    7. -rw-r--r--   2 hadoop supergroup         54 2013-03-12 16:01 /user/hadoop/book-output/part-00002  
    8. -rw-r--r--   2 hadoop supergroup         47 2013-03-12 16:01 /user/hadoop/book-output/part-00003  
    9. hadoop@derekUbun:/usr/local/hadoop$  


    可以使用dfs -cat 命令检查文件目录

    [plain] view plaincopy
     
    1. hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -cat book-output/part-00000  
    2. about   1  
    3. eBooks.     1  
    4. the     1  
    5. to  2  
    6. hadoop@derekUbun:/usr/local/hadoop$   

    下面是原英文作者mapper.py和reducer.py的两个修改版本:

    mapper.py

    [python] view plaincopy
     
    1. #!/usr/bin/env python  
    2. """A more advanced Mapper, using Python iterators and generators."""  
    3.   
    4. import sys  
    5.   
    6. def read_input(file):  
    7.     for line in file:  
    8.         # split the line into words  
    9.         yield line.split()  
    10.   
    11. def main(separator=' '):  
    12.     # input comes from STDIN (standard input)  
    13.     data = read_input(sys.stdin)  
    14.     for words in data:  
    15.         # write the results to STDOUT (standard output);  
    16.         # what we output here will be the input for the  
    17.         # Reduce step, i.e. the input for reducer.py  
    18.         #  
    19.         # tab-delimited; the trivial word count is 1  
    20.         for word in words:  
    21.             print '%s%s%d' % (word, separator, 1)  
    22.   
    23. if __name__ == "__main__":  
    24.     main()  


    reducer.py

    [python] view plaincopy
     
      1. #!/usr/bin/env python  
      2. """A more advanced Reducer, using Python iterators and generators."""  
      3.   
      4. from itertools import groupby  
      5. from operator import itemgetter  
      6. import sys  
      7.   
      8. def read_mapper_output(file, separator=' '):  
      9.     for line in file:  
      10.         yield line.rstrip().split(separator, 1)  
      11.   
      12. def main(separator=' '):  
      13.     # input comes from STDIN (standard input)  
      14.     data = read_mapper_output(sys.stdin, separator=separator)  
      15.     # groupby groups multiple word-count pairs by word,  
      16.     # and creates an iterator that returns consecutive keys and their group:  
      17.     #   current_word - string containing a word (the key)  
      18.     #   group - iterator yielding all ["<current_word>", "<count>"] items  
      19.     for current_word, group in groupby(data, itemgetter(0)):  
      20.         try:  
      21.             total_count = sum(int(count) for current_word, count in group)  
      22.             print "%s%s%d" % (current_word, separator, total_count)  
      23.         except ValueError:  
      24.             # count was not a number, so silently discard this item  
      25.             pass  
      26.   
      27. if __name__ == "__main__":  
      28.     main()  
  • 相关阅读:
    Thinking in Java Reading Note(9.接口)
    Thinking in java Reading Note(8.多态)
    Thinking in Java Reading Note(7.复用类)
    SQL必知必会
    Thinking in Java Reading Note(5.初始化与清理)
    Thinking in Java Reading Note(2.一切都是对象)
    鸟哥的Linux私房菜笔记(1.基础)
    Thinking in Java Reading Note(1.对象导论)
    CoreJava2 Reading Note(2:I/O)
    CoreJava2 Reading Note(1:Stream)
  • 原文地址:https://www.cnblogs.com/DjangoBlog/p/3789593.html
Copyright © 2011-2022 走看看