zoukankan      html  css  js  c++  java
  • MR案例:Map-Join

    适用场景:一张表十分小【key不可重复】、一张表非常大。
    用法:在Job提交时,首先将小表加载到 DistributedCache 分布式缓存中,然后从DistributeCache中读取小表解析成 key/value 保存到内存中(可以放在Hash Map等容器中)。然后扫描大表中的每条记录的 key 是否能在内存中找到相同 join key 的记录,如果有则直接输出结果。

    package join.map;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * map-join中小表的数据如下:
     * 
     *    1    Beijing
     *    2    Guangzhou
     *    3    Shenzhen
     *    4    Xian
     * 
     * 大表的数据如下:
     * 
     *    Beijing Red Star              1
     *    Shenzhen Thunder              3
     *    Guangzhou Honda               2
     *    Beijing Rising                1
     *    Guangzhou Development Bank    2
     *    Tencent                       3
     *    Back of Beijing               1
     */
    public class MapJoin {    
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MapJoin2.class);
    //此方法已过时,被job.addCacheFile()所取代 //DistributedCache.addCacheFile(new URI("hdfs://10.16.17.182:9000/test/in/address.txt"), conf);
    //加载小表到 分布式缓存DistributedCache      job.addCacheFile(new Path(args[0]).toUri());
    job.setMapperClass(MJMapper.
    class); job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(Text.
    class); job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.
    class); job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job,
    new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(
    true)? 0:1); } public static class MJMapper extends Mapper<LongWritable, Text, Text, Text>{ /** * 此map是存放小表数据用的 * 注意小表的key是不能重复的,类似与数据库的外键表 * 在这里的小表,就相当于一个外键表 * **/ private HashMap<String, String> map=new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br=null; // 读取文件流 String line; // 获取DistributedCached里面 的共享文件         Path[] paths = context.getLocalCacheFiles(); for(Path path : paths){ if(path.getName().indexOf("address") >= 0){ //如果是 address文件             br=new BufferedReader(new FileReader(path.toString())); while((line=br.readLine()) != null){ //读取文件中的每一行                String[] splited = line.split(" "); map.put(splited[0], splited[1]); //将小表解析成 key/value 存放进map } } } } /** * map阶段读取并处理大表中的数据 * 小表中的数据是加载到HashMap中的,无需从hdfs读取 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(value==null || ("").equals(value.toString())){ //跳过空值 return; } String[] splited = value.toString().split(" ");
    if(map.get(splited[1]) != null){ //map中大表的 key 对应的 value 不为空
    Text keyOut = new Text(splited[0]); //key=大表的第一列
    Text valueOut = new Text(map.get(splited[1])); //value=小表的第二列
    context.write(keyOut, valueOut); } } } }

    更多参考分布式缓存DistributedCache

  • 相关阅读:
    将博客部署到k3s中
    docker/docker swarm学习
    windows共享文件夹使用中的问题(SMB协议)
    洛谷P1280 尼克的任务 题解 动态规划/最短路
    CF1B.Spreadsheets(电子表格) 题解 模拟
    洛谷P1595 信封问题 题解 错排问题
    洛谷P1809 过河问题 经典贪心问题
    CF1238E.Keyboard Purchase 题解 状压/子集划分DP
    洛谷P2719 搞笑世界杯 题解 概率DP入门
    Treap(树堆)入门
  • 原文地址:https://www.cnblogs.com/skyl/p/4748121.html
Copyright © 2011-2022 走看看