zoukankan      html  css  js  c++  java
  • MapReduce之Map Join

    一 介绍

    之所以存在Reduce Join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

    Map Join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

    为了支持文件的共享,Hadoop用到了分布式缓存的概念,在MapReduce中称为DistributedCache(目前已被标注为弃用,分布式缓存的API可在Job类本身调用),它可以方便Map Task之间或Reduce Task之间共享一些信息,同时也可以将第三方Jar包添加到其Classpass路径中。Hadoop会将缓存数据分发到集群中所有准备启动的节点上,复制到mapreduce.temp.dir中的配置目录。

    使用该类的方法如下:

    job.addArchiveToClassPath(archive); //缓存jar包到task运行节点的classpath中
    ob.addCacheArchive(uri); //缓存压缩包到task运行节点的工作目录
    job.addFileToClassPath(file); //缓存普通文件到task运行节点的classpath中
    job.addCacheFile(url); //将产品表文件缓存到task工作节点的工作目录中去
    

    传参格式:hdfs://namenode:9000/home/XXX/file,即Jar包、压缩包、普通文件所在hdfs路径。

    同时DistributedCache(分布式缓存)可用来解决join算法实现中的数据倾斜问题,例如两张表:订单表和产品表。

    订单表:

    订单号 时间 商品id 购买数量 
    1001,20170710,P0001,1 
    1002,20170710,P0001,3 
    1003,20170710,P0002,3 
    1004,20170710,P0002,4
    

    产品表: 

    商品id 商品名称 
    P0001,xiaomi
    P0002,huawei
    

    需求就是根据外键商品id来将两张表信息合并,拼接成 :

    1001 ,20170710,P0001,1 xiaomi
    1002,20170710,P0001,3 xiaomi
    1003,20170710,P0002,3,huawei
    1004,20170710,P0002,4,huawei

    考虑问题:在mapreduce程序中,如果某些产品非常畅销,肯定会产生很多订单,但是刚好这些订单信息都传到了一个reduce中(分区默认就是使用hashcode%reducetask数量,所以这种情况是正常的)。那么这个reducetask压力就很大了,而其他的reducetask处理的信息就很小,有的甚至就处理几条数据,这就出现了数据倾斜问题。

    解决方案:一般来说订单表的数据远远多于产品表数据,毕竟产品的种类就那些,所以我们可以把产品信息都交给Map Task就行了逻辑都让Map Task来处理,也就是说不使用Reduce了,而让每个Map Task持有个product.data(存储产品信息的文件)即可。那么maptask怎么获得这个文件呢?刚好hadoop提供了DistributedCache,我们将文件交给这个分布式缓存,它会将我们的文件放到Map Task的工作目录中,那么Map 端可以直接从工作目录中去拿。

    二 代码部分

      1 package mapreduce.DistributedCache;
      2 
      3 import java.io.BufferedReader;
      4 import java.io.FileInputStream;
      5 import java.io.IOException;
      6 import java.io.InputStream;
      7 import java.io.InputStreamReader;
      8 import java.net.URI;
      9 import java.util.HashMap;
     10 import java.util.Map;
     11 import org.apache.hadoop.conf.Configuration;
     12 import org.apache.hadoop.conf.Configured;
     13 import org.apache.hadoop.fs.Path;
     14 import org.apache.hadoop.io.LongWritable;
     15 import org.apache.hadoop.io.NullWritable;
     16 import org.apache.hadoop.io.Text;
     17 import org.apache.hadoop.mapreduce.Job;
     18 import org.apache.hadoop.mapreduce.Mapper;
     19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     21 import org.apache.hadoop.util.Tool;
     22 import org.apache.hadoop.util.ToolRunner;
     23 
     24 public class MapJoin extends Configured implements Tool{
     25     static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
     26         //用来缓存小文件(商品文件中的数据)
     27         Map<String, String> produceMap = new HashMap<String,String>();
     28         Text k = new Text();
     29         /*
     30          * 源码中能看到在循环执行map()之前会执行一次setUp方法,可以用来做初始化
     31          */
     32         @Override
     33         protected void setup(Context context)
     34                 throws IOException, InterruptedException {
     35 
     36             //将商品文件中的数据写到缓存中  
     37             FileInputStream fileInput = new FileInputStream("product.data");
     38             //read data
     39             InputStreamReader readFile = new InputStreamReader(fileInput );
     40             BufferedReader br = new BufferedReader(readFile);
     41             String line = null;
     42             while((line=br.readLine())!=null){
     43                 //一行数据格式为P0001,xiaomi(商品id,商品名称)
     44                 String[] fields = line.split(",");
     45                 produceMap.put(fields[0], fields[1]);
     46             }
     47         }
     48         @Override
     49         protected void map(LongWritable key, Text value, Context context)
     50                 throws IOException, InterruptedException {
     51             //一行订单数据    格式为 1001,20170710,P0001,1(订单id,创建时间,商品id,购买商品数量)
     52             String line = value.toString();
     53             String[] fields = line.split(",");
     54             //根据订单数据中商品id在缓存中找出来对应商品信息(商品名称),进行串接
     55             String productName = produceMap.get(fields[2]);
     56             k.set(line+","+productName);
     57             context.write(NullWritable.get(), k );
     58         }
     59     }
     60     
     61     public int run(String[] args) throws Exception {
     62 
     63     // step 1:get configuration
     64     Configuration conf = this.getConf();
     65         //set job
     66     Job job = Job.getInstance(conf);
     67         job.setJarByClass(MapJoin.class);
     68 
     69         job.setMapperClass(MapJoinMapper.class);
     70         job.setMapOutputKeyClass(Text.class);
     71         job.setMapOutputValueClass(NullWritable.class);
     72 
     73         //设置最终输出类型
     74         job.setOutputKeyClass(Text.class);
     75         job.setOutputValueClass(NullWritable.class);
     76 
     77         //将产品表文件缓存到task工作节点的工作目录中去
     78         //缓存普通文件到task运行节点的工作目录(hadoop帮我们完成)
     79         job.addCacheFile(new URI("hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/product.data"));
     80 
     81         //不需要reduce,那么也就没有了shuffle过程
     82         job.setNumReduceTasks(0);
     83 
     84         FileInputFormat.setInputPaths(job, new Path(args[0]));
     85         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     86 
     87         boolean isSuccess = job.waitForCompletion(true);
     88         
     89         return isSuccess ? 0 : 1;
     90     }
     91     
     92     public static void main(String[] args) throws Exception {
     93         args = new String[]{
     94                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/orderid.data",
     95                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output4"
     96         };
     97         
     98         Configuration conf = new Configuration();
     99         
    100         // run mapreduce
    101         int status = ToolRunner.run(conf, new MapJoin(), args);
    102         
    103         // exit program
    104         System.exit(status);
    105     }
    106 }

    运行代码后查看输出结果

    [hadoop@beifeng01 hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output4/p*
    1001,20170710,P0001,1,xiaomi 
    1002,20170710,P0001,3,xiaomi 
    1003,20170710,P0002,3,huawei
    1004,20170710,P0002,4,huawei
    

      

  • 相关阅读:
    关于post和get的区别
    修改ubuntu系统时区
    修改 Ubuntu 下 Mysql 编码
    C++程序设计实践指导1.10二维数组元素换位改写要求实现
    C++程序设计实践指导1.7超长数列中n个数排序改写要求实现
    C++程序设计实践指导1.15找出回文数改写要求实现
    C++程序设计实践指导1.14字符串交叉插入改写要求实现
    C++程序设计实践指导1.13自然数集中找合数改写要求实现
    C++程序设计实践指导1.12数组中数据线性变换改写要求实现
    C++程序设计实践指导1.9统计与替换字符串中的关键字改写要求实现
  • 原文地址:https://www.cnblogs.com/perfectdata/p/10125351.html
Copyright © 2011-2022 走看看