zoukankan      html  css  js  c++  java
  • MapReduce中的Join算法

           在关系型数据库中 Join 是非常常见的操作,各种优化手段已经到了极致。在海量数据的环境下,不可避免的也会碰到这种类型的需求, 例如在数据分析时需要连接从不同的数据源中获取到数据。不同于传统的单机模式,在分布式存储下采用MapReduce 编程模型,也有相应的处理措施和优化方法。

           现在假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一。一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息。

           气象站和天气记录合并之后的示意图如下所示。

           连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每一个节点之中, 则可以执行一个 MapReduce 作业,将各个气象站的天气记录放到一块,从而实现连接。mapper 或 reducer 根据各气象站 ID 从较小的数据集合中找到气象站元数据,从而完成气象站数据和天气记录数据的合并。这种情况下,我们需要用到Hadoop的分布式缓存机制(在后面的分布式缓存DistributeCache中重点介绍),它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。     

           连接操作如果由 mapper 执行,则称为 “map 端连接”;如果由 reducer执行,则称为 “reduce 端连接”。

           如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每个节点,我们仍然可以使用MapReduce 来进行连接,至于到底采用 map 端连接还是 reduce 端连接,则取决于数据的组织方式。 下面我们分别介绍 map 端连接和reduce 端连接。

    map端连接

          在两个大规模输入数据集之间的 map 端连接会在数据达到 map 函数之前就执行连接操作。为达到该目的,各 map 的输入数据必须先分区并且以特定方式排序。 各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。听起来似乎要求非常严格,但这的确合乎MapReduce 作业的输出。

           map端连接操作可以连接多个作业的输出,只要这些作业的 reducer数量相同、键相同并且输出文件是不可切分的(例如,小于一个 HDFS 块,或 gzip 压缩)。 在上面讲的天气例子中,如果气象站文件以气象站ID部分排序,记录文件也以气象站 ID 部分排序,而且 reducer 的数量相同,则就满足了执行 map 端连接的前提条件。

          利用org.apache.hadoop.mapreduce.join 包中的CompositeInputFormat 类来运行一个 map 端连接。CompositeInputFormat类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置, 连接表达式的语法简单。详情与示例可参考包文档。此种方法不常用,了解即可,这里不再赘述。

    reduce端连接

            由于 reduce 端连接并不要求输入数据集符合特定结构,因而 reduce端连接比 map 端连接更为常用。但是,由于两个数据集均需经过MapReduce 的 shuffle过程, 所以 reduce 端连接的效率往往要低一些。基本思路是 mapper 为各个记录标记源,并且使用连接键作为 map 输出键,使键相同的记录放在同一 reducer 中。我们通过下面两种技术实现 reduce 端连接。

        1、多输入

           数据集的输入源往往有多种格式,因此可以使用MultipleInputs 类来方便地解析和标注各个数据源。MultipleInputs的用法,在输入格式课程已经详细介绍,这里就不再赘述。

        2、二次排序

           如前所述,reducer 将两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个数据源传输到 reducer会非常重要。 还以上面的天气数据连接为例,当天气记录发送到 reducer的时候,与这些记录有相同键的气象站信息最好也已经放在 reducer,使得 reducer能够将气象站名称填到天气记录之中就马上输出。 虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出 reducer的可用内存容量。 因此我们用到二次排序技术,对 map 阶段输出的每个键的值进行排序,实现这一效果。

           我们使用 TextPair 类构建组合键,包括气象站 ID 和 “标记”。在这里,“标记” 是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达reduce。一种简单的做法就是:对于气象站记录, “标记” 的值设为 0;对于天气记录,“标记” 的值设为1。

    分布式缓存

            当 MapReduce 处理大型数据集间的 join 操作时,此时如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每个节点之中。 这种情况下,我们就用到了 Hadoop 的分布式缓存机制,它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。为了节约网络宽带,在每一个作业中, 各个文件通常只需要复制到一个节点一次。

         1、用法

           Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。

           1)用户可以使用 -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。

          2)用户可以使用 -archives 选项向自己的任务中复制存档文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。

           3)用户可以使用 -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。

         2、工作机制

           当用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中。接着,在任务运行之前,tasktracker 将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为 “本地化” 了。从任务的角度来看, 这些文件就已经在那儿了,它并不关心这些文件是否来自 HDFS 。此外,有 -libjars 指定的文件会在任务启动前添加到任务的类路径(classpath)中。

         3、分布式缓存 API

           由于可以通过 Hadoop 命令行间接使用分布式缓存,大多数应用不需要使用分布式缓存 API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用 API 了。 API 包括两部分:将数据放到缓存中的方法,以及从缓存中读取数据的方法。

           1)首先掌握数据放到缓存中的方法,以下列举 Job 中可将数据放入到缓存中的相关方法:

            public void addCacheFile(URI uri);

            public void addCacheArchive(URI uri);//以上两组方法将文件或文档添加到分布式缓存

            public void setCacheFiles(URI[] files);

            public void setCacheArchives(URI[] archives);//以上两组方法将一次性向分布式缓存中添加一组文件或文档

            public void addFileToClassPath(Path file);

            public void addArchiveToClassPath(Path archive);//以上两组方法将文件或文档添加到MapReduce 任务的类路径

            public void createSymlink();//为所有本地文件创建一个符号化的链接

           在缓存中可以存放两类对象:文件(files)和文档(achives)。文件被直接放置在任务节点上,而文档则会被解档之后再将具体文件放置在任务节点上。

           2)其次掌握在 map 或者 reduce 任务中,使用 API 从缓存中读取数据。

            public Path[] getLocalCacheFiles() throws IOException;

            public Path[] getLocalCacheArchives() throwsIOException;

            public Path[] getFileClassPaths();

            public Path[] getArchiveClassPaths();

           我们可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法获取缓存中的文件或者存档的引用。 当处理存档时,将会返回一个包含解档文件的目的目录。相应的,用户可以通过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。

     

    以上就是博主为大家介绍的这一板块的主要内容,这都是博主自己的学习过程,希望能给大家带来一定的指导作用,有用的还望大家点个支持,如果对你没用也望包涵,有错误烦请指出。如有期待可关注博主以第一时间获取更新哦,谢谢!

     

     版权声明:本文为博主原创文章,未经博主允许不得转载。




  • 相关阅读:
    Revit二次开发示例:DisableCommand
    Revit二次开发示例:DesignOptions
    C# 非模式窗体show()和模式窗体showdialog()的区别
    Revit二次开发示例:DeleteObject
    被动永远做不好运维
    sudo开发常用命令总结
    ansible 配置了端口在host文件但是还要走22 ip:60001 ansible_ssh_port=60001
    ansible wc -l 对结果值取大小的操作
    mha切换脚本可用的
    mongoDB自动杀执行时间的连接
  • 原文地址:https://www.cnblogs.com/zimo-jing/p/9012842.html
Copyright © 2011-2022 走看看