zoukankan      html  css  js  c++  java
  • python + Streaming框架的MR实践与优化

    Streaming是Hadoop提供的一个可以使用其他编程语言来进行MR编程的API,它使用Unix标准输入输出作为Hadoop和其他编程语言的开发接口,非常轻便。而开发者可以选择自己擅长的编程语言,并且只需要在MR程序中实现计算逻辑后,指定输出即可。

        Python可以通过Streaming非常高效地实现MR编程,执行效率也非常快,且基于Python本身的简洁美,特别适合MR的快速开发。

        另外,对于Python MR编程,Dumbo, Happy 与 mrjob 也是很好的选择,只是则性能上要逊于Streaming。其中,Dumbo为MR应用提供了更加灵活易用的Python API,它支持将mapper.py与reduce.py封装在一起使用,而Happy则为Jython开发者使用Hadoop框架提供了便利,另外,mrjob则允许用户写多步骤的MapReduce的工作流程。

        对于Streaming的实现原理,数据流程,参数设置以及任务执行等方面的介绍,社区有很详细的介绍,本文不再赘述。http://hadoop.apache.org/docs/stable/streaming.html

    (一)Map + Reduce

        Map阶段按流读入数据,进行字段的拆分以及格式化等操作。

        Reduce阶段实现PV, UV的计算

        注意:日志文件以不可见字符chr(05)作为分隔符

    1. Mapper.py  
    #!/usr/bin/env python
    import sys
    
    for line in sys.stdin:
        line = line.strip()
        word = line.split('05')
        print '%s05%s' % (word[9],word[5])   # url + cookie_id 
    2. Reduce.py     #注意:计算UV时, 如果不指定partitioner, 只能设置一个reduce!!!
    #!/usr/bin/env python
    from operator import itemgetter
    import sys
    
    word2count = {}
    cookies = set()
    for line in sys.stdin:
        url,cookie = line.strip().split('05')
        coo = '06'.join([url,str(cookie)])
        try:
            act = word2count.get(url)
            flg = coo in cookies
            if not flg:
                cookies.add(coo) 
            if act is None:
                word2count[url] = [1, 1]
            else:
                uv = not flg and 1 or 0
                word2count[url] = [act[0] + 1, act[1] + uv]
        except ValueError:
            sys.exit(1)
     
    #recordsort = sorted(word2count.items(), key=itemgetter(1,0),reverse=True)
    recordsort = sorted(word2count.items(), key=lambda word2count:(int(word2count[1][1]),word2count[0]), reverse=True)
    
    for real_url, val in recordsort:
        print '%s	%s	%s'% (real_url, val[0], val[1])

    (二)执行Streaming:

    $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar 
        -input /group/alidw/dhwdata1/alilog/CnLog/20130603/23 
        -output /group/alidw/ali-log/wfs/log 
        -mapper  mapper.py 
        -reducer reduce.py 
        -file /home/dwapp/fusen.wangfs/MR/wfs/mapper.py 
        -file /home/dwapp/fusen.wangfs/MR/wfs/reduce.py 
        -jobconf mapred.reduce.tasks=1 
        -jobconf mapred.job.name="sum_test"

         ----可能会报错:java.io.IOException: Task process exit with nonzero status of 137. !!! 

         ---原因:只有一个Reduce,计算节点资源不足(比如:磁盘配额不够)

     (三)优化
        Streaming保证每个map的输出都是有序的,然而,map与map之间的输出却是局部有序的。而为了计算单页面的PV与UV,就必须设置全局变量,因而强制指定一个reduce进行计算,但这是极其粗鲁的,这显然违背了MR框架的优越性。
        优化一:充分利用机器资源,让多个reduce完成计算。
        优化二:利用python生成器提升运行效率,节省内存消耗。
     1. Mapper.py  
    #!/usr/bin/env python
    """A more advanced Mapper, using Python iterators and generators."""
    import sys
    
    def read_input(file):
        for line in file:
            # split the line into words
            fields = line.split('05')
            yield (fields[9],fields[5])
    
    def main():
        data = read_input(sys.stdin)
        for field in data:
            print '%s|||%s' % (field[0], field[1])
    
    if __name__ == "__main__":
        main()

     2. Reduce.py

    #!/usr/bin/env python
    """A more advanced Reducer, using Python iterators and generators."""
    
    import sys
    
    word2count = {}
    cookies = set()
    
    def read_mapper_output(file):
        for line in file:
            yield line.rstrip().split('|||')  # url + cookie_id 
    
    def main():
        # input comes from STDIN (standard input)
        data = read_mapper_output(sys.stdin)
        for url,cookie in data:
            coo = '|'.join((url,str(cookie)))
            try:
                act = word2count.get(url)
                flg = coo in cookies
                if not flg:
                    cookies.add(coo) 
                if act is None:
                    word2count[url] = [1, 1]
                else:
                    uv = not flg and 1 or 0
                    word2count[url] = [act[0] + 1, act[1] + uv]
            except ValueError:
                pass
        recordsort = sorted(word2count.items(), key=lambda word2count:(int(word2count[1][1]),word2count[0]), reverse=True)
        
        for real_url, val in recordsort:
            print '%s	%s	%s'% (real_url, val[0], val[1])
        
    if __name__ == "__main__":
        main()
    3. 执行Streaming (优化)               ---分组后指定多个Reduce计算
    $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar
        -Dstream.map.output.field.separator='+++'
        -D map.output.key.field.separator='|||'
        -D num.key.fields.for.partition=1
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
        -input /group/alidw/dhwdata1/alilog/CnLog/20130607/23
        -output /group/alibaba-dw-cbu/output/ipage/test/log
        -mapper mapper.py
        -reducer reduce.py
        -file /home/dwapp/fusen.wangfs/MR/wfs/mapper.py
        -file /home/dwapp/fusen.wangfs/MR/wfs/reduce.py
        -jobconf mapred.reduce.tasks=100
        -jobconf mapred.job.name="sum_test"
  • 相关阅读:
    多图详解!10大高性能开发核心技术(转发)
    从 Spring Cloud 看一个微服务框架的「五脏六腑」
    eclipse中的springBoot项目 执行maven build 和maven install 报错
    Mysql怎么删除某表中的一条数据
    eclipse 中需要配置jdk、需要配置jre吗? 以及安装eclipse后需要做的一些配置
    IntelliJ IDEA 2019.2最新版本免费激活码(亲测可用)
    在springBoot项目配置项目的访问路径的时候 server.context-path不起作用的原因
    共享类型的基站概念
    oracle创建索引
    ORACLE中的DBLINK概念及使用DBLINK对远程数据库的连接
  • 原文地址:https://www.cnblogs.com/tychyg/p/5277485.html
Copyright © 2011-2022 走看看