前面展示了 MapReduce 针对 小量 输入的 工作方式,
现在是时候 整体 了解 系统 并 进入 大数据 流 作为 输入了。
为简单起见,我们的例子 到目前为止 都使用 本地 文件系统 中的文件。
然而 , 为了 分布化,我们需要 把 数据 存储在 分布式文件 系统中, 典型的如 HDFS ,
以允许 Hadoop 把 MapReduce 的 计算 移到 承载 部分 数据的 各台机器。
下面 我们 就来 看看 这是如何 工作的。
数据流
首先是一些 术语 的 说明。
MapReduce 作业(job)是 客户端执行的单位: 它 包括输入数据、 MapReduce 程序 和 配置信息。
Hadoop 通过把 作业分成 若干个小任务 (task)来工作,其 包括 两种 类型的任务: map 任务 和 reduce 任务。
有两种类型的 节点 控制着 作业执行过程: jobtracker 和 多个 tasktracker 。
jobtracker 通过调度任务 在 tasktracker 上 运行,来协调 所有 运行在 系统上的 作业。
Tasktracker 运行任务的 同时, 把 进度 报告传送 到 jobtracker, jobtracker 则 记录着 每项 任务 的 整体 进展情况。
如果 其中一个任务 失败, jobtracker 可以 重新 调度任务 到另外 一个 tasktracker .
Hadoop 把 输入数据 划分成 等长 的 小数据 发送 到 MapReduce , 称为 输入 分片(input split) 或 分片。
Hadoop 为 每个 分片( split ) 创建一个map 任务, 由 它 来 运行 用户 自定义的 map 函数 来 分析 每个 分片 中的 记录。
拥有 许多 分片 就意味着 处理 每个 分片 的 时间 与 处理 整个 输入的 时间 相比 是比较小的。
因此, 如果 我们 并行 处理 每个 分片, 且 分片 是 小块的数据, 那么 处理过程 将 有 一个更好的 负载 平衡,
因为 更快 的 计算机 将 能够 比 一台 速度 较慢 的 机器 在 作业 过程 中 处理完 比例 更多 的 数据 分片。
即使是 相同的 机器, 没有 处理的 或 其他同时运行的 作业 也会 使 负载平衡得以实现,
并且 在 分片变得 更细时, 负载平衡 质量 也会 更佳。
另一方面,如果 分片太小,那么 管理 分片的总时间 和 map 任务 创建的 总时间 将 决定 作业的 执行的 总时间。
对于 大多数 作业, 一个 理想的 分片 大小往往 是 一个 HDFS 块 的 大小, 默认是 64 MB,
虽然 这 可以 根据 集群 进行 调整 (对于 所有 新建 文件) 或 在 新建 每个 文件 时 具体 进行 指定。
map 任务 的 执行 节点 和 输入 数据的 存储 节点 是 同一个 节点,
Hadoop 的 性能 达到最佳。 这就是 所谓的 data locality optimization ( 数据局部 性 优化 )。
现在我们应该 清楚 为什么 最佳 分片 的 大小 与 块 大小 相同: 它是 最大的 可 保证 存储在单个节点上的数据量。
如果 分区 跨越 两个 块, 那么 对于 任何一个 HDFS 节点 而言, 基本 不可能 同时 存储 这 两 数据块,
因此 此 分布的 某部分 必须 通过 网络 传输到节点, 这 与 使用 本地 数据 运行 map 任务 相比, 显然 效率 更低。
map 任务 把 输出 写入 本地 硬盘, 而不是 HDFS。 这是为什么?
因为 map 的 输出 作为 中间 输出: 而 中间输出 则 被 reduce 任务 处理 后 产生 最终的 输出, 一旦 作业 完成, map 的 输出 就可以 删除了。
因此 , 把 它 及其 副本 存储 在 HDFS 中, 难免 有些 小题 大做。
如果 该 节点 上 运行的 map 任务 在 map 输出 给 reduce 任务 处理 之前 崩溃,
那么 Hadoop 将 在另一个i 节点 上 重新 运行 map 任务 以 再次 创建 map 的 输出。
reduce 任务并不具备 数据 本地读取的 优势—— 一个 单一 的 reduce 任务 的 输入 往往 来自于 所有 mapper 的 输出。
在本例中,我们 有 一个 单独的 reduce 任务, 其 输入 是 由 所有 map 任务 的输出组成的。
因此 , 有序 map 的 输出 必须 通过 网络 传输 到 reduce 任务 运行的 节点, 并在 那里 进行 合并, 然后 传递到 用户 定义的 reduce 函数中。
为 增加 其可靠性, reduce 的输出 通常 存储 在 HDFS 中。
如第3章 所述, 对于 每个 reduce 输出 的 HDDFS 块, 第一个 副本 存储 在本地 节点上, 其他副本存储在其他机架节点中。
因此,编写reduce 的 输出确实 十分 占用 网络 带宽,但是 只是 和 正常的 HDFS 写 管线的 消耗一样。
一个单一的 reduce 任务的 整个 数据流 如图 2-2 所示。 虚线框 表示 节点,虚线箭头 表示 数据传输 到 一个 节点上,
而 实线的 箭头 表示 节点 之间的 数据传输。
reduce 任务的 数目 并不是 由 输入的 大小 来决定的, 而是 单独具体 指定的。
在第 7 章 的 7.1 节 中, 将介绍 如何 为一个 给定的作业选择reduce 任务数量。
如果有多个 reducer, map 任务 会对 其 输出 进行 分区, 为 每个 reduce 任务 创建 一个 分区(partition).
每个 分区 包含 许多 键 (及其 关联的值), 但 每个 键的 记录 都在 同一 分区中。
分区可以 通过 用户定义的 partitioner 来控制, 但 通常 是 默认的 分区 工具,
它 使用的 是 hash 函数 来形成的 “木桶” 键/值, 这种 方法效率 很高。
一般 情况下, 多个 reduce 任务 的数据流 如图 2-3 所示。
此图 清楚地 表明了 map 和 reduce 任务 之间 的 数据流 为什么要 称为 "shuffle"(洗牌),因为 每个 reduce 任务 的 输入 都由 许多 map 任务 来提供。
shuffle 其实 比 此图 所显示 的 更复杂, 并且 调整 它 可能 对作业的 执行 时间 产生 很大的影响, 详见 6.4 节。
最后,也有可能 不存在reduce 任务, 不需要 shuffle 的 时候, 这样的 情况 是 可能的,
因为 处理 可以并行 进行( 第 7章 有 几个例子 讨论了 这个 问题 )。
在这种情况下, 唯一的 非 本地 节点 数据传输 是 当 map 任务 写入到 HDFS 中 (见图 2-4).
集群的 可用 带宽 限制了 MapReduce 作业的 数量, 因此 map 和 reduce 任务 之间数据 传出的 代价 是 最小的。
Hadoop 允许 用户 声明 一个 combiner , 运行 在 map 的 输出 上 —— 该函数的 输出 作为 reduce 函数的 输入。
由于 combiner 是 一个 优化方法, 所以 Hadoop 不保证 对于 某个 map 的 输出记录 是否 调用 该方法,调用该方法多少次。
换言之, 不调用 该 方法或者 调用该方法多次, reducer 的 输出 结果 都一样。
combiner 的 规则限制这可用的 函数类型。我们将用一个例子 来 巧妙地 加以 说明。
以 前面 的 最高 气温例子 为例, 1950 年的 读数 有 两个 map 处理 ( 因为它们 在 不同的 分片中 )。
假设第一个 map 的 输出 如下:
..........
combiner 并不能取代 reduce 函数。( 为什么呢? reduce 函数 仍然 需要处理来自 不同的 map 给出的 相同记录。 )
但 它 可以帮助 减少 map 和 reduce 之间 的 数据传输量, 而正因为此, 是否在 MapReduce 作业中 使用 combiner 是需要 慎重考虑的。
运行 分布式 MapReduce 作业
同一个程序将在 一个 完整的 数据集 中 直接 运行 而不做 更改。这是 MapReduce 的 优势 之一: 它扩充 数据大小 和硬件 规模。
Hadoop流
Hadoop 提供了 一个 API 来 运行 MapReduce, 并允许你 用 除 java 以外 的 语言 来编写 自己 的 map 和 reduce 函数。
Hadoop 流 使用 Unit 标准 流 作为 Hadoop 和 程序之间 的接口,
所以 可以使用 任何语言, 只要 编写 的 MapReduce 程序 能够 读取 标准输入, 并写入 到 标准输出。
流 适用于 文字处理( 尽管 0.21.0 版本 也可以 处理 二进制流), 在文本模式下使用时, 它有一个 面向 行的 数据视图。
map 的 输入 数据 把 标准 输入流 传输到 map 函数,其中 是 一行一行的传输,然后 再把 行写入标准输出。
一个 map 输出的 键/值 对 是 以 单一 的 制表符 分隔的行来写入的。
reduce 函数的 输入具有 相同的 格式——通过 制表符 来 分隔的 键/值 对 —— 传输 标准输入流。
reduce 函数 从 标准输入流 读入行, 然后 为 保证 结果的 有序性 用 键来排序, 最后 将 结果 写入标准输出。