Streaming is an API that Hadoop provides for writing MapReduce programs in other languages. It uses standard Unix input and output as the interface between Hadoop and the user's program, which makes it very lightweight: developers can pick whichever language they are most comfortable with, implement just the computation logic in the MR program, and emit the results.
Python pairs very well with Streaming: MapReduce jobs are quick to write and run efficiently, and Python's inherent conciseness makes it a natural fit for rapid MapReduce development.
Dumbo, Happy, and mrjob are also good options for Python MapReduce programming, though their performance lags behind plain Streaming. Dumbo offers a more flexible and easier-to-use Python API for MapReduce applications and supports packaging mapper.py and reduce.py together; Happy makes the Hadoop framework convenient for Jython developers; and mrjob lets you write multi-step MapReduce workflows, as the sketch below shows.
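To give a feel for mrjob's multi-step style, here is a minimal sketch (not part of the original job: the class and method names are made up and the two-step split is illustrative; field indexes 9 and 5 match the log layout used later in this post):

from mrjob.job import MRJob
from mrjob.step import MRStep


class UrlPvUv(MRJob):
    """Two-step sketch: step 1 counts hits per (url, cookie) pair,
    step 2 folds those counts into PV and UV per url."""

    def steps(self):
        return [
            MRStep(mapper=self.mapper_pairs, reducer=self.reducer_pairs),
            MRStep(reducer=self.reducer_totals),
        ]

    def mapper_pairs(self, _, line):
        fields = line.split('\x05')        # chr(05)-delimited log line
        yield (fields[9], fields[5]), 1    # key: (url, cookie_id)

    def reducer_pairs(self, url_cookie, counts):
        # total hits by one cookie on one url
        yield url_cookie[0], sum(counts)

    def reducer_totals(self, url, hits_per_cookie):
        hits = list(hits_per_cookie)
        yield url, [sum(hits), len(hits)]  # [PV, UV]


if __name__ == '__main__':
    UrlPvUv.run()

Such a job runs locally with python url_pv_uv.py access.log and on a cluster via the -r hadoop runner.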
Streaming's implementation, data flow, parameters, and job execution are all documented in detail by the community, so this post will not repeat them: http://hadoop.apache.org/docs/stable/streaming.html
(1) Map + Reduce
The map phase reads the data as a stream and splits and formats the fields.
The reduce phase computes PV (page views) and UV (unique visitors, deduplicated by cookie_id) per URL.
Note: the log file uses the invisible character chr(05) as its field delimiter.
1. mapper.py

#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    word = line.split('\05')
    print '%s\05%s' % (word[9], word[5])  # url + cookie_id
2. reduce.py

#!/usr/bin/env python
from operator import itemgetter
import sys

word2count = {}
cookies = set()

for line in sys.stdin:
    try:
        url, cookie = line.strip().split('\05')
    except ValueError:
        sys.exit(1)                       # abort on a malformed line
    coo = '\06'.join([url, str(cookie)])  # (url, cookie) pair, joined by chr(06)
    act = word2count.get(url)
    flg = coo in cookies                  # has this pair been seen before?
    if not flg:
        cookies.add(coo)
    if act is None:
        word2count[url] = [1, 1]          # first hit on this url: PV = 1, UV = 1
    else:
        uv = 0 if flg else 1              # a new cookie adds 1 to UV
        word2count[url] = [act[0] + 1, act[1] + uv]

# recordsort = sorted(word2count.items(), key=itemgetter(1, 0), reverse=True)
recordsort = sorted(word2count.items(),
                    key=lambda item: (int(item[1][1]), item[0]),
                    reverse=True)         # sort by UV, then url, descending

for real_url, val in recordsort:
    print '%s %s %s' % (real_url, val[0], val[1])
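Before submitting to the cluster, the two scripts can be sanity-checked locally with a Unix pipe, since Streaming does nothing more than feed the mapper on stdin and pass the (sorted) mapper output to the reducer's stdin (sample.log here is a hypothetical extract of the chr(05)-delimited log):

cat sample.log | python mapper.py | sort | python reduce.py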
(2) Running Streaming:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar \
    -input /group/alidw/dhwdata1/alilog/CnLog/20130603/23 \
    -output /group/alidw/ali-log/wfs/log \
    -mapper mapper.py \
    -reducer reduce.py \
    -file /home/dwapp/fusen.wangfs/MR/wfs/mapper.py \
    -file /home/dwapp/fusen.wangfs/MR/wfs/reduce.py \
    -jobconf mapred.reduce.tasks=1 \
    -jobconf mapred.job.name="sum_test"
---- This may fail with: java.io.IOException: Task process exit with nonzero status of 137. !!!
---- Cause: there is only one reducer, and the compute node runs out of resources (for example, the disk quota is exceeded). Exit status 137 generally means the task process was killed with SIGKILL (128 + 9).
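If one reducer cannot cope with the data volume, more can be requested, e.g. -jobconf mapred.reduce.tasks=16 in the command above. Note, however, that this reduce.py deduplicates cookies inside a single process: with several reducers, every record for a given URL would have to reach the same reducer (for instance by emitting the url as the tab-separated key, which the default partitioner hashes on). The original job sidesteps this by using a single reducer.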
(3) An improved implementation, using Python iterators and generators

1. Mapper.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys


def read_input(file):
    for line in file:
        # split the line into fields on the chr(05) delimiter
        fields = line.split('\05')
        yield (fields[9], fields[5])  # url + cookie_id


def main():
    data = read_input(sys.stdin)
    for field in data:
        print '%s|||%s' % (field[0], field[1])

if __name__ == "__main__":
    main()
2. Reduce.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
import sys

word2count = {}
cookies = set()


def read_mapper_output(file):
    for line in file:
        yield line.rstrip().split('|||')  # url + cookie_id


def main():
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin)
    for fields in data:
        try:
            url, cookie = fields
        except ValueError:
            continue                      # skip malformed lines
        coo = '|'.join((url, str(cookie)))
        act = word2count.get(url)
        flg = coo in cookies              # has this pair been seen before?
        if not flg:
            cookies.add(coo)
        if act is None:
            word2count[url] = [1, 1]      # first hit on this url: PV = 1, UV = 1
        else:
            uv = 0 if flg else 1          # a new cookie adds 1 to UV
            word2count[url] = [act[0] + 1, act[1] + uv]
    recordsort = sorted(word2count.items(),
                        key=lambda item: (int(item[1][1]), item[0]),
                        reverse=True)     # sort by UV, then url, descending
    for real_url, val in recordsort:
        print '%s %s %s' % (real_url, val[0], val[1])

if __name__ == "__main__":
    main()
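For comparison: because Hadoop sorts the mapper output before the reduce phase, and this job runs a single reducer, all lines for the same url arrive consecutively, so the reducer could also stream one group at a time with itertools.groupby instead of holding every url and cookie in memory. A sketch under those assumptions (it prints per-url results in key order rather than sorted by UV):

#!/usr/bin/env python
"""Alternative reducer sketch: streams one url group at a time."""
from itertools import groupby
from operator import itemgetter
import sys


def read_mapper_output(file):
    for line in file:
        yield line.rstrip().split('|||')  # url + cookie_id


def main():
    data = read_mapper_output(sys.stdin)
    # groupby works here because the shuffle sorts lines, so equal urls are adjacent
    for url, group in groupby(data, key=itemgetter(0)):
        seen = set()                      # distinct cookies for this url
        pv = 0
        for fields in group:
            if len(fields) < 2:
                continue                  # skip malformed lines
            pv += 1
            seen.add(fields[1])
        print '%s %s %s' % (url, pv, len(seen))

if __name__ == '__main__':
    main()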