  • Hadoop Map/Reduce: Implementing a Single-Table Join

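    A MapReduce program applies simple logic to data by relying on the framework's built-in behavior. The most important property is that values are merged (grouped) by key. The second is that keys are sorted during the shuffle phase.
    Writing an MR program is largely a matter of making clever use of this merging and sorting.
    A single-table join relies on the merging property.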
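    To make the merge and sort properties concrete, here is a minimal plain-Java sketch that imitates their effect on a single machine. It is not Hadoop code: the class ShuffleSketch and its shuffle method are hypothetical names, and a TreeMap stands in for the framework's sort, assuming keys compare as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-machine imitation of the shuffle phase:
// values are merged by key and keys come out in sorted order.
// This only mimics the effect; it is not how Hadoop implements the shuffle.
class ShuffleSketch {

    // Groups (key, value) pairs by key. TreeMap keeps keys sorted,
    // like the sorted key order a reducer sees after the shuffle.
    static Map<String, List<String>> shuffle(List<String[]> pairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // (parent, child) pairs taken from the sample data
        List<String[]> pairs = List.of(
                new String[] {"Lucy", "Tom"},
                new String[] {"Jack", "Tom"},
                new String[] {"Marry", "Lucy"},
                new String[] {"Ben", "Lucy"});
        shuffle(pairs).forEach((k, vs) -> System.out.println(k + " -> " + vs));
    }
}
```

    Running main prints the groups in sorted key order: Ben -> [Lucy], Jack -> [Tom], Lucy -> [Tom], Marry -> [Lucy].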
    First, the test data (one child-parent pair per line):
    child    parent
    Tom    Lucy
    Tom    Jack
    Lucy    Marry
    Lucy    Ben
    Jack    Alice
    Jack    Jesse
     
    Expected result:
    grandchild    grandparent
    Tom    Marry
    Tom    Ben
    Tom    Alice
    Tom    Jesse
     
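    How it works:
    From the requirement, the natural choice is to use parent as the key. For a given person, the value list then holds both that person's children (the grandchildren) and that person's parents (the grandparents). Taking the Cartesian product of these two groups within the value list gives the final result.
    In a single-table join, the left table and the right table are the same table. Left-table records, read as (child, parent) and keyed by parent, tag the child with c#; right-table records, keyed by child, tag the parent with p#.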
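    The tagging can be illustrated on a single input line. Below is a minimal plain-Java sketch, not Hadoop code; the class TagSketch and its tag method are hypothetical names, and it assumes each line holds the child and then the parent, separated by a tab.

```java
import java.util.List;

// Hypothetical sketch of the map-side tagging for the self join:
// the left-table copy is keyed by parent and tags the child with "c#",
// the right-table copy is keyed by child and tags the parent with "p#".
class TagSketch {

    // Returns the two (key, value) pairs produced by one "child<TAB>parent" line.
    static List<String[]> tag(String line) {
        String[] fields = line.split("\t");
        String child = fields[0];
        String parent = fields[1];
        return List.of(
                new String[] {parent, "c#" + child},   // left table: join on parent
                new String[] {child, "p#" + parent});  // right table: join on child
    }

    public static void main(String[] args) {
        for (String[] kv : tag("Tom\tLucy")) {
            System.out.println(kv[0] + " -> " + kv[1]);
        }
    }
}
```

    For the line Tom<TAB>Lucy it prints Lucy -> c#Tom and Tom -> p#Lucy, which correspond to the first pair in each of the two map-output rows.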
    Map output (left table: keyed by parent, value is c# + child):
    context.write("Lucy", "c#Tom")    context.write("Jack", "c#Tom")    context.write("Marry", "c#Lucy")    context.write("Alice", "c#Jack")    ...
    Map output (right table: keyed by child, value is p# + parent):
    context.write("Tom", "p#Lucy")    context.write("Tom", "p#Jack")    context.write("Lucy", "p#Marry")    context.write("Jack", "p#Alice")    ...

    After the shuffle, values are grouped by key (reduce input; the order of values within a group is not guaranteed):
    <"Lucy", {"c#Tom", "p#Marry", "p#Ben"}>    <"Jack", {"c#Tom", "p#Alice", "p#Jesse"}>    <"Marry", {"c#Lucy"}>    <"Ben", {"c#Lucy"}>    <"Alice", {"c#Jack"}>    <"Jesse", {"c#Jack"}>    <"Tom", {"p#Lucy", "p#Jack"}>
    Reduce output (only Lucy and Jack have both c# and p# values, so only those keys produce pairs):
    context.write("Tom", "Marry")    context.write("Tom", "Ben")    context.write("Tom", "Alice")    context.write("Tom", "Jesse")
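    The reduce step for a single key can be tried out without a cluster. The following plain-Java sketch (the class JoinReduceSketch is a hypothetical name, not part of the original code) takes one group of tagged values, such as the one for Lucy, splits it into the c# and p# lists, and emits their Cartesian product. It assumes the tag convention described above: c# marks a child of the key, p# marks a parent of the key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the reduce step for one key: split the tagged values
// into the c# list (children of the key, i.e. grandchildren) and the p# list
// (parents of the key, i.e. grandparents), then emit their Cartesian product.
class JoinReduceSketch {

    static List<String[]> reduce(List<String> values) {
        List<String> children = new ArrayList<>();
        List<String> parents = new ArrayList<>();
        for (String v : values) {
            if (v.startsWith("c#")) {
                children.add(v.substring(2));
            } else if (v.startsWith("p#")) {
                parents.add(v.substring(2));
            }
        }
        // If either list is empty the product is empty: no grandchild-grandparent link.
        List<String[]> result = new ArrayList<>();
        for (String c : children) {
            for (String p : parents) {
                result.add(new String[] {c, p});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The value list for key "Lucy" from the walk-through
        for (String[] pair : reduce(List.of("c#Tom", "p#Marry", "p#Ben"))) {
            System.out.println(pair[0] + "\t" + pair[1]);
        }
    }
}
```

    For the Lucy group it prints Tom<TAB>Marry and Tom<TAB>Ben; a group such as Marry's, which holds only a c# value, produces nothing.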
     
    The code:
     
    package cn.genekang.hadoop.test;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class STjoin {
        /*
         * Input: one tab-separated "child parent" pair per line, for example:
         *   Tom    Lucy
         *   Lucy   Marry
         */
        // Single-table join
        public static class StjoinMap extends
                Mapper<LongWritable, Text, Text, Text> {
    
            private Text kText = new Text();
            private Text vText = new Text();
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
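            String[] lineSplit = value.toString().split("\t");
            // Skip the header row and malformed lines
            if (lineSplit.length < 2 || "child".equals(lineSplit[0])) {
                return;
            }
            String child = lineSplit[0];
            String parent = lineSplit[1];
            // c# marks the left table (a child), p# marks the right table (a parent)
            // Left table: keyed by parent, value is the child
            kText.set(parent);
            vText.set("c#" + child);
            context.write(kText, vText);

            // Right table: keyed by child, value is the parent
            kText.set(child);
            vText.set("p#" + parent);
            context.write(kText, vText);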
    
            }
    
        }
    
        public static class StjoinReduce extends Reducer<Text, Text, Text, Text> {
            private Text kText = new Text();
            private Text vText = new Text();
    
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
            // Values tagged c# and p# meet under the same key
            ArrayList<String> cList = new ArrayList<String>();
            ArrayList<String> pList = new ArrayList<String>();
            for (Text v : values) {
                String s = v.toString();
                // Check only the tag prefix, so "c#"/"p#" inside a name cannot be misread
                if (s.startsWith("c#")) {
                    cList.add(s.substring(2));
                } else if (s.startsWith("p#")) {
                    pList.add(s.substring(2));
                }
            }

            // Cartesian product of the two lists; nothing is written if either list is empty.
            // The lists are local to this call, so no clearing is needed afterwards.
            for (String c : cList) {
                for (String p : pList) {
                    kText.set(c);
                    vText.set(p);
                    context.write(kText, vText);
                }
            }
            }
    
        }
    
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(STjoin.class);
    
            job.setMapperClass(StjoinMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setReducerClass(StjoinReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
  • Original article: https://www.cnblogs.com/6tian/p/4062074.html