Hadoop Join 与 Not In的实现
(一)源数据与要实现的查询
1. 要实现的查询
select a.sid
,a.name
,b.course
,b.score
from Student a
join Score b
on a.SID = b.SID
left outer join Filter c
on a.name = c.name
where c.name is null
2. 本地源文件:以制表符为字段分隔符
student.dat course.dat filter.dat
(二)Hadoop join的实现思路
给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。
1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;
2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区
3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出
(三)Streaming快速编程
1. mapper.py
#!/usr/bin/env python
import os
import sys,re
def mapper():
ip = set() # 尽量不要声明global
def load_Fil(file='dictlink'):
for line in open():
yield ip.add(line.strip())
def read_input(file):
filepath = os.environ["map_input_file"]
filename = re.findall('(.*).dat',os.path.split(filepath)[-1])[0]
for line in file:
if line.strip()=="":
continue
fields = line[:-1].split("t")
sno = fields[0]
if filename == 'student':
name = fields[1]
yield (sno,'0',name)
elif filename == 'course':
courseno = fields[1]
grade = fields[2]
yield (sno,'1',courseno,grade)
for k in load_Fil():
pass
data = read_input(sys.stdin)
for field in data:
if len(field) == 3 and field[2] not in ip:
print 't'.join((field[0],field[1],field[2]))
elif len(field) == 4:
print 't'.join((field[0],field[1],field[2],field[3]))
if __name__=='__main__':
mapper()
2. reduce.py
#!/usr/bin/env python
import sys
lastsno = ''
for line in sys.stdin:
if line.strip()=="":
continue
fields = line.strip().split('t')
sno = fields[0]
if sno != lastsno:
name=''
if fields[1] == '0':
name = fields[2]
elif sno==lastsno:
if fields[1] == '1':
courseno = fields[2]
grade = fields[3]
#sys.stderr.write('(4) ****:%s||%sn'%(courseno,grade))
if name:
print 't'.join((lastsno,name,courseno,grade))
lastsno = sno
#sys.stderr.write('(6) *****%s=%s*******n'%(lastsno,sno))
(四)准备文件
1. 准备好本地文件
2. 将本地文件student.dat与course.dat 上传到云梯目录
hadoop fs -mkdir /group/alidw/ali-log/wfs/join hadoop fs -put student.dat course.dat /group/alidw/ali-log/wfs/join
(五)设置Streaming参数并执行
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar
-D stream.num.map.output.key.fields=2
-D num.key.fields.for.partition=1
-D mapred.map.tasks=10
-D mapred.reduce.tasks=3
-D mapred.job.name="join_test"
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-input /group/alidw/ali-log/wfs/join/course.dat
-input /group/alidw/ali-log/wfs/join/student.dat
-output /group/alidw/ali-log/wfs/joinput
-mapper mapper.py
-reducer reduce.py
-file /home/dwapp/fusen.wangfs/MR/join/mapper.py
-file /home/dwapp/fusen.wangfs/MR/join/reduce.py
-cacheFile /group/alidw/ali-log/wfs/join/filter.dat#dictlink
(六)最后的输出:为了便于查看,你也可以创建外部表
[dwapp@dw-yuntigw-64 join]$ hadoop fs -cat /group/alidw/ali-log/wfs/joinput/part-00000 001 Jack Language 90 001 Jack Math 80 [dwapp@dw-yuntigw-64 join]$ hadoop fs -cat /group/alidw/ali-log/wfs/joinput/part-00001 002 Marry Language 95 002 Marry Math 82 [dwapp@dw-yuntigw-64 join]$ hadoop fs -cat /group/alidw/ali-log/wfs/joinput/part-00002 003 Nacy Language 80
(七)总结
1. 如果过滤表不大,数据过滤(如not in,not like等)应尽量放在map阶段来实现
2. 否则,过滤操作就在reduce阶段实现
3. 本文通过Streaming文件分发策略,来实现各个map的数据过滤
4. 重写reduce.py后,MapJoin & Left Semi Join & Left Outer Join 等亦可快速实现