zoukankan      html  css  js  c++  java
  • MapReduce并行编程模型

    一、课前准备

    1. 3节点hadoop集群

    2. 安装IDEA

    3. 安装maven并配置环境变量

    二、课堂主题

    1.  围绕MapReduce分布式计算讲解

    三、课堂目标

    1. 理解MapReduce编程模型

    2. 独立完成一个MapReduce程序并运行成功

    3. 了解MapReduce工程流程

    4. 掌握并描述出shuffle全过程(面试)

    5. 理解并解决数据倾斜

    四、知识要点

    1. MapReduce编程模型

    • MapReduce是采用一种分而治之的思想设计出来的分布式计算框架
    • 如一复杂或计算量大的任务,单台服务器无法胜任时,可将此大任务切分成一个个小的任务,

        小任务分别在不同的服务器上并行的执行,最终再汇总每个小任务的结果

    • MapReduce由两个阶段组成:Map阶段(切分成一个个小的任务)、Reduce阶段(汇总成小任务的结果)

    1.1 Map阶段

    • map()函数的输入是kv对,输出是一些列kv对,输出写入本地磁盘。

    1.2 Reduce阶段

    • reduce()函数的输入是kv对(即map的输出(kv对));输出是一些列kv对,最终写入HDFS

    1.3 Main程序入口

    2. MapReduce编程示例

    •  以词频统计为例:统计一批英文文章当中,各个单词出现的总次数

    2.1 MapReduce原理图

    2.2 MR参考代码

    2.2.1 Mapper代码

    • block对应一个分片splist,一个split对应一个map task
    • reduce task的个数由程序中编程指定

    3. WEB UI 查看结果

    3.1 Yarn

    浏览器url地址: rm 节点IP:8088

    3.2 HDFS结果

    4. Combiner

    • map端本地聚合;不论运行多少次Combiner操作,都不会影响最终的结果
    • 并非所有的mr都适合combiner操作,比如求平均值
    • WorldCountMap与WorkCountReduce代码不变
    • WordCountMain中,增加job.setCombinerClass(WorldCountReduce.class)
    • 详见工程代码

    5. Shuffle

     map task 向环形缓冲区写

    环形缓冲区 100M,当使用达到80%,会溢出写磁盘文件(将环形缓冲区文件写入到磁盘中)

     写的过程: 分区、排序(combine、压缩 、num>=3)

         分区:默认HashPartition =>getPartition()方法,

            (key.hashCode()&Integer.MAX_VALUE) % numReduceTasks 

        排序:每个分区内排序,根据key排序,

    最后将多个小的分区文件合并成一个大的分区文件(仍然是排序的)

    6. 自定义分区Partition

    •  MapReduce自带的分区器是HashPartitioner
    • 原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,

        决定此输出kv对,被匹配的reduce取走

    6.1 默认分区

    • 读取文件customPartition.txt,内容如下:
    • 默认HashPartitioner分区时,查看结果(看代码)

      按照字典排序

    6.2 自定义分区

    现开户自定义分区功能,并设定reduce个数为4

    详见工程代码

    (就是,给一个key,操作key,返回一个数值型,即对应的分区number)

    7. 二次排序

    •  MapReduce中,根据key进行分区、排序、分组
    • MapReduce会根据基本类型对应的key进行排序,如int类型的IntWritable,默认升序排序
    • 为什么要自定义排序规则?
    • 现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,

        若相同,则再按age升序排序;若salary,age相同,则放入同一组

    详见工程代码

    8. MapReduce分区倾斜

     什么是数据倾斜?

    数据中不可避免地会出现离群值(outlier),并导致数据倾斜。这些离群值会显著地拖慢MapReduce的执行。

    常见的数据倾斜有以下几类:

    1 数据频率倾斜  某一个区域的数据量要远远大于其他区域。比如某一个key对应的键值对远远大于其他键的键值对

    2 数据大小倾斜  部分几类的大小远远大于平均值

    在map端好reduce端都有可能发生数据倾斜。在reduce端的数据倾斜常常来源于MapReduce的默认分区器。

    数据倾斜会导致map和reduce的任务执行时间大为延长,也会让需要缓存数据集的操作消耗更多的内存资源。

    8.1  如何诊断是否存在数据倾斜

    1. 关注由map的输出数据中的数据频率倾斜的问题。

    2. 如何诊断map输出中哪些键存在数据倾斜?

      在reduce方法中加入记录map输出键的详细情况的功能

      发现倾斜数据之后,有必要诊断造成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每个键的

    最大值。为了减少追踪量,可以设置数据量阈值,只追踪那些数据量大于阈值的键,并输出到日志中,

    实现代码如下:

    运行作业后就可以从日志中判断发生倾斜的键以及倾斜程序;跟踪倾斜数据是了解数据的重要一步,也是设计MapReduce

    作业的重要基础

    8.2 减缓Reduce数据倾斜

    1 Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况,即部分输出键的数据量远远大于其他的输出键

    2. 如何减小reduce端数据倾斜的性能损失?常用方式有:

      1.  自定义分区

      基于输出键的北京知识进行自定义分区。例如,如果map输出键的单词来源于一本书。其中大部分必然是省略次(stopword)。

      那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发给剩余的reduce实例。

      2. Combine

      使用Combine可以大量地减少数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。

      3. 抽样和范围分区

      Hadoop默认的分区器是HashPartitioner,基于map端输出键的哈希值分区。这仅在数据分布比较均匀时比较好。

      在有数据倾斜时就很有问题。

      使用分区器需要首先了解数据的特性。TotalOrderPartitioner中,可以通过对原始数据进行抽样得到的结果集来预设分区边界值。

      TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。

      因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。

      4. 数据大小倾斜的自定义策略

      在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响,乃至导致OutOfMemoryError异常。

      处理这种情况并不容易。可以参考以下方法。

      设置mapreduce.input.linerecordreader.line.maxlength来限制RecordReader读取的最大长度。

      RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。

      

     五、拓展点

    1. 全排序、防止数据倾斜

      

  • 相关阅读:
    自定义udf添加一列
    spark执行命令 监控执行命令
    R链接hive/oracle/mysql
    [Hive_6] Hive 的内置函数应用
    [Hive_add_6] Hive 实现 Word Count
    [Hive_add_5] Hive 的 join 操作
    【爬坑】远程连接 MySQL 失败
    [Hive_add_4] Hive 命令行客户端 Beeline 的使用
    [Hive_5] Hive 的 JDBC 编程
    [Hive_add_3] Hive 进行简单数据处理
  • 原文地址:https://www.cnblogs.com/hanchaoyue/p/13185305.html
Copyright © 2011-2022 走看看