zoukankan      html  css  js  c++  java
  • Mapreduce实例——单表join

    现有某电商的用户好友数据文件,名为 buyer1,buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。

    buyer1(buyer_id,friends_id)
    10001    10002
    10002    10005
    10003    10002
    10004    10006
    10005    10007
    10006    10022
    10007    10032
    10009    10006
    10010    10005
    10011    10013
    buyer1

    mapreduce程序:

    package mapreduce7;
    
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    //04.Mapreduce实例——单表join
    public class DanJoin {
        public static class Map extends Mapper<Object,Text,Text,Text>{
            public void map(Object key,Text value,Context context)
                    throws IOException,InterruptedException{
                String line = value.toString();
                String[] arr = line.split("\t");
                String mapkey=arr[0];
                String mapvalue=arr[1];
                String relationtype=new String();
                relationtype="1";
                context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));
                //System.out.println(relationtype+"+"+mapvalue);
                relationtype="2";
                context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));
                //System.out.println(relationtype+"+"+mapvalue);
            }
        }
        public static class Reduce extends Reducer<Text, Text, Text, Text>{
            public void reduce(Text key,Iterable<Text> values,Context context)
                    throws IOException,InterruptedException{
                int buyernum=0;
                String[] buyer=new String[20];
                int friendsnum=0;
                String[] friends=new String[20];
                Iterator ite=values.iterator();
                while(ite.hasNext()){
                    String record=ite.next().toString();
                    int len=record.length();
                    int i=2;
                    if(0==len){
                        continue;
                    }
                    char relationtype=record.charAt(0);
                    if('1'==relationtype){
                        buyer [buyernum]=record.substring(i);
                        buyernum++;
                    }
                    if('2'==relationtype){
                        friends[friendsnum]=record.substring(i);
                        friendsnum++;
                    }
                }
                if(0!=buyernum&&0!=friendsnum){
                    for(int m=0;m<buyernum;m++){
                        for(int n=0;n<friendsnum;n++){
                            if(buyer[m]!=friends[n]){
                                context.write(new Text(buyer[m]),new Text(friends[n]));
                            }
                        }
                    }
                }
            }
        }
        public static void main(String[] args) throws Exception{
    
            Configuration conf=new Configuration();
            String[] otherArgs=new String[2];
            otherArgs[0]="hdfs://192.168.51.100:8020/mymapreduce7/in/buyer1";
            otherArgs[1]="hdfs://192.168.51.100:8020/mymapreduce7/out";
            Job job=new Job(conf," Table join");
            job.setJarByClass(DanJoin.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true)?0:1);
    
        }
    }

    结果:

    原理:

    以本实验的buyer1(buyer_id,friends_id)表为例来阐述单表连接的实验原理。单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,在map阶段将读入数据分割成buyer_id和friends_id之后,会将buyer_id设置成key,friends_id设置成value,直接输出并将其作为左表;再将同一对buyer_id和friends_id中的friends_id设置成key,buyer_id设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"buyer_idfriends_id--friends_idbuyer_id"关系。取出每个key的value-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组求笛卡尔积就是最后的结果

  • 相关阅读:
    第十一周课程总结
    第七周课程总结&实验报告(五)
    第四周课程总结&试验报告(二)
    2019春总结作业
    第十二周编程总结
    第十一周编程总结
    JAVA学期总结
    第十四周课程总结&实验报告
    第十三周课程总结
    第十二周学习总结
  • 原文地址:https://www.cnblogs.com/Arisf/p/15576280.html
Copyright © 2011-2022 走看看