Streaming is an API that Hadoop provides for writing MapReduce programs in other languages. It uses Unix standard input and output as the interface between Hadoop and the user's program, which makes it very lightweight: developers can pick whichever language they know best, implement the computation logic, and simply print the results.
Python is a natural fit for Streaming-based MapReduce: jobs are quick to write, and Python's conciseness makes it especially well suited to rapid MR development.
Dumbo, Happy, and mrjob are also good options for writing MapReduce in Python, although they lag behind Streaming in performance. Dumbo offers a more flexible and convenient Python API for MR applications and lets mapper.py and reduce.py be bundled together; Happy makes the Hadoop framework accessible to Jython developers; and mrjob lets users write multi-step MapReduce workflows, as in the sketch below.
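For instance, here is a minimal sketch of mrjob's multi-step API (a hypothetical example, not part of the job in this article): the first step counts page views per url, the second picks the most-viewed url.
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRTopUrl(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_extract_url,
                   reducer=self.reducer_count_pv),
            MRStep(reducer=self.reducer_find_max),
        ]

    def mapper_extract_url(self, _, line):
        fields = line.split('\x05')        # same chr(05)-separated log format
        yield fields[9], 1                 # url -> one page view

    def reducer_count_pv(self, url, counts):
        yield None, (sum(counts), url)     # single key: one reducer sees all urls

    def reducer_find_max(self, _, pv_url_pairs):
        yield max(pv_url_pairs)            # (pv, url) pair with the highest pv

if __name__ == '__main__':
    MRTopUrl.run()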
Streaming's implementation, data flow, parameters, and task execution are all documented in detail by the community, so this article will not repeat them: http://hadoop.apache.org/docs/stable/streaming.html
(1) Map + Reduce
The Map stage reads the input as a stream and splits and formats the fields.
The Reduce stage computes PV (page views) and UV (unique visitors) per URL.
Note: the log file uses the invisible character chr(05) as its field separator.
#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    fields = line.split('\x05')                  # fields are separated by chr(05)
    print '%s\x05%s' % (fields[9], fields[5])    # url + cookie_id
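Note that Streaming by default treats everything up to the first tab in a mapper's output line as the key; since this mapper emits no tabs, each whole line becomes the key with an empty value. That is harmless here, because the reducer below re-parses the raw lines itself and the job runs with a single reduce task.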
#!/usr/bin/env python
import sys

word2count = {}   # url -> [pv, uv]
cookies = set()   # (url, cookie_id) pairs already seen

for line in sys.stdin:
    try:
        url, cookie = line.strip().split('\x05')
    except ValueError:
        continue                          # skip malformed lines
    coo = '\x06'.join([url, cookie])
    act = word2count.get(url)
    flg = coo in cookies                  # has this cookie hit this url before?
    if not flg:
        cookies.add(coo)
    if act is None:
        word2count[url] = [1, 1]          # first hit: pv = 1, uv = 1
    else:
        uv = 0 if flg else 1
        word2count[url] = [act[0] + 1, act[1] + uv]

# sort by uv (descending), then by url
recordsort = sorted(word2count.items(),
                    key=lambda item: (int(item[1][1]), item[0]),
                    reverse=True)
for real_url, val in recordsort:
    print '%s %s %s' % (real_url, val[0], val[1])
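Because Streaming only talks to the scripts through standard input and output, the whole pipeline can be sanity-checked locally before submitting (sample.log is a hypothetical slice of the chr(05)-separated log; the sort step mimics the shuffle, though this particular reducer does not actually require sorted input):
cat sample.log | python mapper.py | sort | python reduce.py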
(2) Running the Streaming job:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar \
    -input /group/alidw/dhwdata1/alilog/CnLog/20130603/23 \
    -output /group/alidw/ali-log/wfs/log \
    -mapper mapper.py \
    -reducer reduce.py \
    -file /home/dwapp/fusen.wangfs/MR/wfs/mapper.py \
    -file /home/dwapp/fusen.wangfs/MR/wfs/reduce.py \
    -jobconf mapred.reduce.tasks=1 \
    -jobconf mapred.job.name="sum_test"
This may fail with: java.io.IOException: Task process exit with nonzero status of 137.
Cause: with mapred.reduce.tasks=1 all map output funnels through a single reduce task, which also holds every (url, cookie) pair in memory, so it can exhaust the compute node's resources (for example, insufficient disk quota).
A more Pythonic implementation of the same job, built on iterators and generators, follows.
1. Mapper.py
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys

def read_input(file):
    for line in file:
        # split the line into chr(05)-separated fields
        fields = line.split('\x05')
        yield (fields[9], fields[5])      # url + cookie_id

def main():
    data = read_input(sys.stdin)
    for field in data:
        print '%s|||%s' % (field[0], field[1])

if __name__ == "__main__":
    main()
2. Reduce.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
import sys

word2count = {}   # url -> [pv, uv]
cookies = set()   # url|cookie_id pairs already seen

def read_mapper_output(file):
    for line in file:
        yield line.rstrip().split('|||')      # url + cookie_id

def main():
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin)
    for fields in data:
        try:
            url, cookie = fields
        except ValueError:
            continue                          # skip malformed lines
        coo = '|'.join((url, cookie))
        act = word2count.get(url)
        flg = coo in cookies                  # has this cookie hit this url before?
        if not flg:
            cookies.add(coo)
        if act is None:
            word2count[url] = [1, 1]          # first hit: pv = 1, uv = 1
        else:
            uv = 0 if flg else 1
            word2count[url] = [act[0] + 1, act[1] + uv]
    # sort by uv (descending), then by url
    recordsort = sorted(word2count.items(),
                        key=lambda item: (int(item[1][1]), item[0]),
                        reverse=True)
    for real_url, val in recordsort:
        print '%s %s %s' % (real_url, val[0], val[1])

if __name__ == "__main__":
    main()
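Since this pair reads and writes the same formats and keeps the same script names, it is a drop-in replacement: the Streaming invocation from section (2) runs it unchanged.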