前言: hadoop中表连接其实类似于我们用sqlserver对数据进行跨表查询时运用的inner join一样,两个连接的数据要有关系连接起来,中间必须有一个相等的字段进行连接,其实hadoop的表连接就是对文本的处理,处理的文本中有一部分的内容是一样的,然后把这鞋大量的数据按照中间的一个相同的部分进行连接,用来解决大数据在关系型数据库查询困难的问题。
之前一直做c#语言的开发是一个本本分分做网站开发的程序员,像对hadoop这类用java语言做开发的内容一直属于菜鸟级别,hadoop中表连接也只最近慢慢学的,也是因为要解决工作中的一些问题慢慢熟悉起来的,在工作中确实解决了不少问题,下面我就讲一下我在工作中那个地方用到了表连接,其实还有很多地方,这里举一个比较经典的。
我们公司是做论坛的,数据了谈不上太大,不过也还够我这种小角色忙活半天了,之前出现了一个需求,就是拿到每一个帖子的发帖用户,和该帖子下所有的回帖用户,然后基于这个数据统计和发帖用户最相关的回帖用户,其实就是找用户与用户之间的关系。
这里的数据有两份(1)发帖人的信息,postid(帖子的id)和userid(发帖人id)(2)回帖人信息,postid(帖子的id)和userid(回帖人id)。这两个数据在数据库中是分别放在两个表中的,看似简单的问题如果一旦跟大数据扯上了关系就不好处理了,这里的发题人信息由3百万的数据,而回帖人的信息有7千多万条数据,中间如果用sqlserver的inner join根本是没办法查询的,这个时候就可以用hadoop的表连接了,首先把数据用工具从数据库到出成文本,因为要对两部分数据进行标示,所以两份数据要在文本中用" "分割的之后的length必须不同,
这一份是主帖的数据第一列是postid,第二列是用户的id,第三列是随便取出来的数据作为主帖的表示
这一份是回帖的数据,第一列是postid,第二列是回帖的用户id
在map阶段用postid作为key进行,中间给主帖一个标示typeL,并且给回帖的数据进行一个标示typeR。
1 private static final Text typeL = new Text("A:"); 2 private static final Text typeR = new Text("B:"); 3 public static class GetMap extends Mapper<Object, Text, Text, MapWritable> { 4 public void map(Object key,Text value,Context context) throws IOException,InterruptedException { 5 if (value==null) { 6 return; 7 } 8 9 String [] detail=value.toString().split(" "); 10 if (detail.length==2) { 11 12 //回帖信息 13 Text threadid=new Text(detail[0]); 14 MapWritable mapWritable=new MapWritable(); 15 mapWritable.put(typeR, new Text(detail[1])); 16 context.write(threadid, mapWritable); 17 } 18 else 19 { 20 //主帖信息 21 Text threadid=new Text(detail[0]); 22 MapWritable mapWritable=new MapWritable(); 23 mapWritable.put(typeL, new Text(detail[1])); 24 context.write(threadid, mapWritable); 25 } 26 27 } 28 29 }
在reduce就可以把相关联的帖子的发帖人id和回帖人id关联起来了
1 public static class GetReduce extends Reducer<Text, MapWritable, Text, Text>{ 2 public void reduce(Text key,Iterable<MapWritable> values,Context context) throws IOException,InterruptedException{ 3 Iterator<MapWritable> iterator=values.iterator(); 4 ArrayList<Text> threadList=new ArrayList<Text>(); 5 ArrayList<Text> postlist=new ArrayList<Text>(); 6 while (iterator.hasNext()) { 7 MapWritable mapWritable=iterator.next(); 8 if (mapWritable.containsKey(typeL)) { 9 threadList.add((Text) mapWritable.get(typeL)); 10 }else if (mapWritable.containsKey(typeR)) { 11 postlist.add((Text)mapWritable.get(typeR)); 12 13 } 14 } 15 if (threadList.size()==1&&postlist.size()>0) { 16 String string=""; 17 for (Text text : postlist) { 18 string=string+text.toString()+" "; 19 } 20 context.write(threadList.get(0), new Text(string)); 21 } 22 23 } 24 }
最终的结果展示
每一行中,第一个就是发帖人id,后边跟着的就是回帖人的信息。