zoukankan      html  css  js  c++  java
  • MapReduce实战(六)共同粉丝

    需求:

    利用MapReduce实现类似微博中查找共同粉丝的功能。输入数据如下(每行冒号前是某个人,冒号后是他的所有粉丝):

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J

    求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都是谁。
    比如:
    A,B  [C,E]

    分析:

    在利用MapReduce程序解答之前,我们不妨先用单机程序练习一下。思路很简单:用两层for循环遍历每两个人,找出他们之间的共同粉丝,如果有则存到一个list中;再设一个map,key是两个人的ID,value是存放共同粉丝的list,最后就能求得任意两人之间的共同粉丝。程序如下:

    package com.darrenchan.test;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    public class Test {
        /**
         * Single-machine reference solution used to cross-check the MapReduce jobs.
         *
         * <p>Reads {@code data.txt} from the working directory. Each non-blank line has the form
         * {@code PERSON:FAN1,FAN2,...} ({@code PERSON:} means the person has no fans). For every
         * pair of persons, in input order, that share at least one fan, prints
         * {@code P1,P2:[fan, ...]}. Fans are listed in the order they appear in P1's list.
         *
         * @param args ignored
         * @throws Exception if {@code data.txt} cannot be opened or read
         */
        public static void main(String[] args) throws Exception {
            // Read all non-blank lines. try-with-resources closes the reader chain even on error.
            // Blank lines are skipped because they carry no person and would break parsing.
            List<String> list = new ArrayList<String>();
            try (BufferedReader br = new BufferedReader(new InputStreamReader(
                    new FileInputStream(new File("data.txt")), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    if (!line.trim().isEmpty()) {
                        list.add(line);
                    }
                }
            }

            // Parse each line once, up front, instead of re-splitting it inside the pair loop.
            int n = list.size();
            List<String> persons = new ArrayList<>(n);
            List<List<String>> fanLists = new ArrayList<>(n);
            List<Set<String>> fanSets = new ArrayList<>(n);
            for (String entry : list) {
                // Limit 2: everything after the first ':' is the fan list.
                String[] parts = entry.split(":", 2);
                persons.add(parts[0].trim());
                List<String> fans = new ArrayList<>();
                if (parts.length > 1) {
                    for (String fan : parts[1].split(",")) {
                        String trimmed = fan.trim();
                        if (!trimmed.isEmpty()) {
                            fans.add(trimmed);
                        }
                    }
                }
                fanLists.add(fans);
                fanSets.add(new HashSet<>(fans));
            }

            // Compare every unordered pair (i < j). LinkedHashMap keeps the output in pair order.
            Map<String, List<String>> map = new LinkedHashMap<>();
            for (int i = 0; i < n; i++) {
                for (int j = i + 1; j < n; j++) {
                    List<String> common = new ArrayList<>();
                    for (String fan : fanLists.get(i)) {
                        // Exact membership test. A substring check on the raw text would
                        // wrongly treat fan "A" as present in "AB,C".
                        if (fanSets.get(j).contains(fan)) {
                            common.add(fan);
                        }
                    }
                    // Only pairs that actually share a fan are reported.
                    if (!common.isEmpty()) {
                        map.put(persons.get(i) + "," + persons.get(j), common);
                    }
                }
            }

            // Print each pair with its common fans.
            for (Map.Entry<String, List<String>> e : map.entrySet()) {
                System.out.println(e.getKey() + ":" + e.getValue());
            }
        }
    }

    求得结果:

    A,B [C, E]
    A,C [D, F]
    A,D [F, E]
    A,E [B, C, D]
    A,F [B, C, D, E, O]
    A,G [C, D, F, E]
    A,H [C, D, E, O]
    A,I [O]
    A,J [B, O]
    A,K [C, D]
    A,L [D, F, E]
    A,M [F, E]
    B,C [A]
    B,D [A, E]
    B,E [C]
    B,F [A, C, E]
    B,G [A, C, E]
    B,H [A, C, E]
    B,I [A]
    B,K [A, C]
    B,L [E]
    B,M [E]
    B,O [A]
    C,D [F, A]
    C,E [D]
    C,F [A, D]
    C,G [F, A, D]
    C,H [A, D]
    C,I [A]
    C,K [A, D]
    C,L [F, D]
    C,M [F]
    C,O [A, I]
    D,E [L]
    D,F [A, E]
    D,G [A, E, F]
    D,H [A, E]
    D,I [A]
    D,K [A]
    D,L [E, F]
    D,M [E, F]
    D,O [A]
    E,F [B, C, D, M]
    E,G [C, D]
    E,H [C, D]
    E,J [B]
    E,K [C, D]
    E,L [D]
    F,G [A, C, D, E]
    F,H [A, C, D, E, O]
    F,I [A, O]
    F,J [B, O]
    F,K [A, C, D]
    F,L [D, E]
    F,M [E]
    F,O [A]
    G,H [A, C, D, E]
    G,I [A]
    G,K [A, C, D]
    G,L [D, E, F]
    G,M [E, F]
    G,O [A]
    H,I [A, O]
    H,J [O]
    H,K [A, C, D]
    H,L [D, E]
    H,M [E]
    H,O [A]
    I,J [O]
    I,K [A]
    I,O [A]
    K,L [D]
    K,O [A]
    L,M [E, F]

    接下来我们思考:如何用MapReduce的程序进行求解呢?

    如果一个步骤解决不了问题,我们通常会分两个步骤来求解。本题要求任意两个人的共同粉丝,那么我们不妨先求出每个人分别是哪些人的粉丝,比如:B是A,E,F,J的粉丝,这是第一步要求的。第二步就是把这些人两两配对:A和E的共同粉丝有B,A和F的共同粉丝有B,A和J的共同粉丝有B……最后在reduce中把同一对人的所有共同粉丝合并即可。

    ShareFriendsStepOne.java:

    package com.darrenchan.sharefriends;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class ShareFriendsStepOne {

        /**
         * Inverts the fan lists.
         *
         * <p>Input line: {@code A:B,C,D,F,E,O}, meaning B, C, D, F, E and O are fans of A.
         * For every fan it emits {@code (fan, A)}, so the reducer for a given fan receives every
         * person that fan follows.
         */
        public static class ShareFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text>{
            // Output objects reused across map() calls.
            Text keyText = new Text();
            Text valueText = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                // Limit 2: only the first ':' separates the person from the fan list.
                String[] parts = line.split(":", 2);
                // Skip blank or malformed lines instead of failing the task with an
                // ArrayIndexOutOfBoundsException.
                if (parts.length < 2) {
                    return;
                }
                String person = parts[0].trim();
                if (person.isEmpty()) {
                    return;
                }
                valueText.set(person);
                for (String fan : parts[1].split(",")) {
                    String trimmed = fan.trim();
                    // "A:" (no fans) yields a single empty token; emit nothing for it.
                    if (!trimmed.isEmpty()) {
                        keyText.set(trimmed);
                        context.write(keyText, valueText);
                    }
                }
            }
        }



        /**
         * Joins, for each fan, the persons that fan follows.
         *
         * <p>Input: {@code <B, [A, E, F, J]>}, i.e. B is a fan of A, E, F and J.
         * Output: {@code B	A,E,F,J} (key and value separated by the output format's tab).
         */
        public static class ShareFriendsStepOneReducer extends Reducer<Text, Text, Text, Text>{
            Text valueText = new Text();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Method-local buffer, so the unsynchronized StringBuilder suffices.
                StringBuilder sb = new StringBuilder();
                for (Text person : values) {
                    // Separator-first join: no trailing ',' to strip afterwards.
                    if (sb.length() > 0) {
                        sb.append(',');
                    }
                    sb.append(person.toString());
                }

                valueText.set(sb.toString());
                context.write(key, valueText);
            }
        }


        /**
         * Configures and runs step one.
         *
         * @param args {@code args[0]} input path, {@code args[1]} output path
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(ShareFriendsStepOne.class);

            job.setMapperClass(ShareFriendsStepOneMapper.class);
            job.setReducerClass(ShareFriendsStepOneReducer.class);

            // Map and reduce outputs are both Text/Text, so these settings cover both stages.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        }

    }

    ShareFriendsStepTwo.java:

    package com.darrenchan.sharefriends;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class ShareFriendsStepTwo {

        /**
         * Turns each "fan -> persons" record from step one into {@code ("X,Y", fan)} pairs.
         *
         * <p>Input line: {@code A<TAB>I,K,C,B,G,F,H,O,D}, i.e. A is a fan of I, K, C, B, G, F, H,
         * O and D. Every unordered pair of those persons therefore shares fan A.
         */
        public static class ShareFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
            // Output objects reused across map() calls.
            Text keyText = new Text();
            Text valueText = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Use the "\t" escape rather than an invisible literal tab, which is easily lost
                // when the source is reformatted. Limit 2 keeps the person list intact.
                String[] parts = value.toString().split("\t", 2);
                if (parts.length < 2) {
                    // Blank or malformed line: nothing to pair.
                    return;
                }
                String fan = parts[0];
                String[] persons = parts[1].split(",");

                // Sort so each pair is always emitted as "X,Y" with X < Y. Otherwise "A,B" and
                // "B,A" would be treated as different keys.
                Arrays.sort(persons);

                valueText.set(fan);
                for (int i = 0; i < persons.length; i++) {
                    for (int j = i + 1; j < persons.length; j++) {
                        keyText.set(persons[i] + "," + persons[j]);
                        context.write(keyText, valueText);
                    }
                }
            }
        }

        /**
         * Collects all fans shared by a pair and writes them as {@code [fan1,fan2,...]}.
         *
         * <p>Input: {@code <"A,B", [C, E]>}, i.e. C and E are fans of both A and B.
         */
        public static class ShareFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {
            Text valueText = new Text();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                java.util.List<String> fans = new java.util.ArrayList<>();
                for (Text fan : values) {
                    // The framework may reuse one Text object while iterating, so copy the
                    // contents out as a String.
                    fans.add(fan.toString());
                }
                // Value order after the shuffle is not fixed; sort for deterministic output.
                java.util.Collections.sort(fans);

                StringBuilder sb = new StringBuilder("[");
                for (int i = 0; i < fans.size(); i++) {
                    if (i > 0) {
                        sb.append(',');
                    }
                    sb.append(fans.get(i));
                }
                sb.append(']');

                valueText.set(sb.toString());
                context.write(key, valueText);
            }
        }

        /**
         * Configures and runs step two.
         *
         * @param args {@code args[0]} input path (step-one output), {@code args[1]} output path
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(ShareFriendsStepTwo.class);

            job.setMapperClass(ShareFriendsStepTwoMapper.class);
            job.setReducerClass(ShareFriendsStepTwoReducer.class);

            // Map and reduce outputs are both Text/Text, so these settings cover both stages.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        }

    }

    求得结果与上面相同(方括号内粉丝的顺序可能不同):

    A,B [E,C]
    A,C [D,F]
    A,D [E,F]
    A,E [D,B,C]
    A,F [O,B,C,D,E]
    A,G [F,E,C,D]
    A,H [E,C,D,O]
    A,I [O]
    A,J [O,B]
    A,K [D,C]
    A,L [F,E,D]
    A,M [E,F]
    B,C [A]
    B,D [A,E]
    B,E [C]
    B,F [E,A,C]
    B,G [C,E,A]
    B,H [A,E,C]
    B,I [A]
    B,K [C,A]
    B,L [E]
    B,M [E]
    B,O [A]
    C,D [A,F]
    C,E [D]
    C,F [D,A]
    C,G [D,F,A]
    C,H [D,A]
    C,I [A]
    C,K [A,D]
    C,L [D,F]
    C,M [F]
    C,O [I,A]
    D,E [L]
    D,F [A,E]
    D,G [E,A,F]
    D,H [A,E]
    D,I [A]
    D,K [A]
    D,L [E,F]
    D,M [F,E]
    D,O [A]
    E,F [D,M,C,B]
    E,G [C,D]
    E,H [C,D]
    E,J [B]
    E,K [C,D]
    E,L [D]
    F,G [D,C,A,E]
    F,H [A,D,O,E,C]
    F,I [O,A]
    F,J [B,O]
    F,K [D,C,A]
    F,L [E,D]
    F,M [E]
    F,O [A]
    G,H [D,C,E,A]
    G,I [A]
    G,K [D,A,C]
    G,L [D,F,E]
    G,M [E,F]
    G,O [A]
    H,I [O,A]
    H,J [O]
    H,K [A,C,D]
    H,L [D,E]
    H,M [E]
    H,O [A]
    I,J [O]
    I,K [A]
    I,O [A]
    K,L [D]
    K,O [A]
    L,M [E,F]

  • 相关阅读:
    【Java123】javapoet代码生成代码工具
    【Python123】OptionParser初使用
    【Spring123】JdbcTemplate初使用 以及 ORA-01858: a non-numeric character was found where a numeric was expected, ORA-00911: invalid character解决
    【Java123】解决javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
    git常用命令
    在虚拟机上搭建自己的 git 服务器并创建 git 仓库
    git用法
    Golang gRPC框架3-自签证书验证
    go _nsq
    mysql的备份和恢复
  • 原文地址:https://www.cnblogs.com/DarrenChan/p/6754506.html
Copyright © 2011-2022 走看看