zoukankan      html  css  js  c++  java
  • Hadoop学习之路(二十八)MapReduce的API使用(五)

    求所有两两用户之间的共同好友

    数据格式

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J,K

    以上是数据:
    A:B,C,D,F,E,O
    表示:B,C,D,E,F,O是A用户的好友。

      1 public class SharedFriend {
      2     /*
      3      第一阶段的map函数主要完成以下任务
      4      1.遍历原始文件中每行<所有朋友>信息
      5      2.遍历“朋友”集合,以每个“朋友”为键,原来的“人”为值  即输出<朋友,人>
      6      */
      7     static class SharedFriendMapper01 extends Mapper<LongWritable, Text, Text, Text>{
      8         @Override
      9         protected void map(LongWritable key, Text value,Context context)
     10                 throws IOException, InterruptedException {
     11             String line = value.toString();
     12             String[] person_friends = line.split(":");
     13             String person = person_friends[0];
     14             String[] friends = person_friends[1].split(",");
     15             
     16             for(String friend : friends){
     17                 context.write(new Text(friend), new Text(person));
     18             }
     19         }
     20     }
     21     
     22     /*
     23       第一阶段的reduce函数主要完成以下任务
     24       1.对所有传过来的<朋友,list(人)>进行拼接,输出<朋友,拥有这名朋友的所有人>
     25      */
     26     static class SharedFriendReducer01 extends Reducer<Text, Text, Text, Text>{
     27         @Override
     28         protected void reduce(Text key, Iterable<Text> values,Context context)
     29                 throws IOException, InterruptedException {
     30             StringBuffer sb = new StringBuffer();
     31             for(Text friend : values){
     32                 sb.append(friend.toString()).append(",");
     33             }
     34             sb.deleteCharAt(sb.length()-1);
     35             context.write(key, new Text(sb.toString()));
     36         }
     37     }
     38     
     39     /*
     40     第二阶段的map函数主要完成以下任务
     41     1.将上一阶段reduce输出的<朋友,拥有这名朋友的所有人>信息中的 “拥有这名朋友的所有人”进行排序 ,以防出现B-C C-B这样的重复
     42     2.将 “拥有这名朋友的所有人”进行两两配对,并将配对后的字符串当做键,“朋友”当做值输出,即输出<人-人,共同朋友>
     43      */
     44     static class SharedFriendMapper02 extends Mapper<LongWritable, Text, Text, Text>{
     45         @Override
     46         protected void map(LongWritable key, Text value,Context context)
     47                 throws IOException, InterruptedException {
     48             String line = value.toString();
     49             String[] friend_persons = line.split("	");
     50             String friend = friend_persons[0];
     51             String[] persons = friend_persons[1].split(",");
     52             Arrays.sort(persons); //排序
     53             
     54             //两两配对
     55             for(int i=0;i<persons.length-1;i++){
     56                 for(int j=i+1;j<persons.length;j++){
     57                     context.write(new Text(persons[i]+"-"+persons[j]+":"), new Text(friend));
     58                 }
     59             }
     60         }
     61     }
     62     
     63     /*
     64     第二阶段的reduce函数主要完成以下任务
     65     1.<人-人,list(共同朋友)> 中的“共同好友”进行拼接 最后输出<人-人,两人的所有共同好友>
     66      */
     67     static class SharedFriendReducer02 extends Reducer<Text, Text, Text, Text>{
     68         @Override
     69         protected void reduce(Text key, Iterable<Text> values,Context context)
     70                 throws IOException, InterruptedException {
     71             StringBuffer sb = new StringBuffer();
     72             Set<String> set = new HashSet<String>();
     73             for(Text friend : values){
     74                 if(!set.contains(friend.toString()))
     75                     set.add(friend.toString());
     76             }
     77             for(String friend : set){
     78                 sb.append(friend.toString()).append(",");
     79             }
     80             sb.deleteCharAt(sb.length()-1);
     81             
     82             context.write(key, new Text(sb.toString()));
     83         }
     84     }
     85     
     86     public static void main(String[] args)throws Exception {
     87         Configuration conf = new Configuration();
     88 
     89         //第一阶段
     90         Job job1 = Job.getInstance(conf);
     91         job1.setJarByClass(SharedFriend.class);
     92         job1.setMapperClass(SharedFriendMapper01.class);
     93         job1.setReducerClass(SharedFriendReducer01.class);
     94         
     95         job1.setOutputKeyClass(Text.class);
     96         job1.setOutputValueClass(Text.class);
     97         
     98         FileInputFormat.setInputPaths(job1, new Path("H:/大数据/mapreduce/sharedfriend/input"));
     99         FileOutputFormat.setOutputPath(job1, new Path("H:/大数据/mapreduce/sharedfriend/output"));
    100         
    101         boolean res1 = job1.waitForCompletion(true);
    102         
    103         //第二阶段
    104         Job job2 = Job.getInstance(conf);
    105         job2.setJarByClass(SharedFriend.class);
    106         job2.setMapperClass(SharedFriendMapper02.class);
    107         job2.setReducerClass(SharedFriendReducer02.class);
    108         
    109         job2.setOutputKeyClass(Text.class);
    110         job2.setOutputValueClass(Text.class);
    111         
    112         FileInputFormat.setInputPaths(job2, new Path("H:/大数据/mapreduce/sharedfriend/output"));
    113         FileOutputFormat.setOutputPath(job2, new Path("H:/大数据/mapreduce/sharedfriend/output01"));
    114         
    115         boolean res2 = job2.waitForCompletion(true);
    116         
    117         System.exit(res1?0:1);
    118     }
    119 }

    第一阶段输出结果

     1 A    F,I,O,K,G,D,C,H,B
     2 B    E,J,F,A
     3 C    B,E,K,A,H,G,F
     4 D    H,C,G,F,E,A,K,L
     5 E    A,B,L,G,M,F,D,H
     6 F    C,M,L,A,D,G
     7 G    M
     8 H    O
     9 I    O,C
    10 J    O
    11 K    O,B
    12 L    D,E
    13 M    E,F
    14 O    A,H,I,J,F
    View Code

    第二阶段输出结果

     1 A-B    C,E
     2 A-C    D,F
     3 A-D    E,F
     4 A-E    C,B,D
     5 A-F    E,O,C,D,B
     6 A-G    F,C,E,D
     7 A-H    D,O,C,E
     8 A-I    O
     9 A-J    B,O
    10 A-K    C,D
    11 A-L    D,E,F
    12 A-M    E,F
    13 B-C    A
    14 B-D    A,E
    15 B-E    C
    16 B-F    A,C,E
    17 B-G    E,C,A
    18 B-H    A,E,C
    19 B-I    A
    20 B-K    A,C
    21 B-L    E
    22 B-M    E
    23 B-O    K,A
    24 C-D    F,A
    25 C-E    D
    26 C-F    D,A
    27 C-G    D,F,A
    28 C-H    D,A
    29 C-I    A
    30 C-K    A,D
    31 C-L    D,F
    32 C-M    F
    33 C-O    I,A
    34 D-E    L
    35 D-F    A,E
    36 D-G    F,A,E
    37 D-H    A,E
    38 D-I    A
    39 D-K    A
    40 D-L    F,E
    41 D-M    F,E
    42 D-O    A
    43 E-F    C,D,M,B
    44 E-G    C,D
    45 E-H    C,D
    46 E-J    B
    47 E-K    D,C
    48 E-L    D
    49 F-G    C,E,D,A
    50 F-H    D,O,A,E,C
    51 F-I    A,O
    52 F-J    O,B
    53 F-K    D,C,A
    54 F-L    D,E
    55 F-M    E
    56 F-O    A
    57 G-H    E,C,D,A
    58 G-I    A
    59 G-K    D,A,C
    60 G-L    F,E,D
    61 G-M    E,F
    62 G-O    A
    63 H-I    A,O
    64 H-J    O
    65 H-K    C,D,A
    66 H-L    D,E
    67 H-M    E
    68 H-O    A
    69 I-J    O
    70 I-K    A
    71 I-O    A
    72 K-L    D
    73 K-O    A
    74 L-M    F,E
    View Code
  • 相关阅读:
    什么是线程组,为什么在 Java 中不推荐使用?
    什么是 FutureTask?使用 ExecutorService 启动任务?
    Java 中用到的线程调度算法是什么?
    什么是阻塞队列?阻塞队列的实现原理是什么?如何使用 阻塞队列来实现生产者-消费者模型?
    说说对 SQL 语句优化有哪些方法?(选择几条)
    什么是 Executors 框架?
    Java Concurrency API 中的 Lock 接口(Lock interface) 是什么?对比同步它有什么优势?
    什么是原子操作?在 Java Concurrency API 中有哪些原 子类(atomic classes)?
    Java 中你怎样唤醒一个阻塞的线程?
    你将如何使用 thread dump?你将如何分析 Thread dump?
  • 原文地址:https://www.cnblogs.com/qingyunzong/p/8616476.html
Copyright © 2011-2022 走看看