zoukankan      html  css  js  c++  java
  • [大牛翻译系列]Hadoop(3)MapReduce 连接:半连接(Semijoin)

    4.1.3 半连接(Semi-join)

    假设一个场景,需要连接两个很大的数据集,例如,用户日志和OLTP的用户数据。任何一个数据集都不是足够小到可以缓存在map作业的内存中。这样看来,似乎就不能使用reduce端的连接了。尽管不是必须,可以思考以下问题:如果在数据集的连接操作中,一个数据集中有的记录由于因为无法连接到另一个数据集的记录,将会被移除。这样还需要将整个数据集放到内存中吗?在这个例子中,在用户日志中的用户仅仅是OLTP用户数据中的用户中的很小的一部分。那么就可以从OLTP用户数据中只取出存在于用户日志中的那部分用户的用户数据。然后就可以得到足够小到可以放在内存中的数据集。这种的解决方案就叫做半连接。

    图4.6说明了在半连接中将要执行的三个MapReduce作业(Job)。

    接下来介绍如何实现一个半连接。

    技术20 实现半连接

    当需要连接两个都很大的数据集时,很容易想到要用重分区连接(利用了整个MapReduce框架的reduce端的连接)。如果这么想了,又不能够将其中一个数据集过滤到一个较小的尺寸以便放到map端的内存中,那也就是想想而已。然而,如果能够将一个数据集减小到一个可管理的大小,也许就用不着使用重分区连接了。

    问题

    需要连接两个都很大的数据集,同时减少整理和排序阶段的消耗。

    解决方案

    在这个技术中,将会用到三个MapReduce作业来连接两个数据集,以此来减少reduce端连接的消耗。对于很大的数据集,这个技术非常有用。

    讨论

    在这个技术中,将会用到附录D.2中的复制连接(Replicated join)的代码来实现MapReduce作业中的最后两步(http://www.cnblogs.com/datacloud/p/3617078.html)。同时,在图4.6中的三个作业将会被分开来说明。

    作业1

    第一个MapReduce作业的功能是从日志文件中提取出用户名,用这些用户名生成一个用户名唯一的集合(Set)。这通过在map函数执行用户名的投影(projection)操作来实现。然后用reduce出用户名。为了减少在map阶段和reduce阶段之间传输的数据量,采用如下方法:在map任务中采用哈希集(HashSet)来保存用户名,在cleanup方法中输出哈希集的值。图4.7说明了这个作业的流程:

    作业1的map和reduce的代码如下:

     1 public static class Map extends Mapper<Text, Text, Text, NullWritable> {
     2 
     3     private Set<String> keys = new HashSet<String>();
     4     
     5     @Override
     6     protected void map(Text key, Text value, Context context)
     7         throws IOException, InterruptedException {
     8         keys.add(key.toString());
     9     }
    10     
    11     @Override
    12     protected void cleanup(Context context)
    13         throws IOException, InterruptedException {
    14         
    15         Text outputKey = new Text();
    16         
    17         for(String key: keys) {
    18             outputKey.set(key);
    19             context.write(outputKey, NullWritable.get());
    20         }
    21         
    22     }
    23     
    24 }
    25 
    26 public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    27 
    28     @Override
    29     protected void reduce(Text key, Iterable<NullWritable> values, Context context)
    30         throws IOException, InterruptedException {
    31         context.write(key, NullWritable.get());
    32     }
    33     
    34 }

    作业1的结果就是来自于日志文件中的所有用户的集合。集合中的用户名是唯一的。

    作业2

    作业2包含了复杂的过滤过程。目的是从全体用户的用户数据集中移除不存在于日志文件中的用户。这是一个只包含map的作业。它用到了复制连接来缓存出现在日志文件中的用户名,并把他们和全体用户的数据集连接。由于来自于作业1的用户唯一的数据集要远远小于全体用户的数据集,就把来自作业1的用户集放到缓存中了。图4.8说明了这个作业的流程:

    现在是个不错的时间去熟悉一下附录D中的复制连接框架。这个框架对KeyValueTextInputFormat和TextOutputFormat提供了内置支持,并假设 KeyValueTextInputFormat生成的键是连接键。同时,这也是数据被展开的过程。图4.9是这个框架的类图:

    GenericReplicatedJoin类是执行连接的类。如图4.9中所示,在GenericReplicatedJoin的类列表中前三个类是可扩展的,相对应的复制连接的行为也是可定制的。readFromInputFormat方法可以用于任意的输入类型(InputFormat)。getDistributedCacheReader方法可以被重载来支持来自于分布式缓存(distributed cache)的任意文件类型。在这一步中的核心是join方法。join方法将会生成作业的输出键和输出值。在默认的实现中,两个数据集的值将会被合并以生成最终的输出值。这个join方法可以自定义,可以指定仅仅输出来自于OLTP的用户表的值,如下所示:

    1 public class ReplicatedFilterJob extends GenericReplicatedJoin {
    2 
    3     @Override
    4     public Pair join(Pair inputSplitPair, Pair distCachePair) {
    5         return inputSplitPair;
    6     }
    7     
    8 }

    还需要把来自于作业1的文件放到分布式缓存中:

    1 for(FileStatus f: fs.listStatus(uniqueUserStatus)) {
    2     if(f.getPath().getName().startsWith("part")) {
    3         DistributedCache.addCacheFile(f.getPath().toUri(), conf);
    4     }
    5 }

    然后,在驱动(driver)代码中,调用GenericReplicatedJoin类:

     1 public class ReplicatedFilterJob extends GenericReplicatedJoin {
     2 
     3     public static void runJob(Path usersPath,
     4                                 Path uniqueUsersPath,
     5                                 Path outputPath)
     6         throws Exception {
     7         
     8         Configuration conf = new Configuration();
     9         
    10         for(FileStatus f: fs.listStatus(uniqueUsersPath)) {
    11             if(f.getPath().getName().startsWith("part")) {
    12                 DistributedCache.addCacheFile(f.getPath().toUri(), conf);
    13             }
    14         }
    15         
    16         Job job = new Job(conf);
    17         job.setJarByClass(ReplicatedFilterJob.class);
    18         job.setMapperClass(ReplicatedFilterJob.class);
    19         job.setNumReduceTasks(0);
    20         job.setInputFormatClass(KeyValueTextInputFormat.class);
    21         outputPath.getFileSystem(conf).delete(outputPath, true);
    22         FileInputFormat.setInputPaths(job, usersPath);
    23         FileOutputFormat.setOutputPath(job, outputPath);
    24         
    25         if(!job.waitForCompletion(true)) {
    26             throw new Exception("Job failed");
    27         }
    28         
    29     }
    30     
    31     @Override
    32     public Pair join(Pair inputSplitPair, Pair distCachePair) {
    33         return inputSplitPair;
    34     }
    35     
    36 }

    作业2的输出就是已被用户日志数据集的用户过滤过的用户集了。

    作业3

    在最后一步中,需要将作业2生成的已过滤的用户集和原始的用户日志合并了。表面上,已过滤的用户集是足够小到可以放到内存中,同样也可以放到分布式缓存中。图4.10说明了这个作业的流程:

    1 FileStatus usersStatus = fs.getFileStatus(usersPath);
    2 
    3 for(FileStatus f: fs.listStatus(usersPath)) {
    4 
    5     if(f.getPath().getName().startsWith("part")) {
    6         DistributedCache.addCacheFile(f.getPath().toUri(), conf);
    7     }
    8     
    9 ...

    这里要再次用到复制连接框架来执行连接。但这次不用自定义join方法的行为,因为两个数据集中的数据都要出现在最后的输出中。

    执行这个代码,观察前述步骤生成的输出。

    $ bin/run.sh com.manning.hip.ch4.joins.semijoin.Main users.txt user-logs.txt output
    
    $ hadoop fs -ls output
    /user/aholmes/output/filtered
    /user/aholmes/output/result
    /user/aholmes/output/unique
    
    $ hadoop fs -cat output/unique/part*
    bob
    jim
    marie
    mike
    
    $ hadoop fs -cat output/filtered/part*
    mike 69 VA
    marie 27 OR
    jim 21 OR
    bob 71 CA
    
    $ hadoop fs -cat output/result/part*
    jim logout 93.24.237.12 21 OR
    mike new_tweet 87.124.79.252 69 VA
    bob new_tweet 58.133.120.100 71 CA
    mike logout 55.237.104.36 69 VA
    jim new_tweet 93.24.237.12 21 OR
    marie view_user 122.158.130.90 27 OR
    jim login 198.184.237.49 21 OR
    marie login 58.133.120.100 27 OR

    这些输出说明了在半连接的作业中的逻辑进程和最终连接的输出。

    小结

    在这个技术中说明了如何使用半连接来合并两个数据集。半连接的创建包括了比其他连接类型更多的步骤。但它确实是一个处理大的数据集的map端连接的强大的工具。当然,这些很大的数据集要能够被减小到能够放到内存中。

  • 相关阅读:
    进制转换
    体验mssql-cli
    从Windows迁移SQL Server到Linux
    CentOS7脱机安装SQL Server 2017
    基础知识:数据类型优先级
    SQL Server 2016正式版安装(超多图)
    制造高CPU使用率的简单方法
    SQL Server启动的几种方法
    SQL Server 2016 RC0 安装(超多图)
    机器学习:Python实现单层Rosenblatt感知器
  • 原文地址:https://www.cnblogs.com/datacloud/p/3579975.html
Copyright © 2011-2022 走看看