zoukankan      html  css  js  c++  java
  • hadoop程序MapReduce之SingletonTableJoin

    需求:单表关联问题。从文件中孩子和父母的关系挖掘出孙子和爷奶关系

    样板:child-parent.txt 

             xiaoming daxiong

             daxiong alice

             daxiong jack

    输出:xiaoming alice

            xiaoming jack

    分析设计:

    mapper部分设计:

    1、<k1,v1>k1代表:一行数据的编号位置,v1代表:一行数据。

    2、左表:<k2,v2>k2代表:parent名字,v2代表:(1,child名字),此处1:代表左表标志。

    3、右表:<k3,v3>k3代表:child名字,v3代表:(2,parent名字),此处2:代表右表标志。

    reduce部分设计:

    4、<k4,v4>k4代表:相同的key,v4代表:list<String>

    5、求笛卡尔积<k5,v5>:k5代表:grandChild名字,v5代表:grandParent名字。

    程序部分:

    SingletonTableJoinMapper类

    package com.cn.singletonTableJoin;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Mapper for the single-table (self) join over "child parent" records.
     *
     * <p>Each valid input line is emitted twice so that the reducer can join the
     * table with itself on the shared person name:
     * <ul>
     *   <li>key = parent name, value = {@code "1 <child>"}</li>
     *   <li>key = child name, value = {@code "2 <parent>"}</li>
     * </ul>
     */
    public class SingletonTableJoinMapper extends Mapper<Object, Text, Text, Text> {
        /** Tag prefixed to values that carry a child name (keyed by the parent). */
        private static final String CHILD_TAG = "1";
        /** Tag prefixed to values that carry a parent name (keyed by the child). */
        private static final String PARENT_TAG = "2";
        /** First token of the optional header line, which must not be emitted. */
        private static final String HEADER_TOKEN = "child";

        /**
         * Splits one input line on whitespace and emits both tagged views of the
         * child/parent pair.
         *
         * <p>Lines that do not consist of exactly two tokens are skipped instead
         * of failing the task; non-blank skipped lines are counted in the
         * {@code SingletonTableJoin/MALFORMED_LINES} counter. A line whose first
         * token is {@code child} is treated as a header and skipped.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context task context used to emit the tagged pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            int tokenCount = itr.countTokens();
            if (tokenCount != 2) {
                // Blank lines are ignored silently; anything else is reported
                // through a counter so dropped records remain visible.
                if (tokenCount != 0) {
                    context.getCounter("SingletonTableJoin", "MALFORMED_LINES").increment(1);
                }
                return;
            }

            String childName = itr.nextToken();
            String parentName = itr.nextToken();
            if (HEADER_TOKEN.equals(childName)) {
                return;
            }

            context.write(new Text(parentName), new Text(CHILD_TAG + " " + childName));
            context.write(new Text(childName), new Text(PARENT_TAG + " " + parentName));
        }
    }

    SingletonTableJoinReduce类:

    package com.cn.singletonTableJoin;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reducer for the single-table (self) join.
     *
     * <p>For one key it separates the incoming values by their leading tag and
     * writes every (tag-1 name, tag-2 name) combination, i.e. the Cartesian
     * product of the two groups.
     */
    public class SingletonTableJoinReduce extends Reducer<Text, Text, Text, Text> {
        /**
         * Collects the names tagged {@code "1"} and {@code "2"} for this key and
         * emits each tag-1 name paired with each tag-2 name.
         *
         * @param key     the name shared by all values in this group
         * @param values  space-separated {@code "<tag> <name>"} strings
         * @param context task context used to emit the joined pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            List<String> taggedOne = new ArrayList<String>();
            List<String> taggedTwo = new ArrayList<String>();

            for (Text tagged : values) {
                String[] fields = tagged.toString().split(" ");
                String tag = fields[0];
                // Values with an empty or unknown tag match neither branch and are ignored.
                if ("1".equals(tag)) {
                    taggedOne.add(fields[1]);
                } else if ("2".equals(tag)) {
                    taggedTwo.add(fields[1]);
                }
            }

            // If either list is empty the nested loops emit nothing.
            for (String left : taggedOne) {
                for (String right : taggedTwo) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }
    }

    SingletonTableJoin类

    package com.cn.singletonTableJoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /**
     * Driver for the single-table join job: configures the mapper and reducer,
     * wires the input and output paths from the command line, and submits the job.
     *
     * @author root
     */
    public class SingletonTableJoin {
        /**
         * Entry point. Expects exactly two arguments after the generic Hadoop
         * options have been parsed: the input path and the output path.
         *
         * <p>Exits with status 2 on wrong usage, 0 if the job succeeds and 1 if
         * it fails.
         *
         * @param args command-line arguments (generic options, input path, output path)
         * @throws Exception if the job cannot be configured or submitted
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: SingletonTableJoin <in> <out>");
                System.exit(2);
            }

            // Create the job; the Job constructors are deprecated in favour of
            // the static factory.
            Job job = Job.getInstance(conf, "SingletonTableJoin");
            job.setJarByClass(SingletonTableJoin.class);

            // Set the input and output paths.
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

            // Set the mapper and reducer classes.
            job.setMapperClass(SingletonTableJoinMapper.class);
            job.setReducerClass(SingletonTableJoinReduce.class);

            // Set the output key/value types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Submit the job and wait for it to finish.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    把总结当成一种习惯。

  • 相关阅读:
    Java 泛型 泛型的约束与局限性
    Java 泛型 泛型方法
    Java 泛型 泛型数组
    Java 泛型 协变性、逆变性
    Java 泛型 协变式覆盖和泛型重载
    Java 泛型 泛型代码和虚拟机
    Insertion Sort List
    Remove Duplicates from Sorted List II
    String to Integer (atoi)
    SpringMvc源码入门
  • 原文地址:https://www.cnblogs.com/xubiao/p/5759422.html
Copyright © 2011-2022 走看看