  • (MapReduce exercise) Find the users that have common friends -- a fun one

    user:friend,friend,friend...
    ---------------

     

    This is just my personal take; I hope it helps.

     

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J
     

    Final result (an excerpt; the full output contains further pairs, e.g. A,E with common friends B,C,D):

    A,B C,E 
    A,C D,F 
    A,D F,E 
    A,F B,C,D,E,O
    B,E C
    C,F A,D 
    D,E L
    D,F A,E 
    D,L E,F 
    E,L D
    F,M E
    H,O A
    I,O A
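The table above can be sanity-checked without Hadoop: the common friends of a pair are just the intersection of the two users' friend sets. A brute-force sketch over the same input (the class name `CommonFriendsCheck` is mine, not part of the post):

```java
import java.util.*;

// Brute-force check of the expected output: the common friends of two
// users are simply the intersection of their friend sets.
public class CommonFriendsCheck {
    static final Map<String, Set<String>> FRIENDS = new HashMap<>();

    static {
        String[] lines = {
            "A:B,C,D,F,E,O", "B:A,C,E,K", "C:F,A,D,I", "D:A,E,F,L",
            "E:B,C,D,M,L", "F:A,B,C,D,E,O,M", "G:A,C,D,E,F", "H:A,C,D,E,O",
            "I:A,O", "J:B,O", "K:A,C,D", "L:D,E,F", "M:E,F,G", "O:A,H,I,J"
        };
        for (String line : lines) {
            String[] parts = line.split(":");
            FRIENDS.put(parts[0], new TreeSet<>(Arrays.asList(parts[1].split(","))));
        }
    }

    // Sorted common friends of users a and b.
    static Set<String> common(String a, String b) {
        Set<String> result = new TreeSet<>(FRIENDS.get(a));
        result.retainAll(FRIENDS.get(b));
        return result;
    }

    public static void main(String[] args) {
        System.out.println("A,B -> " + common("A", "B")); // [C, E]
        System.out.println("D,E -> " + common("D", "E")); // [L]
        System.out.println("H,O -> " + common("H", "O")); // [A]
    }
}
```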

    ================================================

    Hints:

    1. Take a user A, whose friend list is (A:B,C,D,F,E,O). Group it as follows:

     

    (A B)->(A B C D F E O)

    (A C)->(A B C D F E O)

    (A D)->(A B C D F E O)

    (A F)->(A B C D F E O)

    (A E)->(A B C D F E O)

    (A O)->(A B C D F E O)

     

      2. Suppose another user C has the friend list (C:A,B,E). Group it by the same rule (note: compare each friend with the user and put the smaller one first):

    (A C)->(A B E)

    (B C)->(A B E)

    (C E)->(A B E)
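The "smaller one first" rule in hints 1 and 2 is just a canonical ordering of the pair key, so that the same pair coming from either user's line shuffles to a single reducer key. A minimal sketch (the class and method names are mine):

```java
// Canonical key for a (user, friend) pair: the lexicographically smaller
// name goes first, so (A,C) from A's line and (C,A) from C's line
// produce the same key and meet in the same reduce call.
public class PairKey {
    static String pairKey(String user, String friend) {
        return user.compareTo(friend) < 0 ? user + " " + friend : friend + " " + user;
    }

    public static void main(String[] args) {
        System.out.println(pairKey("A", "C")); // A C
        System.out.println(pairKey("E", "C")); // C E
    }
}
```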

      3. After the inverted index is built and shuffled, both value groups land under the same key:

    (A C)->(A B C D F E O)(A B E)

     4. Because key (A C) has more than one value group, we know for certain that one group came from user A and the other from user C.

    Next, take the intersection of (A B C D F E O) and (A B E), which is (A B E).
    After excluding the two users themselves (A and C), at least one element remains, so A and C have the common friends B and E.
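Hint 4 can be written out directly: intersect the two value groups under key (A C), then remove the pair members themselves; whatever remains is the set of common friends. A sketch (the class name `Hint4` is mine):

```java
import java.util.*;

// Hint 4 as code: intersect the two value groups for key (A C), then
// drop the two users themselves; the remainder are their common friends.
public class Hint4 {
    static Set<String> commonOf(String[] pair, String[] groupA, String[] groupC) {
        Set<String> common = new TreeSet<>(Arrays.asList(groupA));
        common.retainAll(Arrays.asList(groupC));   // intersection: (A B E)
        common.removeAll(Arrays.asList(pair));     // drop A and C
        return common;
    }

    public static void main(String[] args) {
        String[] key = {"A", "C"};
        String[] fromA = {"A", "B", "C", "D", "F", "E", "O"};
        String[] fromC = {"A", "B", "E"};
        System.out.println(commonOf(key, fromA, fromC)); // [B, E]
    }
}
```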

    Following this logic, write the MapReduce code (here split into two chained jobs: GX1 builds the inverted index, GX2 pairs up users per shared friend):
    package com.bw.hadoop;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class GX1 {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
            config.set("yarn.resourcemanager.hostname", "192.168.0.100");

            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);

            job.setJarByClass(GX1.class);

            // Mapper class and its output types
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Reducer class and its output types
            job.setReducerClass(myRedu.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Input path
            FileInputFormat.addInputPath(job, new Path("/input/10.txt"));

            Path path = new Path("/out14");

            // Delete the output directory if it already exists
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            // Output path for the result files
            FileOutputFormat.setOutputPath(job, path);

            // Run the job; on success, chain into the second job
            boolean completion = job.waitForCompletion(true);
            if (completion) {
                System.out.println("Job SUCCESS!");
                GX2.main(args);
            }
        }

        // Phase 1 mapper: for a line "A:B,C,D,F,E,O", emit (friend, user)
        // pairs (B, A), (C, A), ... -- an inverted index from friend to user.
        public static class myMapper extends Mapper<LongWritable, Text, Text, Text> {
            Text k = new Text();
            Text v = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(":");
                String person = split[0];
                String[] friends = split[1].split(",");
                for (String friend : friends) {
                    k.set(friend);
                    v.set(person);
                    context.write(k, v);
                }
            }
        }

        // Phase 1 reducer: concatenate all users who share this friend,
        // producing lines like "B<TAB>A,E,F,J".
        public static class myRedu extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                StringBuilder stb = new StringBuilder();
                for (Text value : values) {
                    if (stb.length() != 0) {
                        stb.append(",");
                    }
                    stb.append(value);
                }
                context.write(key, new Text(stb.toString()));
            }
        }
    }
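Before moving on to the second job, it may help to see what GX1 actually produces. A local, Hadoop-free sketch of the same inverted-index step (the class name `InvertSketch` is mine):

```java
import java.util.*;

// Local sketch of GX1: build the inverted index friend -> users-who-list-him,
// which is what the phase-1 reducer writes out as "friend<TAB>user,user,...".
public class InvertSketch {
    static Map<String, List<String>> invert(String[] lines) {
        Map<String, List<String>> index = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(":");
            for (String friend : parts[1].split(",")) {
                index.computeIfAbsent(friend, f -> new ArrayList<>()).add(parts[0]);
            }
        }
        return index;
    }

    public static void main(String[] args) {
        String[] lines = {"A:B,C,D,F,E,O", "B:A,C,E,K", "C:F,A,D,I"};
        // Prints {A=[B, C], B=[A], C=[A, B], D=[A, C], ...}: e.g. A appears
        // in the friend lists of B and C, so A's inverted group is [B, C].
        System.out.println(invert(lines));
    }
}
```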
    package com.bw.hadoop;

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class GX2 {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
            config.set("yarn.resourcemanager.hostname", "192.168.0.100");

            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);

            job.setJarByClass(GX2.class);

            // Mapper class and its output types
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Reducer class and its output types
            job.setReducerClass(myRedu.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Input is the output of the first job
            FileInputFormat.addInputPath(job, new Path("/out14/part-r-00000"));

            Path path = new Path("/out15");

            // Delete the output directory if it already exists
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            // Output path for the result files
            FileOutputFormat.setOutputPath(job, path);

            // Run the job
            boolean completion = job.waitForCompletion(true);
            if (completion) {
                System.out.println("Job SUCCESS!");
            }
        }

        // Phase 2 mapper: a line "B<TAB>A,E,F,J" means users A, E, F and J
        // all have B as a friend, so B is a common friend of every pair
        // among them. Sort the users and emit each pair once, value B.
        public static class myMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split("\t");
                String friend = split[0];
                String[] persons = split[1].split(",");

                Arrays.sort(persons);

                // The loops must run to length-1 and length; the original
                // length-2/length-1 bounds silently dropped every pair
                // involving the last user in the sorted list.
                for (int i = 0; i < persons.length - 1; i++) {
                    for (int j = i + 1; j < persons.length; j++) {
                        context.write(new Text(persons[i] + "," + persons[j]), new Text(friend));
                    }
                }
            }
        }

        // Phase 2 reducer: join all common friends of the pair, matching
        // the "A,B<TAB>C,E" format of the expected result.
        public static class myRedu extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                StringBuilder stb = new StringBuilder();
                for (Text value : values) {
                    if (stb.length() != 0) {
                        stb.append(",");
                    }
                    stb.append(value);
                }
                context.write(key, new Text(stb.toString()));
            }
        }
    }
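The pair-generation step in the GX2 mapper is the easiest place to slip up: the post's original loop bounds (`length-2` and `length-1`) dropped every pair involving the last sorted user. A standalone sketch of the step with correct bounds (the class name `PairSketch` is mine):

```java
import java.util.*;

// Local sketch of the GX2 mapper: given one phase-1 line, sort the users
// and emit every unordered pair, each tagged with the shared friend.
public class PairSketch {
    static List<String> pairs(String line) {
        String[] parts = line.split("\t");
        String friend = parts[0];
        String[] persons = parts[1].split(",");
        Arrays.sort(persons);
        List<String> out = new ArrayList<>();
        // i runs to length-1 and j to length, so the last user is included;
        // the buggy length-2/length-1 bounds would lose every pair with J here.
        for (int i = 0; i < persons.length - 1; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                out.add(persons[i] + "," + persons[j] + " -> " + friend);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Phase-1 output line: A, E, F and J all have B as a friend.
        for (String p : pairs("B\tA,E,F,J")) {
            System.out.println(p); // A,E -> B ... F,J -> B (6 pairs)
        }
    }
}
```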


  • Original post: https://www.cnblogs.com/songhaowan/p/7239578.html