  • MapReduce: given a children-parents (child-parent) table, output a grandchild-grandparent table

    A single-table join (self-join) example using MapReduce in Hadoop:

    Input table (each line is a child followed by one of that child's parents):

    Tom Lucy
    Tom Jack
    Jone Lucy
    Jone Jack
    Lucy Mary
    Lucy Ben
    Jack Alice
    Jack Jesse
    Terry Alice
    Terry Jesse
    Philip Terry
    Philip Alma
    Mark Terry
    Mark Alma

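    Required result: a grandchild-grandparent table that pairs each grandchild with each of its grandparents.

    Design idea: treat this single table as two tables, a left table and a right table, both containing the same child-parent rows.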

    Join the left table's parent column with the right table's child column. After dropping the two join columns, what remains is the required grandchild-grandparent table.
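The following is a minimal single-machine sketch of the same self-join in plain Java, with no Hadoop involved, to make the target result concrete. It assumes the whole table fits in memory and indexes the right table by its child column in a map. The class `SelfJoinDemo` and the method `grandPairs` are hypothetical names for illustration and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory self-join (no Hadoop); assumes the whole table fits in memory.
// SelfJoinDemo and grandPairs are hypothetical names used only for illustration.
class SelfJoinDemo {
    // rows are {child, parent}; returns "grandchild grandparent" strings
    static List<String> grandPairs(String[][] rows) {
        // Right table indexed by its child column: child -> that child's parents
        Map<String, List<String>> parentsOf = new LinkedHashMap<>();
        for (String[] row : rows) {
            parentsOf.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        List<String> result = new ArrayList<>();
        // Scan the left table and join left.parent = right.child
        for (String[] row : rows) {
            List<String> grandparents = parentsOf.getOrDefault(row[1], Collections.emptyList());
            for (String grandparent : grandparents) {
                // Drop the two join columns and keep (grandchild, grandparent)
                result.add(row[0] + " " + grandparent);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"Tom", "Lucy"}, {"Tom", "Jack"}, {"Jone", "Lucy"}, {"Jone", "Jack"},
            {"Lucy", "Mary"}, {"Lucy", "Ben"}, {"Jack", "Alice"}, {"Jack", "Jesse"},
            {"Terry", "Alice"}, {"Terry", "Jesse"}, {"Philip", "Terry"}, {"Philip", "Alma"},
            {"Mark", "Terry"}, {"Mark", "Alma"}
        };
        grandPairs(rows).forEach(System.out::println);
    }
}
```

On the 14 rows above, this prints 12 pairs. Tom and Jone each pair with Mary, Ben, Alice and Jesse. Philip and Mark each pair with Alice and Jesse. A hash join like this only works when the indexed side fits in memory. The MapReduce version instead lets the shuffle do the grouping.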

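    Because the MapReduce shuffle brings records with the same key together, the map phase splits each input line into a child and a parent and emits it twice:

    Left table: the parent as the key and the child as the value.

    Right table: the child as the key and the parent as the value.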

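    To tell the two tables apart after the shuffle, each value also carries a table tag. The value starts with the character 1 for the left table or 2 for the right table. In the code below, the value is actually the whole tagged row, tag+child+parent, e.g. 1+Tom+Lucy.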
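The tag-and-split convention can be sketched in isolation. The helper class `TaggedValue` and its methods `emit` and `parse` are hypothetical. They only mirror the `tag+child+parent` strings used in this article, and they assume that names never contain `+`. Note that a plain `split("+")` would throw a `PatternSyntaxException` because `+` is a regex quantifier, so the character is wrapped in `[]` to match it literally.

```java
// Hypothetical helpers mirroring the "tag+child+parent" values used in this article.
// Assumes names never contain '+'.
class TaggedValue {
    static final String LEFT = "1";   // left table: key = parent
    static final String RIGHT = "2";  // right table: key = child

    // The two (key, value) pairs emitted for one child-parent row
    static String[][] emit(String child, String parent) {
        String row = child + "+" + parent;
        return new String[][] {
            {parent, LEFT + "+" + row},  // e.g. (Lucy, 1+Tom+Lucy)
            {child, RIGHT + "+" + row}   // e.g. (Tom, 2+Tom+Lucy)
        };
    }

    // "1+Tom+Lucy" -> {"1", "Tom", "Lucy"}; "[+]" matches a literal plus sign
    static String[] parse(String value) {
        return value.split("[+]");
    }

    public static void main(String[] args) {
        for (String[] kv : emit("Tom", "Lucy")) {
            System.out.println(kv[0] + " -> " + kv[1]);
        }
        String[] parts = parse("2+Tom+Lucy");
        System.out.println("tag=" + parts[0] + " child=" + parts[1] + " parent=" + parts[2]);
    }
}
```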

    The map output thus contains both the left table and the right table, and the join itself happens during the shuffle.

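    For each key, the reducer receives the joined group and iterates over its values. Children from the left table go into a children list, and parents from the right table go into a parents list. The Cartesian product of the two lists is the final result for that key.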
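The reduce step can be tried without a cluster. This sketch uses a hypothetical class `ReduceDemo` and takes one group's values as plain strings instead of Hadoop `Text` objects. It applies the same tag test and Cartesian product described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java version of the reduce step for a single key (no Hadoop types).
// ReduceDemo is a hypothetical name used only for illustration.
class ReduceDemo {
    // values are tagged strings such as "1+Tom+Jack" or "2+Jack+Alice"
    static List<String> reduce(List<String> values) {
        List<String> grandchildren = new ArrayList<>();
        List<String> grandparents = new ArrayList<>();
        for (String v : values) {
            String[] w = v.split("[+]");
            if (w[0].equals("1")) {
                grandchildren.add(w[1]);  // left table: a child of the key
            } else if (w[0].equals("2")) {
                grandparents.add(w[2]);   // right table: a parent of the key
            }
        }
        // Cartesian product; empty if either list is empty
        List<String> out = new ArrayList<>();
        for (String gc : grandchildren) {
            for (String gp : grandparents) {
                out.add(gc + " " + gp);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList("1+Tom+Jack", "1+Jone+Jack", "2+Jack+Alice", "2+Jack+Jesse")));
        System.out.println(reduce(Arrays.asList("1+Jack+Alice", "1+Terry+Alice")));
    }
}
```

For the Jack group, it returns `[Tom Alice, Tom Jesse, Jone Alice, Jone Jesse]`. The Alice group has no right-table values, so it returns an empty list.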

    The code follows (my experience is limited, so it is not guaranteed to be entirely correct; corrections are welcome):

    package com;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class TestParents {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
            config.set("yarn.resourcemanager.hostname", "192.168.0.100");
    
            FileSystem fs = FileSystem.get(config);
    
            Job job = Job.getInstance(config);
            job.setJarByClass(TestParents.class);
    
            // Mapper class and its output types
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            // Reducer class and the job's final output types
            job.setReducerClass(myReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            // Input path
            FileInputFormat.addInputPath(job, new Path("/input/parent.txt"));
    
            // Output path; delete it first because the job fails if it already exists
            Path path = new Path("/output3/");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);
    
            // Submit the job and wait for it to finish
            boolean completion = job.waitForCompletion(true);
            if (completion) {
                System.out.println("Job Success!");
            }
            System.exit(completion ? 0 : 1);
        }
    
        public static class myMapper extends Mapper<Object, Text, Text, Text> {
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                // Split on any run of whitespace so lines like "Tom    Lucy" or "Tom\tLucy" both work
                String[] words = value.toString().trim().split("\\s+");
                if (words.length < 2) {
                    return; // skip blank or malformed lines
                }
                String child = words[0];
                String parent = words[1];
    
                // Left table: key = parent, value tagged with "1", e.g. (Lucy, 1+Tom+Lucy)
                context.write(new Text(parent), new Text("1+" + child + "+" + parent));
    
                // Right table: key = child, value tagged with "2", e.g. (Tom, 2+Tom+Lucy)
                context.write(new Text(child), new Text("2+" + child + "+" + parent));
            }
        }
    
        public static class myReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                List<String> grandchild = new ArrayList<String>();
                List<String> grandparent = new ArrayList<String>();
    
                for (Text value : values) {
                    // "1+Tom+Lucy" -> ["1", "Tom", "Lucy"]. '+', '*', '|' and similar characters
                    // have special meaning in regular expressions; wrap them in [] or escape them with \\
                    String[] words = value.toString().split("[+]");
                    String tag = words[0];
    
                    if (tag.equals("1")) {
                        // Left table: the key is the parent, so words[1] (the child) is a grandchild
                        grandchild.add(words[1]);
                    } else if (tag.equals("2")) {
                        // Right table: the key is the child, so words[2] (the parent) is a grandparent
                        grandparent.add(words[2]);
                    }
                }
    
                // Cartesian product; writes nothing if either list is empty
                for (String gc : grandchild) {
                    for (String gp : grandparent) {
                        context.write(new Text(gc), new Text(gp));
                    }
                }
            }
        }
    }
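How each input line is split matters. `split(" ")` only works when the two names are separated by exactly one space. With several spaces, it produces empty strings between the names, so `words[1]` is empty. A tab-separated line is not split at all, so `words[1]` throws `ArrayIndexOutOfBoundsException`. This small check uses a hypothetical `SplitDemo` class to compare that with trimming the line and splitting on `\\s+`. A blank line still yields a one-element array, so a length check is needed either way.

```java
import java.util.Arrays;

// Compares two ways of splitting an input line into child and parent.
// SplitDemo and splitLine are hypothetical names used only for illustration.
class SplitDemo {
    // Tolerates leading/trailing whitespace, tabs, and runs of spaces
    static String[] splitLine(String line) {
        return line.trim().split("\\s+");
    }

    public static void main(String[] args) {
        String line = "Tom    Lucy";  // four spaces between the names
        System.out.println(Arrays.toString(line.split(" ")));         // [Tom, , , , Lucy]
        System.out.println(Arrays.toString(splitLine(line)));         // [Tom, Lucy]
        System.out.println(Arrays.toString(splitLine("Tom\tLucy")));  // [Tom, Lucy]
    }
}
```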

    Step-by-step walkthrough:

    (1) Map phase:

        Input line        Map output (left table, right table)
        Tom     Lucy      <Lucy, 1+Tom+Lucy>        <Tom, 2+Tom+Lucy>
        Tom     Jack      <Jack, 1+Tom+Jack>        <Tom, 2+Tom+Jack>
        Jone    Lucy      <Lucy, 1+Jone+Lucy>       <Jone, 2+Jone+Lucy>
        Jone    Jack      <Jack, 1+Jone+Jack>       <Jone, 2+Jone+Jack>
        Lucy    Mary      <Mary, 1+Lucy+Mary>       <Lucy, 2+Lucy+Mary>
        Lucy    Ben       <Ben, 1+Lucy+Ben>         <Lucy, 2+Lucy+Ben>
        Jack    Alice     <Alice, 1+Jack+Alice>     <Jack, 2+Jack+Alice>
        Jack    Jesse     <Jesse, 1+Jack+Jesse>     <Jack, 2+Jack+Jesse>
        Terry   Alice     <Alice, 1+Terry+Alice>    <Terry, 2+Terry+Alice>
        Terry   Jesse     <Jesse, 1+Terry+Jesse>    <Terry, 2+Terry+Jesse>
        Philip  Terry     <Terry, 1+Philip+Terry>   <Philip, 2+Philip+Terry>
        Philip  Alma      <Alma, 1+Philip+Alma>     <Philip, 2+Philip+Alma>
        Mark    Terry     <Terry, 1+Mark+Terry>     <Mark, 2+Mark+Terry>
        Mark    Alma      <Alma, 1+Mark+Alma>       <Mark, 2+Mark+Alma>

    (2) Shuffle phase:

    The shuffle first sorts the map output by key (Hadoop compares Text keys byte by byte, so Mark comes before Mary):

    Map output (input order)      Sorted by key
    <Lucy, 1+Tom+Lucy>            <Alice, 1+Jack+Alice>
    <Tom, 2+Tom+Lucy>             <Alice, 1+Terry+Alice>
    <Jack, 1+Tom+Jack>            <Alma, 1+Philip+Alma>
    <Tom, 2+Tom+Jack>             <Alma, 1+Mark+Alma>
    <Lucy, 1+Jone+Lucy>           <Ben, 1+Lucy+Ben>
    <Jone, 2+Jone+Lucy>           <Jack, 1+Tom+Jack>
    <Jack, 1+Jone+Jack>           <Jack, 1+Jone+Jack>
    <Jone, 2+Jone+Jack>           <Jack, 2+Jack+Alice>
    <Mary, 1+Lucy+Mary>           <Jack, 2+Jack+Jesse>
    <Lucy, 2+Lucy+Mary>           <Jesse, 1+Jack+Jesse>
    <Ben, 1+Lucy+Ben>             <Jesse, 1+Terry+Jesse>
    <Lucy, 2+Lucy+Ben>            <Jone, 2+Jone+Lucy>
    <Alice, 1+Jack+Alice>         <Jone, 2+Jone+Jack>
    <Jack, 2+Jack+Alice>          <Lucy, 1+Tom+Lucy>
    <Jesse, 1+Jack+Jesse>         <Lucy, 1+Jone+Lucy>
    <Jack, 2+Jack+Jesse>          <Lucy, 2+Lucy+Mary>
    <Alice, 1+Terry+Alice>        <Lucy, 2+Lucy+Ben>
    <Terry, 2+Terry+Alice>        <Mark, 2+Mark+Terry>
    <Jesse, 1+Terry+Jesse>        <Mark, 2+Mark+Alma>
    <Terry, 2+Terry+Jesse>        <Mary, 1+Lucy+Mary>
    <Terry, 1+Philip+Terry>       <Philip, 2+Philip+Terry>
    <Philip, 2+Philip+Terry>      <Philip, 2+Philip+Alma>
    <Alma, 1+Philip+Alma>         <Terry, 2+Terry+Alice>
    <Philip, 2+Philip+Alma>       <Terry, 2+Terry+Jesse>
    <Terry, 1+Mark+Terry>         <Terry, 1+Philip+Terry>
    <Mark, 2+Mark+Terry>          <Terry, 1+Mark+Terry>
    <Alma, 1+Mark+Alma>           <Tom, 2+Tom+Lucy>
    <Mark, 2+Mark+Alma>           <Tom, 2+Tom+Jack>

    It then groups the values that share a key; this grouping is the join (the order of values within a group is not guaranteed):

    <Alice, {1+Jack+Alice, 1+Terry+Alice}>
    <Alma, {1+Philip+Alma, 1+Mark+Alma}>
    <Ben, {1+Lucy+Ben}>
    <Jack, {1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse}>
    <Jesse, {1+Jack+Jesse, 1+Terry+Jesse}>
    <Jone, {2+Jone+Lucy, 2+Jone+Jack}>
    <Lucy, {1+Tom+Lucy, 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben}>
    <Mark, {2+Mark+Terry, 2+Mark+Alma}>
    <Mary, {1+Lucy+Mary}>
    <Philip, {2+Philip+Terry, 2+Philip+Alma}>
    <Terry, {2+Terry+Alice, 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry}>
    <Tom, {2+Tom+Lucy, 2+Tom+Jack}>
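The sort-and-group step can be imitated locally with a sorted map. In this sketch, the hypothetical class `ShuffleDemo` builds the tagged map output and then groups it in a `TreeMap`. For plain ASCII names, `String.compareTo` gives the same order as Hadoop's byte-wise comparison of `Text` keys. This is only an imitation. The real shuffle also partitions keys across reducers, and it does not guarantee the order of values within a group, whereas here they simply stay in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local imitation of map output + shuffle (sort and group by key); not Hadoop's implementation.
// ShuffleDemo, mapAll and shuffle are hypothetical names used only for illustration.
class ShuffleDemo {
    // rows are {child, parent}; returns {key, value} pairs in input order
    static List<String[]> mapAll(String[][] rows) {
        List<String[]> out = new ArrayList<>();
        for (String[] r : rows) {
            String row = r[0] + "+" + r[1];
            out.add(new String[] {r[1], "1+" + row});  // left table, keyed by parent
            out.add(new String[] {r[0], "2+" + row});  // right table, keyed by child
        }
        return out;
    }

    // Groups values by key; TreeMap keeps the keys sorted
    static TreeMap<String, List<String>> shuffle(List<String[]> mapOutput) {
        TreeMap<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : mapOutput) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"Tom", "Lucy"}, {"Tom", "Jack"}, {"Jone", "Lucy"}, {"Jone", "Jack"},
            {"Lucy", "Mary"}, {"Lucy", "Ben"}, {"Jack", "Alice"}, {"Jack", "Jesse"},
            {"Terry", "Alice"}, {"Terry", "Jesse"}, {"Philip", "Terry"}, {"Philip", "Alma"},
            {"Mark", "Terry"}, {"Mark", "Alma"}
        };
        for (Map.Entry<String, List<String>> e : shuffle(mapAll(rows)).entrySet()) {
            System.out.println("<" + e.getKey() + ", " + e.getValue() + ">");
        }
    }
}
```

Printing the groups should reproduce the grouped listing above: 12 keys, from Alice to Tom.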

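    (3) Reduce phase:

    Take the group for key Jack, whose values are {1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse}. Iterating over it yields one value at a time, for example 1+Tom+Jack.

    Depending on the tag (1 or 2), the name goes into the grandchild list or the grandparent list. For Jack, this gives grandchild = [Tom, Jone] and grandparent = [Alice, Jesse].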

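    Finally, consider the loop

        for (String gc : grandchild) {
            for (String gp : grandparent) {
                context.write(new Text(gc), new Text(gp));
            }
        }

    Whenever the grandchild list or the grandparent list is empty, it writes nothing. This rule discards the shuffle groups that do not link a grandchild to a grandparent, and what remains is exactly the final result.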
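The rule above can be checked by counting. Under the reducer's logic, a key emits (number of tag-1 values) × (number of tag-2 values) pairs. That is how often the name appears as a parent, times how often it appears as a child. The hypothetical `GroupSizeDemo` below computes that count per key directly from the input rows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Predicts how many pairs the reducer emits for each shuffle key.
// GroupSizeDemo and outputsPerKey are hypothetical names used only for illustration.
class GroupSizeDemo {
    // rows are {child, parent}
    static TreeMap<String, Integer> outputsPerKey(String[][] rows) {
        Map<String, Integer> leftCount = new HashMap<>();   // key appears as a parent -> tag 1 values
        Map<String, Integer> rightCount = new HashMap<>();  // key appears as a child -> tag 2 values
        TreeMap<String, Integer> result = new TreeMap<>();
        for (String[] r : rows) {
            rightCount.merge(r[0], 1, Integer::sum);
            leftCount.merge(r[1], 1, Integer::sum);
            result.put(r[0], 0);
            result.put(r[1], 0);
        }
        for (Map.Entry<String, Integer> e : result.entrySet()) {
            int left = leftCount.getOrDefault(e.getKey(), 0);
            int right = rightCount.getOrDefault(e.getKey(), 0);
            e.setValue(left * right);  // size of the Cartesian product for this key
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"Tom", "Lucy"}, {"Tom", "Jack"}, {"Jone", "Lucy"}, {"Jone", "Jack"},
            {"Lucy", "Mary"}, {"Lucy", "Ben"}, {"Jack", "Alice"}, {"Jack", "Jesse"},
            {"Terry", "Alice"}, {"Terry", "Jesse"}, {"Philip", "Terry"}, {"Philip", "Alma"},
            {"Mark", "Terry"}, {"Mark", "Alma"}
        };
        outputsPerKey(rows).forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```

For this input, only Jack, Lucy and Terry are nonzero (4 pairs each), which gives the 12 output rows. Every other group has an empty list on one side.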

  • Original article: https://www.cnblogs.com/supiaopiao/p/7244007.html