  • (MapReduce exercise) Find the users who have common friends -- quite fun

    usr:friend,friend,friend...
    ---------------

     

    This is only my personal take; I hope it helps you.

     

    A:B,C,D,F,E,O
    
    B:A,C,E,K
    
    C:F,A,D,I
    
    D:A,E,F,L
    
    E:B,C,D,M,L
    
    F:A,B,C,D,E,O,M
    
    G:A,C,D,E,F
    
    H:A,C,D,E,O
    
    I:A,O
    
    J:B,O
    
    K:A,C,D
    
    L:D,E,F
    
    M:E,F,G
    
    O:A,H,I,J
     

    Final result:

    A,B C,E 
    A,C D,F 
    A,D F,E 
    A,F B,C,D,E,O
    B,E C
    C,F A,D 
    D,E L
    D,F A,E 
    D,L E,F 
    E,L D
    F,M E
    H,O A
    I,O A

    ================================================

    Hints:

    1. For a usr (A) whose friend list is (A:B,C,D,F,E,O), the grouping is as follows:

     

    (A B)->(A B C D F E O)

    (A C)->(A B C D F E O)

    (A D)->(A B C D F E O)

    (A F)->(A B C D F E O)

    (A E)->(A B C D F E O)

    (A O)->(A B C D F E O)

     

      2. Suppose there is another usr (C) whose friend list is (C:A,B,E). Group it by the same rule (note: compare each friend with the usr and put the smaller one first; a key-building sketch follows this grouping):

    (A C)->(A B E)

    (B C)->(A B E)

    (C E)->(A B E)
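
    As a minimal sketch of the map side of this hinted one-job variant (the class name PairKeyMapper is made up for illustration; this is not the GX1/GX2 code further below), each input line "usr:friend,friend,..." is turned into one record per friend, keyed by the pair with the smaller name first and carrying the usr plus its friend list as the value:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class PairKeyMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // input line: "A:B,C,D,F,E,O"
            String[] split = value.toString().split(":");
            String usr = split[0];
            // value carried along: the usr plus its whole friend list, e.g. "A B C D F E O"
            String friendList = usr + " " + split[1].replace(",", " ");

            // one record per friend; put the smaller name first in the pair key,
            // e.g. (A B) -> (A B C D F E O), (A C) -> (A B C D F E O), ...
            for (String friend : split[1].split(",")) {
                String pair = usr.compareTo(friend) < 0
                        ? usr + " " + friend
                        : friend + " " + usr;
                context.write(new Text(pair), new Text(friendList));
            }
        }
    }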

      3. After this inverted index is built and the shuffle runs, the result is:

    (A C)->(A B C D F E O)(A B E)

     4. Because the key (A C) has more than one value group, we can be certain that one group came from user A and the other from user C.

    Next take the intersection of (A B C D F E O) and (A B E), which is (A B E).
    After removing the two users themselves (A and C), the set still has size >= 1, so we can conclude that A and C have common friends, namely B and E.
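
    A matching sketch of the reduce side of that variant (again an illustrative name, PairKeyReducer, assuming the PairKeyMapper sketch above): it only keeps pair keys that received two value groups, intersects the two friend sets, and drops the two users themselves:

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class PairKeyReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<String> common = null;
            int groups = 0;
            for (Text value : values) {
                groups++;
                Set<String> set = new HashSet<String>();
                for (String name : value.toString().split(" ")) {
                    set.add(name);
                }
                if (common == null) {
                    common = set;
                } else {
                    common.retainAll(set);   // intersection of the two friend sets
                }
            }
            // only a mutual pair (each lists the other as a friend) produces two groups
            if (groups < 2) {
                return;
            }
            // remove the two users themselves, e.g. A and C for key "A C"
            for (String usr : key.toString().split(" ")) {
                common.remove(usr);
            }
            if (!common.isEmpty()) {
                context.write(key, new Text(String.join(",", common)));
            }
        }
    }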

    Following this logic, here is the MapReduce code. (The solution below actually uses two chained jobs: job 1 builds the inverted index friend -> users, and job 2 pairs up those users.)
    package com.bw.hadoop;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class GX1 {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
            config.set("yarn.resourcemanager.hostname", "192.168.0.100");

            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);

            job.setJarByClass(GX1.class);

            // Mapper class and its output types
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Reducer class and its output types
            job.setReducerClass(myRedu.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // input path
            FileInputFormat.addInputPath(job, new Path("/input/10.txt"));

            Path path = new Path("/out14");

            //job.setPartitionerClass(myPartitioner.class);
            //job.setNumReduceTasks(2);

            // if the output directory already exists, delete it
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            // output path for the result files
            FileOutputFormat.setOutputPath(job, path);

            // submit the job, wait for completion, and chain job 2 on success
            boolean completion = job.waitForCompletion(true);
            if (completion) {
                System.out.println("Job SUCCESS!");
                GX2.main(args);
            }
        }

        // Map: input line "user:friend1,friend2,..."; emit (friend, user) so that
        // the reducer collects, per friend, every user who lists that friend.
        public static class myMapper extends Mapper<LongWritable, Text, Text, Text> {
            Text k = new Text();
            Text v = new Text();

            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(":");
                String person = split[0];
                String[] splits = split[1].split(",");
                for (String spl : splits) {
                    k.set(spl);
                    v.set(person);
                    context.write(k, v);
                }
            }
        }

        // Reduce: key = friend, values = all users who list that friend;
        // join them with commas, producing lines like "A<TAB>B,C,D,..."
        public static class myRedu extends Reducer<Text, Text, Text, Text> {

            @Override
            protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                StringBuffer stb = new StringBuffer();
                for (Text value : values) {
                    if (stb.length() != 0) {
                        stb.append(",");
                    }
                    stb.append(value);
                }
                context.write(key, new Text(stb.toString()));
            }
        }
    }
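
    For the sample data above, job 1 (GX1) writes an inverted index "friend<TAB>users who list that friend" under /out14. For example, the record for friend A should contain the users B, C, D, F, G, H, I, K and O (the order of the names depends on how the values arrive at the reducer), roughly:

    A	B,C,D,F,G,H,I,K,O

    This tab-separated layout is exactly what the GX2 mapper below splits on.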
    package com.bw.hadoop;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class GX2 {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
            config.set("yarn.resourcemanager.hostname", "192.168.0.100");

            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);

            job.setJarByClass(GX2.class);

            // Mapper class and its output types
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Reducer class and its output types
            job.setReducerClass(myRedu.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // input: the output of job 1
            FileInputFormat.addInputPath(job, new Path("/out14/part-r-00000"));

            Path path = new Path("/out15");

            //job.setPartitionerClass(myPartitioner.class);
            //job.setNumReduceTasks(2);

            // if the output directory already exists, delete it
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            // output path for the result files
            FileOutputFormat.setOutputPath(job, path);

            // submit the job and wait for completion
            boolean completion = job.waitForCompletion(true);
            if (completion) {
                System.out.println("Job SUCCESS!");
            }
        }

        // Map: input line "friend<TAB>user1,user2,...". Sort the users, then emit
        // every pair (userI-userJ, friend); sorting guarantees the smaller user is
        // always first, so the same pair never appears in two different orders.
        public static class myMapper extends Mapper<LongWritable, Text, Text, Text> {

            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split("\t");
                String friend = split[0];
                String[] persons = split[1].split(",");

                Arrays.sort(persons);

                // i runs to length-1 and j to length so the last user is not dropped
                for (int i = 0; i < persons.length - 1; i++) {
                    for (int j = i + 1; j < persons.length; j++) {
                        context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
                    }
                }
            }
        }

        // Reduce: key = user pair "X-Y", values = all friends the pair has in common;
        // concatenate them separated by spaces.
        public static class myRedu extends Reducer<Text, Text, Text, Text> {

            @Override
            protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                StringBuffer stb = new StringBuffer();
                for (Text value : values) {
                    stb.append(value).append(" ");
                }
                context.write(key, new Text(stb.toString()));
            }
        }
    }
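
    After job 2 (GX2), /out15/part-r-00000 holds one line per user pair that shares at least one friend, with the pair joined by "-" and the common friends separated by spaces, for example:

    A-B	C E
    A-C	D F

    (The order of the friends on each line depends on the shuffle.) Note that this two-job pipeline reports every pair with at least one shared friend, so its output can be broader than the "Final result" listing above.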


  • Original post: https://www.cnblogs.com/songhaowan/p/7239578.html