zoukankan      html  css  js  c++  java
  • MapReduce之Map Join

    一 介绍

    之所以存在Reduce Join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

    Map Join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

    为了支持文件的共享,Hadoop用到了分布式缓存的概念,在MapReduce中称为DistributedCache(目前已被标注为弃用,分布式缓存的API可在Job类本身调用),它可以方便Map Task之间或Reduce Task之间共享一些信息,同时也可以将第三方Jar包添加到其Classpass路径中。Hadoop会将缓存数据分发到集群中所有准备启动的节点上,复制到mapreduce.temp.dir中的配置目录。

    使用该类的方法如下:

    job.addArchiveToClassPath(archive); //缓存jar包到task运行节点的classpath中
    ob.addCacheArchive(uri); //缓存压缩包到task运行节点的工作目录
    job.addFileToClassPath(file); //缓存普通文件到task运行节点的classpath中
    job.addCacheFile(url); //将产品表文件缓存到task工作节点的工作目录中去
    

    传参格式:hdfs://namenode:9000/home/XXX/file,即Jar包、压缩包、普通文件所在hdfs路径。

    同时DistributedCache(分布式缓存)可用来解决join算法实现中的数据倾斜问题,例如两张表:订单表和产品表。

    订单表:

    订单号 时间 商品id 购买数量 
    1001,20170710,P0001,1 
    1002,20170710,P0001,3 
    1003,20170710,P0002,3 
    1004,20170710,P0002,4
    

    产品表: 

    商品id 商品名称 
    P0001,xiaomi
    P0002,huawei
    

    需求就是根据外键商品id来将两张表信息合并,拼接成 :

    1001 ,20170710,P0001,1 xiaomi
    1002,20170710,P0001,3 xiaomi
    1003,20170710,P0002,3,huawei
    1004,20170710,P0002,4,huawei

    考虑问题:在mapreduce程序中,如果某些产品非常畅销,肯定会产生很多订单,但是刚好这些订单信息都传到了一个reduce中(分区默认就是使用hashcode%reducetask数量,所以这种情况是正常的)。那么这个reducetask压力就很大了,而其他的reducetask处理的信息就很小,有的甚至就处理几条数据,这就出现了数据倾斜问题。

    解决方案:一般来说订单表的数据远远多于产品表数据,毕竟产品的种类就那些,所以我们可以把产品信息都交给Map Task就行了逻辑都让Map Task来处理,也就是说不使用Reduce了,而让每个Map Task持有个product.data(存储产品信息的文件)即可。那么maptask怎么获得这个文件呢?刚好hadoop提供了DistributedCache,我们将文件交给这个分布式缓存,它会将我们的文件放到Map Task的工作目录中,那么Map 端可以直接从工作目录中去拿。

    二 代码部分

      1 package mapreduce.DistributedCache;
      2 
      3 import java.io.BufferedReader;
      4 import java.io.FileInputStream;
      5 import java.io.IOException;
      6 import java.io.InputStream;
      7 import java.io.InputStreamReader;
      8 import java.net.URI;
      9 import java.util.HashMap;
     10 import java.util.Map;
     11 import org.apache.hadoop.conf.Configuration;
     12 import org.apache.hadoop.conf.Configured;
     13 import org.apache.hadoop.fs.Path;
     14 import org.apache.hadoop.io.LongWritable;
     15 import org.apache.hadoop.io.NullWritable;
     16 import org.apache.hadoop.io.Text;
     17 import org.apache.hadoop.mapreduce.Job;
     18 import org.apache.hadoop.mapreduce.Mapper;
     19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     21 import org.apache.hadoop.util.Tool;
     22 import org.apache.hadoop.util.ToolRunner;
     23 
     24 public class MapJoin extends Configured implements Tool{
     25     static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
     26         //用来缓存小文件(商品文件中的数据)
     27         Map<String, String> produceMap = new HashMap<String,String>();
     28         Text k = new Text();
     29         /*
     30          * 源码中能看到在循环执行map()之前会执行一次setUp方法,可以用来做初始化
     31          */
     32         @Override
     33         protected void setup(Context context)
     34                 throws IOException, InterruptedException {
     35 
     36             //将商品文件中的数据写到缓存中  
     37             FileInputStream fileInput = new FileInputStream("product.data");
     38             //read data
     39             InputStreamReader readFile = new InputStreamReader(fileInput );
     40             BufferedReader br = new BufferedReader(readFile);
     41             String line = null;
     42             while((line=br.readLine())!=null){
     43                 //一行数据格式为P0001,xiaomi(商品id,商品名称)
     44                 String[] fields = line.split(",");
     45                 produceMap.put(fields[0], fields[1]);
     46             }
     47         }
     48         @Override
     49         protected void map(LongWritable key, Text value, Context context)
     50                 throws IOException, InterruptedException {
     51             //一行订单数据    格式为 1001,20170710,P0001,1(订单id,创建时间,商品id,购买商品数量)
     52             String line = value.toString();
     53             String[] fields = line.split(",");
     54             //根据订单数据中商品id在缓存中找出来对应商品信息(商品名称),进行串接
     55             String productName = produceMap.get(fields[2]);
     56             k.set(line+","+productName);
     57             context.write(NullWritable.get(), k );
     58         }
     59     }
     60     
     61     public int run(String[] args) throws Exception {
     62 
     63     // step 1:get configuration
     64     Configuration conf = this.getConf();
     65         //set job
     66     Job job = Job.getInstance(conf);
     67         job.setJarByClass(MapJoin.class);
     68 
     69         job.setMapperClass(MapJoinMapper.class);
     70         job.setMapOutputKeyClass(Text.class);
     71         job.setMapOutputValueClass(NullWritable.class);
     72 
     73         //设置最终输出类型
     74         job.setOutputKeyClass(Text.class);
     75         job.setOutputValueClass(NullWritable.class);
     76 
     77         //将产品表文件缓存到task工作节点的工作目录中去
     78         //缓存普通文件到task运行节点的工作目录(hadoop帮我们完成)
     79         job.addCacheFile(new URI("hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/product.data"));
     80 
     81         //不需要reduce,那么也就没有了shuffle过程
     82         job.setNumReduceTasks(0);
     83 
     84         FileInputFormat.setInputPaths(job, new Path(args[0]));
     85         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     86 
     87         boolean isSuccess = job.waitForCompletion(true);
     88         
     89         return isSuccess ? 0 : 1;
     90     }
     91     
     92     public static void main(String[] args) throws Exception {
     93         args = new String[]{
     94                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/orderid.data",
     95                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output4"
     96         };
     97         
     98         Configuration conf = new Configuration();
     99         
    100         // run mapreduce
    101         int status = ToolRunner.run(conf, new MapJoin(), args);
    102         
    103         // exit program
    104         System.exit(status);
    105     }
    106 }

    运行代码后查看输出结果

    [hadoop@beifeng01 hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output4/p*
    1001,20170710,P0001,1,xiaomi 
    1002,20170710,P0001,3,xiaomi 
    1003,20170710,P0002,3,huawei
    1004,20170710,P0002,4,huawei
    

      

  • 相关阅读:
    windows系统切换jdk,修改java_home无效情况
    Cannot instantiate interface org.springframework.context.ApplicationListener
    MySQL分组查询获取每个学生前n条分数记录(分组查询前n条记录)
    ASP.NET Web API 使用Swagger生成在线帮助测试文档,支持多个GET
    EF TO MYSQL 无法查询中文的解决方法
    HttpWebRequest post请求获取webservice void数据信息
    This implementation is not part of the Windows Platform FIPS validated cryptographic algorithms. 此实现不是 Windows 平台 FIPS 验证的加密算法的一部分 解决方案
    MySQL 5.7.13解压版安装记录 mysql无法启动教程
    C# udpclient 发送数据断网后自动连接的方法
    汽车XX网站秒杀抢购代码
  • 原文地址:https://www.cnblogs.com/perfectdata/p/10125351.html
Copyright © 2011-2022 走看看