  • MapReduce Multi-Table Join

    Multi-Table Join

        A multi-table join, like a single-table join, extracts the relationships we care about by applying some processing to the raw data. The example below walks through it.

    1 Problem Description

        The input consists of two files. One is a factory table with a factory-name column and an address-ID column; the other is an address table with an address-ID column and an address-name column. The task is to find the factory-name-to-address-name correspondence in the input and output a "factoryname / addressname" table.

        Sample input:

        1) factory:

    factoryname                     addressID
    Beijing Red Star                1
    Shenzhen Thunder                3
    Guangzhou Honda                 2
    Beijing Rising                  1
    Guangzhou Development Bank      2
    Tencent                         3
    Back of Beijing                 1

        2) address:

    addressID    addressname
    1            Beijing
    2            Guangzhou
    3            Shenzhen
    4            Xian

        Sample output:

    factoryname                     addressname
    Back of Beijing                 Beijing
    Beijing Red Star                Beijing
    Beijing Rising                  Beijing
    Guangzhou Development Bank      Guangzhou
    Guangzhou Honda                 Guangzhou
    Shenzhen Thunder                Shenzhen
    Tencent                         Shenzhen

    2 Design Approach

        A multi-table join, like a single-table join, is essentially the natural join from relational databases. Compared with the single-table case, the left and right tables and the join column are more explicit here, so the same processing pattern can be reused: map determines which table an input line belongs to, splits it, stores the join-column value in the key, and stores the other column together with a left/right table flag in the value, then emits the pair. After the shuffle groups all values by key, reduce parses each value, separates the left-table and right-table entries according to the flag, computes their Cartesian product, and writes the resulting pairs out directly.
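        To make the data flow concrete, here is roughly what the intermediate records look like for addressID 1 in the sample input (a sketch; the "1"/"2" prefixes are the left/right-table flags that map prepends to the value):

    map output (key, value):
        ("1", "1+Beijing Red Star")      from the factory table
        ("1", "1+Beijing Rising")
        ("1", "1+Back of Beijing")
        ("1", "2+Beijing")               from the address table

    reduce, for key "1", splits these values into a factory group and an address
    group and emits their Cartesian product:
        Beijing Red Star    Beijing
        Beijing Rising      Beijing
        Back of Beijing     Beijing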

        For a detailed, step-by-step analysis, see the single-table join example. The code is given below.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MTJoin {
        // Used to emit the header line exactly once per reducer.
        public static int time = 0;

        public static class Map extends Mapper<Object, Text, Text, Text> {

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                // Skip the header line of either input file.
                if (line.contains("factoryname") || line.contains("addressID")) {
                    return;
                }

                String mapkey = "";
                String mapvalue = "";
                String relationType = "";

                // Split the line into its two columns on runs of whitespace,
                // so factory names that contain single spaces stay intact.
                String[] split = line.trim().split("\\s{2,}");

                if (split.length == 2 && Character.isDigit(split[1].charAt(0))) {
                    // Factory table: "factoryname  addressID" -> left table, flag "1".
                    mapkey = split[1];
                    mapvalue = split[0];
                    relationType = "1";
                } else if (split.length == 2 && Character.isDigit(split[0].charAt(0))) {
                    // Address table: "addressID  addressname" -> right table, flag "2".
                    mapkey = split[0];
                    mapvalue = split[1];
                    relationType = "2";
                } else {
                    return; // blank or malformed line
                }

                // Key: the join column (addressID); value: flag + "+" + the other column.
                context.write(new Text(mapkey), new Text(relationType + "+" + mapvalue));
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Write the header row once.
                if (0 == time) {
                    context.write(new Text("factoryname"), new Text("addressname"));
                    time++;
                }

                // Assumes at most 10 records per side for a given addressID,
                // which holds for the sample data.
                int factorynum = 0;
                String[] factory = new String[10];
                int addressnum = 0;
                String[] address = new String[10];

                // Separate the values into left-table (factory) and right-table
                // (address) entries according to the flag character.
                for (Text value : values) {
                    if (0 == value.toString().length()) {
                        continue;
                    }

                    char relationType = value.toString().charAt(0);

                    if ('1' == relationType) {          // left table
                        factory[factorynum] = value.toString().substring(2);
                        factorynum++;
                    }
                    if ('2' == relationType) {          // right table
                        address[addressnum] = value.toString().substring(2);
                        addressnum++;
                    }
                }

                // The Cartesian product of the two groups gives the joined rows.
                if (0 != factorynum && 0 != addressnum) {
                    for (int m = 0; m < factorynum; m++) {
                        for (int n = 0; n < addressnum; n++) {
                            context.write(new Text(factory[m]), new Text(address[n]));
                        }
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "MTJoin");
            job.setJarByClass(MTJoin.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
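
        A minimal way to compile and run the job (a sketch: the jar name and the HDFS paths are placeholders, and the input directory is assumed to contain both the factory and address files; the output directory must not already exist):

    javac -classpath "$(hadoop classpath)" MTJoin.java
    jar cf mtjoin.jar MTJoin*.class
    hadoop jar mtjoin.jar MTJoin /user/hadoop/join_in /user/hadoop/join_out

        The two command-line arguments map to args[0] (input directory) and args[1] (output directory) in main; with the default single reducer, the joined table ends up in part-r-00000 under the output directory.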