Implementing Join and Not In with Hadoop
(1) Source Data and the Target Query
1. The query to implement
select a.sid, a.name, b.course, b.score
from Student a
join Score b on a.sid = b.sid
left outer join Filter c on a.name = c.name
where c.name is null
2. Local source files, all tab-delimited:
student.dat, course.dat, filter.dat
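The original post does not list the file contents; the hypothetical rows below (tab-delimited) are illustrative only, chosen to be consistent with the final output shown in section (6):

student.dat (Student: sid, name):
001    Jack
002    Marry
003    Nacy
004    Tom

course.dat (Score: sid, course, score):
001    Language    90
001    Math        80
002    Language    95
002    Math        82
003    Language    80

filter.dat (Filter: name):
Tom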
(2) The Idea Behind a Hadoop Join
Tag every record with a numeric label identifying its source table. After Hadoop sorts the map output, records sharing the same key end up adjacent to one another and ordered by label, so the reducer can simply merge adjacent records with the same key to produce the join result.
1. Map phase: tag table 1 and table 2, which simply means emitting one extra field; for example, table 1 gets label 0 and table 2 gets label 1.
2. Partition phase: sort and partition with the student number (key) as the primary key and the label as the secondary key.
3. Reduce phase: since records arrive already sorted by primary then secondary key, merge adjacent records with the same key and emit the result (see the sketch after this list).
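Before moving to Streaming, here is a minimal single-process sketch of this idea. It is illustrative only: the sample records are made up, and in the real job the tagging happens in the mapper while the sorting is done by Hadoop's shuffle.

records = []
for sid, name in [('001', 'Jack'), ('002', 'Marry')]:
    records.append((sid, '0', name))               # table 1 tagged '0'
for sid, course, score in [('001', 'Math', '80'), ('002', 'Math', '82')]:
    records.append((sid, '1', course, score))      # table 2 tagged '1'

records.sort(key=lambda r: (r[0], r[1]))           # key first, label second

last_sid, name = None, None
for r in records:
    if r[0] != last_sid:                           # new key: forget the old name
        name = None
    if r[1] == '0':
        name = r[2]                                # remember the student row
    elif name is not None:                         # adjacent score row, same key
        print '\t'.join((r[0], name, r[2], r[3]))
    last_sid = r[0]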
(3) Quick Programming with Streaming
1. mapper.py
#!/usr/bin/env python
import os
import re
import sys

def mapper():
    ip = set()  # names to filter out; kept local, try not to declare globals

    def load_Fil(file='dictlink'):
        # 'dictlink' is the local symlink created by -cacheFile ...#dictlink
        for line in open(file):
            ip.add(line.strip())

    def read_input(file):
        # map_input_file tells each map task which source file it is reading
        filepath = os.environ["map_input_file"]
        filename = re.findall(r'(.*)\.dat', os.path.split(filepath)[-1])[0]
        for line in file:
            if line.strip() == "":
                continue
            fields = line[:-1].split("\t")
            sno = fields[0]
            if filename == 'student':
                name = fields[1]
                yield (sno, '0', name)                 # label 0: Student
            elif filename == 'course':
                courseno = fields[1]
                grade = fields[2]
                yield (sno, '1', courseno, grade)      # label 1: Score

    load_Fil()
    for field in read_input(sys.stdin):
        # the "not in" filter: drop student rows whose name is in filter.dat
        if len(field) == 3 and field[2] not in ip:
            print '\t'.join((field[0], field[1], field[2]))
        elif len(field) == 4:
            print '\t'.join((field[0], field[1], field[2], field[3]))

if __name__ == '__main__':
    mapper()
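To smoke-test the mapper locally, you can fake the map_input_file environment variable that Streaming sets and the dictlink symlink that -cacheFile creates (commands assume the local files from section (1)):

ln -s filter.dat dictlink
map_input_file=student.dat python mapper.py < student.dat
map_input_file=course.dat python mapper.py < course.dat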
2. reduce.py
#!/usr/bin/env python
import sys

lastsno = ''
name = ''
for line in sys.stdin:
    if line.strip() == "":
        continue
    fields = line.strip().split('\t')
    sno = fields[0]
    if sno != lastsno:
        # new key: records arrive sorted, so the label-0 (student) row comes first
        name = ''
        if fields[1] == '0':
            name = fields[2]
    else:
        # same key: label-1 (score) rows follow the student row
        if fields[1] == '1':
            courseno = fields[2]
            grade = fields[3]
            # name is empty when the student was filtered out (or missing), so skip
            if name:
                print '\t'.join((lastsno, name, courseno, grade))
    lastsno = sno
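The whole job can be rehearsed locally as well, with the shell's sort standing in for Hadoop's shuffle (same assumptions as the mapper test above):

(map_input_file=student.dat python mapper.py < student.dat; \
 map_input_file=course.dat python mapper.py < course.dat) | sort | python reduce.py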
(4) Preparing the Files
1. Prepare the local files.
2. Upload student.dat and course.dat to the Yunti (云梯) cluster directory:
hadoop fs -mkdir /group/alidw/ali-log/wfs/join
hadoop fs -put student.dat course.dat /group/alidw/ali-log/wfs/join
(5) Setting the Streaming Parameters and Running the Job
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.19.1-dc-streaming.jar \
    -D stream.num.map.output.key.fields=2 \
    -D num.key.fields.for.partition=1 \
    -D mapred.map.tasks=10 \
    -D mapred.reduce.tasks=3 \
    -D mapred.job.name="join_test" \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -input /group/alidw/ali-log/wfs/join/course.dat \
    -input /group/alidw/ali-log/wfs/join/student.dat \
    -output /group/alidw/ali-log/wfs/joinput \
    -mapper mapper.py \
    -reducer reduce.py \
    -file /home/dwapp/fusen.wangfs/MR/join/mapper.py \
    -file /home/dwapp/fusen.wangfs/MR/join/reduce.py \
    -cacheFile /group/alidw/ali-log/wfs/join/filter.dat#dictlink

Here stream.num.map.output.key.fields=2 makes Hadoop sort on the first two output fields (sno, then label), while num.key.fields.for.partition=1 partitions on sno alone, so every record for a given student reaches the same reducer with the label-0 student row first. The -cacheFile option ships filter.dat to every map task and symlinks it locally as dictlink.
(6) Final Output (to make it easier to inspect, you can also create an external table; a sketch follows the listing)
[dwapp@dw-yuntigw-64 join]$ hadoop fs -cat /group/alidw/ali-log/wfs/joinput/part-00000
001    Jack     Language    90
001    Jack     Math        80
[dwapp@dw-yuntigw-64 join]$ hadoop fs -cat /group/alidw/ali-log/wfs/joinput/part-00001
002    Marry    Language    95
002    Marry    Math        82
[dwapp@dw-yuntigw-64 join]$ hadoop fs -cat /group/alidw/ali-log/wfs/joinput/part-00002
003    Nacy     Language    80
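As a hypothetical example (table and column names are illustrative, not from the original post), a Hive external table over the output directory could look like:

CREATE EXTERNAL TABLE join_result (
    sid    STRING,
    name   STRING,
    course STRING,
    score  INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/group/alidw/ali-log/wfs/joinput';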
(7) Summary
1. If the filter table is small, data filtering (not in, not like, etc.) should be done in the map phase whenever possible.
2. Otherwise, do the filtering in the reduce phase.
3. This article uses Streaming's file distribution mechanism (-cacheFile) to give every map task access to the filter data.
4. By rewriting reduce.py, MapJoin, Left Semi Join, Left Outer Join, and similar operations can be implemented just as quickly (a sketch follows).
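As an illustration of point 4, here is an untested sketch of a left-outer-join variant of reduce.py. It assumes the same mapper output as above and emits NULL placeholders for students that have no score records:

#!/usr/bin/env python
import sys

lastsno = ''
name = ''
matched = False

def flush(sno, nm, m):
    # emit a dangling student row if no score record matched its key
    if sno and nm and not m:
        print '\t'.join((sno, nm, 'NULL', 'NULL'))

for line in sys.stdin:
    if line.strip() == "":
        continue
    fields = line.strip().split('\t')
    sno = fields[0]
    if sno != lastsno:
        flush(lastsno, name, matched)    # close out the previous key
        name = ''
        matched = False
        if fields[1] == '0':
            name = fields[2]
    elif fields[1] == '1' and name:
        matched = True
        print '\t'.join((sno, name, fields[2], fields[3]))
    lastsno = sno

flush(lastsno, name, matched)            # close out the final key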