zoukankan      html  css  js  c++  java
  • 实例讲解hadoop中的map/reduce查询(python语言实现)

    条件,假设你已经装好了hadoop集群,配好了hdfs并可以正常运行。

    $hadoop dfs -ls /data/dw/explorer
    Found 1 items
    drwxrwxrwx     - rsync supergroup                    0 2011-11-30 01:06 /data/dw/explorer/20111129


    $ hadoop dfs -ls /data/dw/explorer/20111129
    Found 4 items
    -rw-r--r--     3 rsync supergroup     12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
    -rw-r--r--     3 rsync supergroup             1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
    -rw-r--r--     3 rsync supergroup     12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
    -rw-r--r--     3 rsync supergroup             1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index

    数据格式如下

    20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>



    1.map脚本取数据explorer_map.py

    #!/usr/bin/python
    #-*-coding:UTF-8 -*-
    import sys
    import cElementTree

    debug = False#设置lzo文件偏移位
    if debug:
            lzo = 0
    else:
            lzo = 1

    for line in sys.stdin:
            try:
                    flags = line[:-1].split(' ')
    #hadoop查询走标准输入,数据以 分隔,去掉每行中的
                    if len(flags) == 0:
                            break
                    if len(flags) != 11+lzo:
    #hadoop采用lzo则偏移位+1,lzo设置为False则+1
                            continue
                    stat_date=flags[0+lzo]#日期
                    stat_date_bar = stat_date[:4]+"-"+stat_date[4:6]+'-'+stat_date[6:8]#拼成2011-11-29格式
                    version = flags[4+lzo]
                    xmlstr = flags[10+lzo]
                    #xmlstr=line
                    dom = cElementTree.fromstring(xmlstr)
    #xml字段对象,以下均为取值操作
                    uuid = dom.attrib['UUID']
                    node = dom.find('UserDoubleClick')
                    associateKey=node.get('AssociateKey')
                    associateKeys=associateKey.split('.')
                    player = associateKeys[0]
                    fileext=node.get('FileExt')
                    count=node.get('Count')
                    print stat_date_bar+','+version+','+fileext+','+player+','+associateKey+' '+count
    #输出map后的数据,这里map不对数据做任何处理,只做取值,拼接操作
    #将 前的字符串作为key输入reduce, 后的count作为reduce计算用的value
    except Exception,e:
    print e
    #抛出异常        

    2.reduce脚本计算结果并输出explorer_red.py

    #!/usr/bin/python
    #-*-coding:UTF-8 -*-
    import sys
    import cElementTree
    import os
    import string

    res = {}

    for line in sys.stdin:
            try:
                    flags = line[:-1].split(' ')
    #拆分 以获得map传过来的key和value
                    if len(flags) != 2:
    # 切割后,如果数据有问题,元素多于2或者少于2则认为数据不合法,跳出继续下一行
                            continue
                    skey= flags[0]
    #取出第一个元素作为key
                    count=int(flags[1])
    #取出第二个元素作为value
                    if res.has_key(skey) == False:
                            res[skey]=0
                    res[skey] += count
    #计算count总和
            except Exception,e:
                    pass
    #不抛出,继续执行

    for key in res.keys():
            print key+','+'%s' % res[key]
    #格式化输出,以放入临时文件

    3.放入crontab执行的脚本

    #!/bin/sh

    [ $1 ] && day=$1 DATE=`date -d "$1" +%Y%m%d`
    [ $1 ] || day=`date -d "1 day ago" +%Y%m%d`     DATE=`date -d "1 day ago" +%Y%m%d`
    #取昨天日期

    cd /opt/modules/hadoop/hadoop-0.20.203.0/
    #进入hadoop工作目录
    bin/hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar -file /home/rsync/explorer/explorer_map.py -file /home/rsync/explorer/explorer_red.py -mapper /home/rsync/explorer/explorer_map.py -reducer /home/rsync/explorer/explorer_red.py -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -input /data/dw/explorer/$DATE -output /tmp/explorer_$DATE
    #执行map/reduce,并将排序完结果放入hdfs:///tmp/explorer

    bin/hadoop fs -copyToLocal /tmp/explorer_$DATE /tmp
    #将m/r结果从hdfs://tmp/explorer_$DATE 保存到本地/tmp下
    bin/hadoop dfs -rmr /tmp/explorer_$DATE
    #删除hdfs下临时文件夹

    cd
    #返回自身目录
    cd explorer
    #进入explorer文件夹
    ./rm.py $DATE
    执行入库和删除临时文件夹脚本


    4.将/tmp生成的结果入库并删除临时文件夹

    #!/usr/bin/python

    import os
    import sys
    import string

    if len(sys.argv) == 2:
                    date = sys.argv[1:][0] #取脚本参数
                    os.system ("mysql -h192.168.1.229 -ujobs -p223238 -P3306    bf5_data    -e "load data local infile '/tmp/explorer_"+date+"/part-00000' into table explorer FIELDS TERMINATED
    BY '\,' (stat_date,ver,FileExt,player,AssociateKey,count)"")#执行入库sql语句,并用load方式将数据加载到统计表中
                    os.system ("rm -rf /tmp/explorer_"+date)#删除map/reduce过的数据
    else:
                    print "Argv error"

    #因为没有安装MySQLdb包,所以用运行脚本的方式加载数据。

    原始数据和最后完成的输出数据对比,红色为原数据,绿色为输出数据

    20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>

    -----------------------------------------------------------

    2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3,1


    5.调试技巧

    因为这种方式比较抽象,所以你很难得到一个直观的调试过程。建议调试如下

    #将hadoop中的数据文本copy出来一个,lzo需要解压缩,然后将map中的debug模式置为True,也就是不加hadoop中的lzo偏移量。
    #用head输入hadoop里的文件,通过管道操作放入map/reduce中执行,看输出结果

    $head explorer_20111129 | explorer_map.py | explorer_red.py

    一天的数据大概几十个G,以前用awk和perl脚本跑需要至少半小时以上,改用map/reduce方式后,大概20几秒跑完,效率还是提高了很多的。
  • 相关阅读:
    aaronyang的百度地图API之LBS云与.NET开发 Javascript API 2.0【把数据存到LBS云2/2】
    aaronyang的百度地图API之LBS云与.NET开发 Javascript API 2.0【把数据存到LBS云1/2】
    aaronyang的百度地图API之LBS云与.NET开发 Javascript API 2.0【基本地图的操作】
    aaronyang的百度地图API之LBS云 笔记[位置数据 geotable]
    aaronyang的百度地图API之LBS云 笔记[开发准备]
    windows7 sqlserver2012 无法写入受保护的内存 解决办法
    [AaronYang风格]微软Unity2.X系统学习笔记,记录
    Javascript 原生Cookie使用用法
    注册asp.net 4.0 到iis
    茗洋Easy UI 1.3.5 部分问题解决系列专题[自定义alert关闭时间,自动关]
  • 原文地址:https://www.cnblogs.com/java20130722/p/3206898.html
Copyright © 2011-2022 走看看