zoukankan      html  css  js  c++  java
  • 大数据学习16_shuffle阶段&求共同好友项目实战

    shuffle阶段

    概念

    shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称作 shuffle。

    一张图看懂Mapreduce全过程

    概念解释

    1. Collect阶段 :将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是 key/value,Partition 分区信息等。
    2. Spill阶段 :当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘, 在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将 有相同分区号和 key 的数据进行排序。
    3. Merge阶段 :把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终只 产生一个中间数据文件。
    4. Copy阶段 :ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于 自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值 的时候,就会将数据写到磁盘之上。
    5. Merge阶段 :在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本 地的数据文件进行合并操作。
    6. Sort阶段 :在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对数 据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。 Shule 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大, 磁盘io的次数越少,执行速度就越快 缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M

    共同好友案例

    需求分析

    以下是qq的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友 关系是单向的)

     求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

    过程分析

    mapreduce最重要的是根据自己的项目确定  K2和V2的类型,此案例中要用俩个mapreduce实现,将第一个mapreduce的输出作为第二个mapreduce的输入,第一个mapreduce要实现A-O分别是谁的好友,例如:A-E B;A-C-E D   B是A和E 的好友,D是A,C和E的好友。我们求的是俩俩之间的好友,要把A-C-E拆分成A-C A-E C-E;第二个阶段实现共同好友的求取。 

    一个图看懂实现过程

    代码实现

    第一个mapper

     1 package Commond_friends;
     2 
     3 import org.apache.hadoop.io.LongWritable;
     4 import org.apache.hadoop.io.Text;
     5 import org.apache.hadoop.mapreduce.Mapper;
     6 
     7 import java.io.IOException;
     8 
     9 //   K1行偏移量没有意义 ,V1是每行的内容
    10 public class FirstMapper extends Mapper<LongWritable, Text,Text,Text> {
    11     @Override
    12     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    13         //1:以冒号拆分行文本数据: 冒号左边就是V2
    14         String[] split = value.toString().split(":");
    15         String userStr = split[0];
    16         //2:将冒号右边的字符串以逗号拆分,每个成员就是K2
    17         String[] split1 = split[1].split(",");
    18         for (String s : split1) {
    19             //3:将K2和v2写入上下文中
    20             context.write(new Text(s),new Text(userStr));
    21         }
    22     }
    23 }

    第一个reduce

     1 package Commond_friends;
     2 
     3 import org.apache.hadoop.io.Text;
     4 import org.apache.hadoop.mapreduce.Reducer;
     5 
     6 import java.io.IOException;
     7 
     8 public class FirstReduce extends Reducer<Text,Text,Text,Text> {
     9     @Override
    10     protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    11         //1:遍历集合,并将每一个元素拼接,得到K3
    12         StringBuffer buffer = new StringBuffer();
    13         for (Text value : values) {
    14             buffer.append(value.toString()+"-");
    15         }
    16         //2:K2就是V3
    17         //3:将K3和V3写入上下文中
    18         context.write(new Text(buffer.toString().substring(0,buffer.length()-1)),key);
    19     }
    20 }

    第一个jabmain

     1 package Commond_friends;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.conf.Configured;
     5 import org.apache.hadoop.fs.Path;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Job;
     8 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     9 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    10 import org.apache.hadoop.util.Tool;
    11 import org.apache.hadoop.util.ToolRunner;
    12 
    13 public class jobMain extends Configured implements Tool {
    14     @Override
    15     public int run(String[] strings) throws Exception {
    16         //1:获取Job对象
    17         Job job = Job.getInstance(super.getConf(), "common_friends_step1_job");
    18 
    19         //2:设置job任务
    20         //第一步:设置输入类和输入路径
    21         job.setInputFormatClass(TextInputFormat.class);
    22         TextInputFormat.addInputPath(job, new Path("file:///D:\learnpath\Bigdata\project\Mapreduce\input\input.txt"));
    23 
    24         //第二步:设置Mapper类和数据类型
    25         job.setMapperClass(FirstMapper.class);
    26         job.setMapOutputKeyClass(Text.class);
    27         job.setMapOutputValueClass(Text.class);
    28 
    29         //第三,四,五,六
    30 
    31         //第七步:设置Reducer类和数据类型
    32         job.setReducerClass(FirstReduce.class);
    33         job.setOutputKeyClass(Text.class);
    34         job.setOutputValueClass(Text.class);
    35 
    36         //第八步:设置输出类和输出的路径
    37         job.setOutputFormatClass(TextOutputFormat.class);
    38         TextOutputFormat.setOutputPath(job, new Path("file:///D:\learnpath\Bigdata\project\Mapreduce\input\input2"));
    39 
    40         //3:等待job任务结束
    41         boolean bl = job.waitForCompletion(true);
    42 
    43 
    44 
    45         return bl ? 0: 1;
    46     }
    47 
    48     public static void main(String[] args) throws Exception {
    49         Configuration configuration = new Configuration();
    50         //启动job任务
    51         int run = ToolRunner.run(configuration, new jobMain(), args);
    52         System.exit(run);
    53     }
    54 
    55 }

    第二个mapper

     1 package common_friends_step2;
     2 
     3 import org.apache.hadoop.io.LongWritable;
     4 import org.apache.hadoop.io.Text;
     5 import org.apache.hadoop.mapreduce.Mapper;
     6 
     7 import java.io.IOException;
     8 import java.util.Arrays;
     9 
    10 public class scendMapper extends Mapper<LongWritable, Text,Text,Text> {
    11     /*
    12      K1           V1
    13      0           A-F-C-J-E- B
    14     ----------------------------------
    15      K2             V2
    16      A-C           B
    17      A-E           B
    18      A-F           B
    19      C-E           B
    20      */
    21 
    22     @Override
    23     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    24         //1:拆分行文本数据,结果的第二部分可以得到V2
    25         String[] split = value.toString().split("	");
    26         String friend = split[1];
    27 
    28         //2:继续以'-'为分隔符拆分行文本数据第一部分,得到数组
    29         String[] userStr = split[0].split("-");
    30         Arrays.sort(userStr);
    31         //4:对数组中的元素进行两两组合,得到K2
    32         /*
    33           A-E-C -----> A C E
    34           A C E
    35             A C E
    36          */
    37         for(int i=0;i<userStr.length-1;i++)
    38         {
    39             for(int j=i+1;j<userStr.length;j++)
    40             {
    41                 context.write(new Text(userStr[i]+"-"+userStr[j]),new Text(friend));
    42             }
    43         }
    44 
    45 
    46     }
    47 }

    第二个reduce

     1 package common_friends_step2;
     2 
     3 import org.apache.hadoop.io.Text;
     4 import org.apache.hadoop.mapreduce.Reducer;
     5 
     6 import java.io.IOException;
     7 
     8 public class secondReduce extends Reducer<Text,Text,Text,Text> {
     9     @Override
    10     protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    11         StringBuffer stringBuffer = new StringBuffer();
    12         for (Text value : values) {
    13             stringBuffer.append(value.toString()).append("-");
    14         }
    15         context.write(key,new Text(stringBuffer.toString().substring(0,stringBuffer.length()-1)));
    16     }
    17 }

    第二个jobmain

     1 package common_friends_step2;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.conf.Configured;
     5 import org.apache.hadoop.fs.Path;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Job;
     8 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     9 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    10 import org.apache.hadoop.util.Tool;
    11 import org.apache.hadoop.util.ToolRunner;
    12 
    13 public class jobMain extends Configured implements Tool {
    14     @Override
    15     public int run(String[] strings) throws Exception {
    16         //1:获取Job对象
    17         Job job = Job.getInstance(super.getConf(), "common_friends_step2_job");
    18 
    19         //2:设置job任务
    20         //第一步:设置输入类和输入路径
    21         job.setInputFormatClass(TextInputFormat.class);
    22         TextInputFormat.addInputPath(job, new Path("file:///D:\learnpath\Bigdata\project\Mapreduce\input\input2"));
    23 
    24         //第二步:设置Mapper类和数据类型
    25         job.setMapperClass(scendMapper.class);
    26         job.setMapOutputKeyClass(Text.class);
    27         job.setMapOutputValueClass(Text.class);
    28 
    29         //第三,四,五,六
    30 
    31         //第七步:设置Reducer类和数据类型
    32         job.setReducerClass(secondReduce.class);
    33         job.setOutputKeyClass(Text.class);
    34         job.setOutputValueClass(Text.class);
    35 
    36         //第八步:设置输出类和输出的路径
    37         job.setOutputFormatClass(TextOutputFormat.class);
    38         TextOutputFormat.setOutputPath(job, new Path("file:///D:\learnpath\Bigdata\project\Mapreduce\out"));
    39 
    40         //3:等待job任务结束
    41         boolean bl = job.waitForCompletion(true);
    42 
    43 
    44 
    45         return bl ? 0: 1;
    46     }
    47 
    48     public static void main(String[] args) throws Exception {
    49         Configuration configuration = new Configuration();
    50 
    51         //启动job任务
    52         int run = ToolRunner.run(configuration, new jobMain(), args);
    53 
    54         System.exit(run);
    55     }
    56 }

     运行截图

    初始文件表示一个用户的好友列表

     结果文件

    示例:A-B  C-E  表示A和B 的共同好友有C和E

  • 相关阅读:
    org.apache.catalina.mbeans.ServerLifecycleListener
    模拟log4j获取日志对象调用所在的类名、方法名及行号
    菜鸟从零学编程(八)——将MyEclipse项目导入到Eclipse
    Oracle DB 使用调度程序自动执行任务
    无线网卡的查看与配置——iw,iwconfig,ethtool
    如何从iTunes Connect中提款呢?
    tomcat解压war包的一点例外
    Shared File System Master Slave 全配置以及测试
    [置顶] “欢子说事”微信公众平台上线,做大学生第一交流平台
    android编程之ExpandableListView使用总结
  • 原文地址:https://www.cnblogs.com/g414056667/p/13762524.html
Copyright © 2011-2022 走看看