  • Hadoop MapReduce program: SingletonTableJoin

    Requirement: a single-table join. From a file of child-parent relationships, derive the grandchild-grandparent relationships.

    Sample input: child-parent.txt

             xiaoming daxiong

             daxiong alice

             daxiong jack

    Output: xiaoming alice

            xiaoming jack

    Analysis and design:

    Mapper design:

    1. <k1, v1>: k1 is the position (byte offset) of the line in the file; v1 is the text of the line.

    2. Left table <k2, v2>: k2 is the parent name; v2 is (1, child name), where the tag 1 marks a left-table record.

    3. Right table <k3, v3>: k3 is the child name; v3 is (2, parent name), where the tag 2 marks a right-table record.

    Reducer design:

    4. <k4, v4>: k4 is a name shared by both tables; v4 is the list<String> of tagged records grouped under it.

    5. Cartesian product <k5, v5>: k5 is a grandchild name; v5 is a grandparent name, as traced in the example below.
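
    For the sample input above, each data line yields one left-table and one right-table record, so the reduce call for the key daxiong sees both sides of the join:

             key:    daxiong
             values: [ "1 xiaoming", "2 alice", "2 jack" ]

             grandChild  = [ xiaoming ]
             grandParent = [ alice, jack ]
             output:     (xiaoming, alice), (xiaoming, jack)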

    Code:

    The SingletonTableJoinMapper class:

    package com.cn.singletonTableJoin;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SingletonTableJoinMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line into at most two tokens: child name, parent name.
            String[] values = new String[2];
            int i = 0;
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens() && i < values.length) {
                values[i] = itr.nextToken();
                i++;
            }
            // Skip malformed lines and an optional "child parent" header line.
            if (i < 2 || "child".equals(values[0])) {
                return;
            }
            String childName = values[0];
            String parentName = values[1];
            // Left table: keyed by the parent and tagged "1", so this child
            // can later be matched with the parent's own parents.
            context.write(new Text(parentName), new Text("1 " + childName));
            // Right table: keyed by the child and tagged "2", so this parent
            // can later be matched with the child's own children.
            context.write(new Text(childName), new Text("2 " + parentName));
        }
    }
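
    For example, the line "daxiong alice" makes the mapper emit (alice, "1 daxiong") under the parent's name and (daxiong, "2 alice") under the child's name; the shuffle then groups every record keyed by daxiong into a single reduce call.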

    The SingletonTableJoinReduce class:

    package com.cn.singletonTableJoin;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SingletonTableJoinReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> grandChild = new ArrayList<String>();
            List<String> grandParent = new ArrayList<String>();
            for (Text value : values) {
                String[] record = value.toString().split(" ");
                // Skip records that do not carry both a tag and a name.
                if (record.length < 2) {
                    continue;
                }
                if ("1".equals(record[0])) {
                    // Tag 1: record[1] is a child of the key, i.e. a candidate grandchild.
                    grandChild.add(record[1]);
                } else if ("2".equals(record[0])) {
                    // Tag 2: record[1] is a parent of the key, i.e. a candidate grandparent.
                    grandParent.add(record[1]);
                }
            }
            // Cartesian product: pair every grandchild with every grandparent.
            if (!grandChild.isEmpty() && !grandParent.isEmpty()) {
                for (String grandchild : grandChild) {
                    for (String grandparent : grandParent) {
                        context.write(new Text(grandchild), new Text(grandparent));
                    }
                }
            }
        }
    }
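
    Note that the reducer must buffer both sides into lists before pairing them: Hadoop's values iterable can only be traversed once, and the left- and right-table records for a key arrive interleaved.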

    The SingletonTableJoin class (job driver):

    package com.cn.singletonTableJoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /**
     * Single-table join: derives grandchild-grandparent pairs from child-parent records.
     * @author root
     *
     */
    public class SingletonTableJoin {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: SingletonTableJoin <in> <out>");
                System.exit(2);
            }
            // Create the job.
            Job job = Job.getInstance(conf, "SingletonTableJoin");
            job.setJarByClass(SingletonTableJoin.class);

            // Set the input and output paths.
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

            // Set the mapper and reducer classes.
            job.setMapperClass(SingletonTableJoinMapper.class);
            job.setReducerClass(SingletonTableJoinReduce.class);

            // Set the output key-value types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Submit the job and wait for it to finish.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
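
    To try the job, package the three classes into a jar and submit it with the input and output paths. A minimal sketch; the jar name and HDFS paths here are placeholders, not from the original post:

             hadoop jar singletonTableJoin.jar com.cn.singletonTableJoin.SingletonTableJoin /user/root/child-parent.txt /user/root/out

    The output directory must not already exist, or FileOutputFormat will refuse to start the job.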

    Make summarizing a habit.
