什么是Hadoop Streaming
Hadoop提供的一个编程工具,允许用户使用任何可执行文件或脚本作为mapper和Reducer
一个例子(shell简洁版本)
$HADOOP_HOME/bin/hadoop jar
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar
-input myInputDirs
-output myOutputDir
-mapper cat
-reducer wc
解析:
首先找到Hadoop Streaming所在的包
然后定义输入输出路径
然后定义mapper和reducer
这里是用shell中的cat作为mapper,wc作为reducer
同一个例子(shell详细版),每行可能有多个单词
mapper.sh
#! /bin/bash
while read LINE; do
for word in $LINE
do
echo "$word 1"
done
done
reducer.sh
#! /bin/bash
count=0
started=0
word=""
while read LINE;do
newword=`echo $LINE | cut -d ' ' -f 1`
if [ "$word" != "$newword" ];then
[ $started -ne 0 ] && echo "$word $count"
word=$newword
count=1
started=1
else
count=$(( $count + 1 ))
fi
done
echo "$word $count"
同一个例子(python版)
map
#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s %s' % (word, 1)
reducer
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s %s'% (word, count)
测试:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar
-input myInputDirs
-output myOutputDir
-mapper Mapper.py
-reducer Reducerr.py
-file Mapper.py
-file Reducer.py
或者本地测试
cat input.txt | python Mapper.py | sort | python Reducer.py
书中的例子(ruby版本)
Ruby版本
map函数:
#! /usr/bin/env ruby
STDIN.each_line do |line|
var = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year} #{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
reduce函数:
#! /usr/bin/env ruby
last_key, max_val = nil, 0
STDIN.each_line do |line|
key, val = line.split(" ")
if last_key && last_key != key
puts "#{last_key} #{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key} #{max_val}" if last_key
reduce的输入是map的输出, 并且已经被Hadoop按照key排过序了.
所以, 如果 last_key 不为空, 可以代码还没有到最后; last_key!=key, 就是说这个key是一个新key. 打印之前key一条记录, 因为之前的key所有值都处理完了.
并且更新last_key, 最大值就是val.
如果是重复的key, 就和存储的最大值进行比较.
最后再打印最后一个K/V pair.
测试
书中的例子(python版本)
Hadoop Streaming编程原理
mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出,Streaming工具会创建MapReduce job,发送给各个taskTracker,同时监控整个job的执行过程
如果一个文件(可执行文件或者脚本)作为mapper,mapper初始化时,每一个mapper任务会把文件作为一个单独的进程启动
mapper任务运行时,把输入切分成行,然后把每一行提供给可执行文件进程的标准输入。同时mapper收集可执行文件进程的标准输出内容,并把收到的每一行内容转化为key/value对作为mapper的输出。
默认情况下,一行的第一个tab之前的作为key,后面的作为value
如果没有tab,整行作为key,value为空
用法
Hadoop jar + Hadoop Streaming jar + option
option有:
-input
-output
-mapper
-reducer
-file:打包文件到提交的作业中,可以使mapper或者reducer要用的输入文件,如配置文件,字典等
-partitioner
-combiner
-D:作业的一些属性,以前用的是-jobconf
mapred.map.tasks:map task的数目
mapred.reduce.tasks
stream.map.input.field.separator/stream.map.output.field.separator:map输入输出的分隔符,默认为
本地测试:
cat input.txt|python Mapper.py|sort|python Reducer.py
或者
cat input.txt|./Mapper|sort|./Reducer
摘自
http://dongxicheng.org/mapreduce/hadoop-streaming-programming/