另外,对于Python MR编程,Dumbo、Happy 与 mrjob 也是很好的选择,只是在性能上要逊于Streaming。其中,Dumbo为MR应用提供了更加灵活易用的Python API,它支持将mapper.py与reduce.py封装在一起使用;Happy则为Jython开发者使用Hadoop框架提供了便利;mrjob则允许用户编写多步骤的MapReduce工作流程。
(一)Map + Reduce
Reduce阶段实现PV, UV的计算
#!/usr/bin/env python
"""Hadoop Streaming mapper: emit one "url<sep>cookie_id" record per log line."""
import sys

# Delimiter between the fields of a raw log line.
# NOTE(review): the published listing showed ' 05' here, which looks like a
# '\005' escape that lost its backslash -- confirm against the log format.
INPUT_SEP = '\005'

# Separator written between url and cookie_id.  Kept byte-for-byte because the
# companion reducer splits its input on this exact string.
# NOTE(review): probably the same mangled '\005'; if so, change the mapper and
# the reducer together.
OUTPUT_SEP = ' 05'

# Zero-based positions of the two fields that are emitted (url + cookie_id).
URL_FIELD = 9
COOKIE_FIELD = 5


def map_line(line):
    """Return the "url<sep>cookie_id" record for one raw log line.

    Returns None when the line has too few fields to contain the url column,
    so that a truncated line is dropped instead of raising IndexError.
    """
    fields = line.strip().split(INPUT_SEP)
    if len(fields) <= URL_FIELD:
        return None
    return '%s%s%s' % (fields[URL_FIELD], OUTPUT_SEP, fields[COOKIE_FIELD])


def main():
    """Map every line of standard input and print the resulting records."""
    for line in sys.stdin:
        record = map_line(line)
        if record is not None:
            # Single parenthesised argument: prints the same text whether
            # print is a statement (Python 2) or a function (Python 3).
            print(record)


if __name__ == '__main__':
    main()
#!/usr/bin/env python
"""Hadoop Streaming reducer: page views (PV) and unique visitors (UV) per URL.

Reads "url<sep>cookie_id" records from standard input and prints one
"url pv uv" line per URL, ordered by UV and then URL, both descending.
"""
from operator import itemgetter
import sys

# Separator between url and cookie_id in each input record.  Kept
# byte-for-byte because the companion mapper writes this exact string.
# NOTE(review): looks like a '\005' escape that lost its backslash in the
# published listing -- confirm, and change mapper and reducer together.
FIELD_SEP = ' 05'


def count_visits(lines):
    """Aggregate PV/UV counts from an iterable of "url<sep>cookie_id" lines.

    Returns a dict mapping each url to a two-item list ``[pv, uv]``: pv is
    the number of records seen for that url, uv the number of distinct
    cookie ids seen for it.

    Raises ValueError when a line does not split into exactly two fields.
    """
    totals = {}
    # Distinct (url, cookie_id) pairs seen so far.  A tuple key cannot collide
    # the way a joined string can when a field contains the join separator.
    # NOTE(review): this set keeps every distinct pair in memory for the run.
    seen = set()
    for line in lines:
        url, cookie = line.strip().split(FIELD_SEP)
        stats = totals.setdefault(url, [0, 0])
        stats[0] += 1
        pair = (url, cookie)
        if pair not in seen:
            seen.add(pair)
            stats[1] += 1
    return totals


def rank_urls(totals):
    """Return ``(url, pv, uv)`` tuples sorted by uv, then url, both descending."""
    rows = [(url, pv, uv) for url, (pv, uv) in totals.items()]
    rows.sort(key=itemgetter(2, 0), reverse=True)
    return rows


def main():
    """Reduce standard input and print the ranked "url pv uv" lines."""
    try:
        totals = count_visits(sys.stdin)
    except ValueError as err:
        # The parse is now inside the guarded region, so a malformed record
        # ends the task with status 1 and a short message, not a traceback.
        sys.stderr.write('reducer: malformed input record: %s\n' % err)
        sys.exit(1)
    for url, pv, uv in rank_urls(totals):
        print('%s %s %s' % (url, pv, uv))


if __name__ == '__main__':
    main()
