Hive支持自定义map与reduce script。接下来我用一个简单的wordcount例子加以说明。使用Python开发(如果使用Java开发,请看这里)。
开发环境:
python:2.7.5
hive:2.3.0
hadoop:2.8.1
一、map与reduce脚本
map脚本(mapper.py)
#!/usr/bin/python import sys import re while True: line = sys.stdin.readline().strip() if not line: break p = re.compile(r'W+') words=p.split(line) #write the tuples to stdout for word in words: print '%s %s' % (word, "1")
reduce脚本(reducer.py)
#!/usr/bin/python import sys # maps words to their counts word2count = {} while True: line=sys.stdin.readline().strip() if not line: break # parse the input we got from mapper.py try: word,count= line.split(' ', 1) except: continue # convert count (currently a string) to int try: count = int(filter(str.isdigit,count)) except ValueError: continue try: word2count[word] = word2count[word]+count except: word2count[word] = count # write the tuples to stdout # Note: they are unsorted for word in word2count.keys(): print '%s %s' % ( word, word2count[word] )
注意一点的是,不能使用for line in std.in,因为for是一个字节一个字节的读取,而不是一行一行地读。而且在对map输出的word,count进行拆分时,要注意将拆分的count部分非数字部分去掉,以免count转换成int错误。
二、编写hive hql
drop table if exists raw_lines; -- create table raw_line, and read all the lines in '/user/inputs', this is the path on your local HDFS create external table if not exists raw_lines(line string) ROW FORMAT DELIMITED stored as textfile location '/user/inputs'; drop table if exists word_count; -- create table word_count, this is the output table which will be put in '/user/outputs' as a text file, this is the path on your local HDFS create external table if not exists word_count(word string, count int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' lines terminated by ' ' STORED AS TEXTFILE LOCATION '/user/outputs/'; -- add the mapper&reducer scripts as resources, please change your/local/path add file /home/yanggy/mapper.py; add file /home/yanggy/reducer.py; from ( from raw_lines map raw_lines.line --call the mapper here using 'mapper.py' as word, count cluster by word) map_output insert overwrite table word_count reduce map_output.word, map_output.count --call the reducer here using 'reducer.py' as word,count;