zoukankan      html  css  js  c++  java
  • 【Hadoop离线基础总结】MapReduce 社交粉丝数据分析：求出哪些人两两之间有共同好友，以及他俩的共同好友都有谁？

    MapReduce 社交粉丝数据分析


    求出哪些人两两之间有共同好友，以及他俩的共同好友都有谁？

    • 用户及好友数据
    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J
    
    • java代码

    需要分两步完成需求：第一步求出每个人分别是哪些用户的好友；第二步把这些用户两两配对，汇总出每一对用户的共同好友。
    首先创建第一步的package，
    在package中定义main、Mapper、Reducer三个类

    定义一个Mapper类

    package cn.itcast.demo1.step1;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Step 1 mapper: inverts each {@code user:friend1,friend2,...} line into one
     * {@code (friend, user)} pair per listed friend, so that the reducer receives, for each
     * friend, every user who lists that friend.
     *
     * <p>Example: input line {@code A:B,C,D,E,O} emits {@code (B, A)}, {@code (C, A)}, ...
     *
     * <p>Blank lines, lines without a ':' separator, and lines with an empty friend list
     * (e.g. {@code A:}) are skipped instead of failing the task with an
     * ArrayIndexOutOfBoundsException. Tokens are trimmed so CRLF line endings or spaces
     * after commas do not leak into the keys.
     */
    public class Step1Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Separate the user (before ':') from the friend list (after ':').
            String[] split = value.toString().split(":");
            if (split.length < 2) {
                // Blank or malformed line, or a user with no friends: nothing to emit.
                return;
            }
            String user = split[0].trim();
            if (user.isEmpty()) {
                return;
            }
            // context.write serializes key/value immediately, so one Text can be reused.
            Text userText = new Text(user);
            for (String rawFriend : split[1].split(",")) {
                String friend = rawFriend.trim();
                if (!friend.isEmpty()) {
                    // k2 = friend, v2 = user who has that friend, e.g. (B, A)
                    context.write(new Text(friend), userText);
                }
            }
        }
    }
    

    定义一个Reducer类

    package cn.itcast.demo1.step1;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Step 1 reducer: for one friend, concatenates every user who lists that friend.
     *
     * <p>Input: {@code (B, [A, E, ...])} where the key is the friend and the values are the
     * users having that friend. Output: the users joined with a '-' after each one as the key,
     * and the friend as the value; TextOutputFormat writes them tab-separated, e.g.
     * {@code A-B-E-F-G-H-K-} (tab) {@code C}.
     */
    public class Step1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The builder is method-local, so StringBuffer's synchronization is pure overhead.
            StringBuilder users = new StringBuilder();
            for (Text value : values) {
                // The trailing '-' is kept for compatibility with the existing step-1 output;
                // String.split("-") drops the resulting empty last token downstream.
                users.append(value.toString()).append('-');
            }
            context.write(new Text(users.toString()), key);
        }
    }
    

    程序main函数入口

    package cn.itcast.demo1.step1;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Driver for step 1 of the common-friends job: turns {@code user:friends} lines into
     * {@code users-having-this-friend} (tab) {@code friend} lines.
     *
     * <p>Usage: {@code Step1Main [inputPath [outputPath]]}. Any path not given on the command
     * line falls back to the original hard-coded local path. The output directory must not
     * exist yet, otherwise FileOutputFormat's output check fails the job.
     */
    public class Step1Main extends Configured implements Tool {
        /** Input used when no input path is passed on the command line. */
        private static final String DEFAULT_INPUT = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/共同好友/input/friends.txt";
        /** Output directory used when no output path is passed on the command line. */
        private static final String DEFAULT_OUTPUT = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/共同好友/step1_output";

        @Override
        public int run(String[] args) throws Exception {
            // Paths may be overridden (e.g. HDFS paths on a cluster); defaults keep old behavior.
            String input = (args != null && args.length >= 1) ? args[0] : DEFAULT_INPUT;
            String output = (args != null && args.length >= 2) ? args[1] : DEFAULT_OUTPUT;

            Job job = Job.getInstance(super.getConf(), "step1");
            // Required when submitting to a cluster so task JVMs can find the job classes.
            job.setJarByClass(Step1Main.class);

            // Input: plain text, one "user:friends" record per line.
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path(input));

            // Map phase and its (k2, v2) types.
            job.setMapperClass(Step1Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Reduce phase and its (k3, v3) types.
            job.setReducerClass(Step1Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Output: plain text, key and value separated by a tab.
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(output));

            // Submit the job and block until it finishes, printing progress.
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new Step1Main(), args);
            System.exit(run);
        }
    }
    

    运行完成后，得到第一步的数据（每行制表符前是拥有该好友的用户列表，制表符后是该好友）：

    F-D-O-I-H-B-K-G-C-	A
    E-A-J-F-	B
    K-A-B-E-F-G-H-	C
    G-K-C-A-E-L-F-H-	D
    G-F-M-B-H-A-L-D-	E
    M-D-L-A-C-G-	F
    M-	G
    O-	H
    C-O-	I
    O-	J
    B-	K
    E-D-	L
    F-E-	M
    J-I-H-A-F-	O
    

    创建第二步的package
    在package中定义main、Mapper、Reducer三个类

    定义一个Mapper类

    package cn.itcast.demo1.step2;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    /**
     * Step 2 mapper: for a step-1 line {@code u1-u2-...-un-} (tab) {@code friend}, emits every
     * unordered pair of those users with the friend they share, e.g. {@code ("A-B", "C")}.
     *
     * <p>Lines without a second (tab-separated) column are skipped instead of throwing
     * ArrayIndexOutOfBoundsException; the friend token is trimmed so CRLF input is tolerated.
     */
    public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split into the user list and the friend. Use the "\t" escape rather than a raw tab
            // character, which editors and copy/paste can silently turn into spaces.
            String[] split = value.toString().split("\t");
            if (split.length < 2) {
                return;
            }
            String friend = split[1].trim();
            if (friend.isEmpty()) {
                return;
            }
            // The trailing '-' of step 1 yields an empty last token, which split() discards.
            String[] userList = split[0].split("-");
            // Sort so every pair is emitted as "smaller-larger"; otherwise "A-E" and "E-A"
            // would be different keys and never meet in the same reduce call.
            Arrays.sort(userList);
            Text friendText = new Text(friend);
            for (int i = 0; i < userList.length - 1; i++) {
                for (int j = i + 1; j < userList.length; j++) {
                    context.write(new Text(userList[i] + "-" + userList[j]), friendText);
                }
            }
        }
    }
    

    定义一个Reducer类

    package cn.itcast.demo1.step2;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Step 2 reducer: for one user pair such as {@code A-B}, concatenates all of their common
     * friends with a '-' after each one, e.g. {@code ("A-B", "C-E-")}.
     */
    public class Step2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The builder is method-local, so StringBuffer's synchronization is pure overhead.
            StringBuilder commonFriends = new StringBuilder();
            for (Text value : values) {
                commonFriends.append(value.toString()).append('-');
            }
            context.write(key, new Text(commonFriends.toString()));
        }
    }
    

    程序main函数入口

    package cn.itcast.demo1.step2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Driver for step 2 of the common-friends job: reads the step-1 output and produces
     * {@code userA-userB} (tab) {@code common-friends} lines.
     *
     * <p>Usage: {@code Step2Main [inputPath [outputPath]]}. Any path not given on the command
     * line falls back to the original hard-coded local path. The output directory must not
     * exist yet, otherwise FileOutputFormat's output check fails the job.
     */
    public class Step2Main extends Configured implements Tool {
        /** Input (the step-1 output directory) used when no input path is given. */
        private static final String DEFAULT_INPUT = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/共同好友/step1_output";
        /** Output directory used when no output path is given. */
        private static final String DEFAULT_OUTPUT = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/共同好友/step2_output";

        @Override
        public int run(String[] args) throws Exception {
            // Paths may be overridden (e.g. HDFS paths on a cluster); defaults keep old behavior.
            String input = (args != null && args.length >= 1) ? args[0] : DEFAULT_INPUT;
            String output = (args != null && args.length >= 2) ? args[1] : DEFAULT_OUTPUT;

            Job job = Job.getInstance(super.getConf(), "step2");
            // Required when submitting to a cluster so task JVMs can find the job classes.
            job.setJarByClass(Step2Main.class);

            // Input: the text output of step 1.
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path(input));

            // Map phase and its (k2, v2) types.
            job.setMapperClass(Step2Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Reduce phase and its (k3, v3) types.
            job.setReducerClass(Step2Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Output: plain text, key and value separated by a tab.
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(output));

            // Submit the job and block until it finishes, printing progress.
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new Step2Main(), args);
            System.exit(run);
        }
    }
    

    运行结果为（每行制表符前是一对用户，制表符后是他们的共同好友列表）：

    A-B	C-E-
    A-C	D-F-
    A-D	F-E-
    A-E	C-B-D-
    A-F	D-O-E-B-C-
    A-G	C-D-F-E-
    A-H	E-C-O-D-
    A-I	O-
    A-J	O-B-
    A-K	C-D-
    A-L	E-D-F-
    A-M	F-E-
    B-C	A-
    B-D	E-A-
    B-E	C-
    B-F	E-A-C-
    B-G	A-E-C-
    B-H	E-C-A-
    B-I	A-
    B-K	A-C-
    B-L	E-
    B-M	E-
    B-O	A-
    C-D	F-A-
    C-E	D-
    C-F	A-D-
    C-G	F-D-A-
    C-H	D-A-
    C-I	A-
    C-K	A-D-
    C-L	D-F-
    C-M	F-
    C-O	I-A-
    D-E	L-
    D-F	A-E-
    D-G	F-A-E-
    D-H	A-E-
    D-I	A-
    D-K	A-
    D-L	F-E-
    D-M	F-E-
    D-O	A-
    E-F	M-C-B-D-
    E-G	C-D-
    E-H	C-D-
    E-J	B-
    E-K	C-D-
    E-L	D-
    F-G	A-D-E-C-
    F-H	D-O-C-E-A-
    F-I	O-A-
    F-J	B-O-
    F-K	A-D-C-
    F-L	D-E-
    F-M	E-
    F-O	A-
    G-H	E-A-C-D-
    G-I	A-
    G-K	C-D-A-
    G-L	D-E-F-
    G-M	E-F-
    G-O	A-
    H-I	O-A-
    H-J	O-
    H-K	D-A-C-
    H-L	E-D-
    H-M	E-
    H-O	A-
    I-J	O-
    I-K	A-
    I-O	A-
    K-L	D-
    K-O	A-
    L-M	F-E-
    
  • 相关阅读:
    AnyConnect使用说明(手机版)
    AnyConnect使用说明(电脑版Windows)
    Linux中MySQL中文乱码问题
    Redis的最常被问到知识点总结
    DML、DDL、DCL是什么?
    刨死你系列——手撕ArrayList
    刨死你系列——LinkedHashMap剖析(基于jdk1.8)
    mysql架构与存储引擎 (Myisam与Innodb)
    面试有关TCP常问的几个问题
    刨死你系列——HashMap剖析(基于jdk1.8)
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772489.html
Copyright © 2011-2022 走看看