# Submit the job through the Hadoop Streaming jar with a single reduce task.
# NOTE(review): as published, -mapper and -reducer are not followed by script
# names, both -file paths stop at the directory, and the last -jobconf has no
# key in front of "sum_test" (presumably a job name -- confirm). Fill these in
# before running.
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar -input /group/alidw/dhwdata1/alilog/CnLog/20130603/23 -output /group/alidw/ali-log/wfs/log -mapper -reducer -file /home/dwapp/fusen.wangfs/MR/wfs/ -file /home/dwapp/fusen.wangfs/MR/wfs/ -jobconf mapred.reduce.tasks=1 -jobconf"sum_test"
----可能会报错:Task process exit with nonzero status of 137(137 = 128 + 9,即任务进程被 SIGKILL 强制终止,常见原因是内存超限)!!!
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys

# Delimiter between the fields of a raw log line.
# NOTE(review): the published listing showed ' 05' here, which looks like a
# '\005' escape that lost its backslash -- confirm against the log format.
FIELD_SEP = '\005'

# Zero-based positions of the two fields that are emitted (url + cookie_id).
URL_FIELD = 9
COOKIE_FIELD = 5


def read_input(file):
    """Yield a ``(url, cookie_id)`` tuple for each usable line of *file*.

    *file* is any iterable of text lines.  The line terminator is removed
    before splitting so that a url in the last column does not carry a
    newline into the output.  Lines with too few fields to contain the url
    column are skipped rather than raising IndexError; the number skipped is
    reported on standard error once the input is exhausted.
    """
    skipped = 0
    for line in file:
        fields = line.rstrip('\r\n').split(FIELD_SEP)
        if len(fields) <= URL_FIELD:
            skipped += 1
            continue
        yield (fields[URL_FIELD], fields[COOKIE_FIELD])
    if skipped:
        sys.stderr.write('mapper: skipped %d short line(s)\n' % skipped)


def main():
    """Print one "url|||cookie_id" record per usable line of standard input."""
    for url, cookie in read_input(sys.stdin):
        # Single parenthesised argument: prints the same text whether print
        # is a statement (Python 2) or a function (Python 3).
        print('%s|||%s' % (url, cookie))


if __name__ == "__main__":
    main()
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
import sys

# Separator between url and cookie_id in each mapper output line.
FIELD_SEP = '|||'

# Module-level accumulators filled by main(): url -> [pv, uv], and the set of
# distinct (url, cookie_id) pairs already counted.
word2count = {}
cookies = set()


def read_mapper_output(file):
    """Yield the '|||'-separated fields of each line of *file* as a list."""
    for line in file:
        yield line.rstrip().split(FIELD_SEP)  # url + cookie_id


def tally_visits(records, counts, seen):
    """Fold ``[url, cookie_id]`` records into *counts* and *seen* in place.

    *counts* maps each url to a two-item list ``[pv, uv]``: pv is the number
    of records for that url, uv the number of distinct cookie ids.  *seen*
    is the set of (url, cookie_id) pairs already counted towards uv; a tuple
    key cannot collide the way a '|'-joined string can when a field itself
    contains '|'.

    Records that do not have exactly two fields are skipped.  Returns the
    number of skipped records.
    """
    skipped = 0
    for record in records:
        if len(record) != 2:
            skipped += 1
            continue
        url, cookie = record
        stats = counts.setdefault(url, [0, 0])
        stats[0] += 1
        pair = (url, cookie)
        if pair not in seen:
            # NOTE(review): every distinct pair stays in memory for the run.
            seen.add(pair)
            stats[1] += 1
    return skipped


def rank_urls(counts):
    """Return ``(url, pv, uv)`` tuples sorted by uv, then url, both descending."""
    rows = [(url, pv, uv) for url, (pv, uv) in counts.items()]
    rows.sort(key=lambda row: (row[2], row[0]), reverse=True)
    return rows


def main():
    """Reduce standard input and print the ranked "url pv uv" lines."""
    # input comes from STDIN (standard input)
    skipped = tally_visits(read_mapper_output(sys.stdin), word2count, cookies)
    if skipped:
        sys.stderr.write('reducer: skipped %d malformed record(s)\n' % skipped)
    for url, pv, uv in rank_urls(word2count):
        print('%s %s %s' % (url, pv, uv))


if __name__ == "__main__":
    main()