zoukankan      html  css  js  c++  java
  • Hadoop:使用原生python编写MapReduce

    功能实现

    功能:统计文本文件中所有单词出现的频率功能。

    下面是要统计的文本文件

    【/root/hadooptest/input.txt】

    foo foo quux labs foo bar quux abc bar see you by test welcome test
    abc labs foo me python hadoop ab ac bc bec python

    编写Map代码

    Map代码,它会从标准输入(stdin)读取数据,默认以空格分割单词,然后按行输出单词机器出现频率到标准输出(stdout),不过整个Map处理过程并不会统计每个单词出现的总次数,而是直接输出“word,1”,以便作为Reduce的输入进行统计,要求mapper.py具备执行权限。

    【/root/hadooptest/mapper.py】

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import sys
    
    #输入为标准输入stdin
    for line in sys.stdin:
    #删除开头和结尾的空行
    line = line.strip()
    #以默认空格分隔单词到words列表
    words = line.split()
    for word in words:
    #输出所有单词,格式为“单词,1”以便作为Reduce的输入
    print '%s	%s' % (word,1)0

    编写Reduce代码

    Reduce代码,它会从标准输入(stdin)读取mapper.py的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout),要求reducer.py同样具备可执行 权限。

    【/root/hadooptest/reducer.py】

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from operator import itemgetter
    import sys
    
    current_word = None
    current_count = 0
    word = None
    
    #获取标准输入,即mapper.py的标准输出
    for line in sys.stdin:
    #删除开头和结尾的空行
    line = line.strip()
    
    #解析mapper.py输出作为程序的输入,以tab作为分隔符
    word,count = line.split('	',1)
    
    #转换count从字符型到整型
    try:
    count = int(count)
    except ValueError:
    #count非数字时,忽略此行
    continue
    
    #要求mapper.py的输出做排序(sort)操作,以便对连续的word做判断
    if current_word == word:
    current_count += count
    else:
    if current_word:
    #输出当前word统计结果到标准输出
    print '%s	%s' % (current_word,current_count)
    current_count = count
    current_word = word
    
    #输出最后一个word统计
    if current_word == word:
    print '%s	%s' % (current_word,current_count)

    测试代码

    在Hadoop平台运行前进行本地测试

    [root@wx ~]# cd /root/hadooptest/
    [root@wx hadooptest]# cat input.txt | ./mapper.py 
    foo 1
    foo 1
    quux 1
    labs 1
    foo 1
    bar 1
    quux 1
    abc 1
    bar 1
    see 1
    you 1
    by 1
    test 1
    welcome 1
    test 1
    abc 1
    labs 1
    foo 1
    me 1
    python 1
    hadoop 1
    ab 1
    ac 1
    bc 1
    bec 1
    python 1
    
    
    [root@wx hadooptest]# cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py 
    ab 1
    abc 2
    ac 1
    bar 2
    bc 1
    bec 1
    by 1
    foo 4
    hadoop 1
    labs 2
    me 1
    python 2
    quux 2
    see 1
    test 2
    welcome 1
    you 1

    Hadoop平台运行

    在HDFS上创建文本文件存储目录,本示例中为/user/root/word

    /usr/local/hadoop-2.6.4/bin/hadoop fs -mkdir -p /user/root/word

    将输入文件上传到HDFS,本例中是/root/hadooptest/input.txt

    /usr/local/hadoop-2.6.4/bin/hadoop fs -put /root/hadooptest/input.txt /user/root/word

    查看/user/root/word目录下的文件

    /usr/local/hadoop-2.6.4/bin/hadoop fs -ls /user/root/word
    #结果:
    Found 1 items
    -rw-r--r-- 2 root supergroup 118 2016-03-22 13:36 /user/root/word/input.txt

    执行MapReduce任务,输出结果文件制定为/output/word

    /usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar -files 'mapper.py,reducer.py' -input /user/root/word -output /output/word -mapper ./mapper.py -reducer ./reducer.py

    参数说明:

    /usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar 
    -input <输入目录>  # 可以指定多个输入路径,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
    -inputformat <输入格式 JavaClassName> 
    -output <输出目录> 
    -outputformat <输出格式 JavaClassName> 
    -mapper <mapper executable or JavaClassName> 
    -reducer <reducer executable or JavaClassName> 
    -combiner <combiner executable or JavaClassName> 
    -partitioner <JavaClassName> 
    -cmdenv <name=value>  # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个
    -file <依赖的文件>  # 配置文件,字典等依赖
    -D <name=value>  # 作业的属性配置

    查看生成的分析结果文件清单,其中/output/word/part-00000为分析结果文件

    [root@wx hadooptest]# /usr/local/hadoop-2.6.4/bin/hadoop fs -ls /output/word
    Found 2 items
    -rw-r--r-- 2 root supergroup 0 2016-03-22 13:47 /output/word/_SUCCESS
    -rw-r--r-- 2 root supergroup 110 2016-03-22 13:47 /output/word/part-00000

    查看结果数据

    [root@wx hadooptest]# /usr/local/hadoop-2.6.4/bin/hadoop fs -cat /output/word/part-00000
    ab 1
    abc 2
    ac 1
    bar 2
    bc 1
    bec 1
    by 1
    foo 4
    hadoop 1
    labs 2
    me 1
    python 2
    quux 2
    see 1
    test 2
    welcome 1
    you 1

    参考资料:

    根据刘天斯《Python自动化运维技术与最佳实践》整理

  • 相关阅读:
    python 函数嵌套
    python 函数对象
    python 函数参数
    python 连接MySQL报错及解决方案
    解决 No module named pip
    python 文件处理
    python
    python 元祖
    python 读取域名信息
    ubuntu 配置网卡,DNS, iptables
  • 原文地址:https://www.cnblogs.com/MacoLee/p/5799203.html
Copyright © 2011-2022 走看看