zoukankan      html  css  js  c++  java
  • MapReduce编程之Map Join多种应用场景与使用

    Map Join 实现方式一:分布式缓存

    ● 使用场景:一张表十分小、一张表很大。

    ● 用法:

    在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join (比如放到Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

    DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性。

    代码实现

    package com.hadoop.reducejoin.test;
    
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.Hashtable;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    /*
     * 通过分布式缓存实现 map join
     * 适用场景:一个小表,一个大表
     */
    public class MapJoinByDistributedCache extends Configured implements Tool {
    
        /*
         * 直接在map 端进行join合并
         */
        public static class MapJoinMapper extends
                Mapper<LongWritable, Text, Text, Text> {
            private Hashtable<String, String> table = new Hashtable<String, String>();// 定义Hashtable存放缓存数据
    
            /**
             * 获取分布式缓存文件
             */
            @SuppressWarnings("deprecation")
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                Path[] localPaths = (Path[]) context.getLocalCacheFiles();// 返回本地文件路径
                if (localPaths.length == 0) {
                    throw new FileNotFoundException(
                            "Distributed cache file not found.");
                }
                FileSystem fs = FileSystem.getLocal(context.getConfiguration());// 获取本地
                                                                                // FileSystem
                                                                                // 实例
                FSDataInputStream in = null;
    
                in = fs.open(new Path(localPaths[0].toString()));// 打开输入流
                BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器
                String infoAddr = null;
                while (null != (infoAddr = br.readLine())) {// 按行读取并解析气象站数据
                    String[] records = infoAddr.split("	");
                    table.put(records[0], records[1]);// key为stationID,value为stationName
                }
            }
    
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] valueItems = line.split("\s+");
    //            使用下面一行将没有数据, StringUtils不能接正则,只能接分隔符
    //            String[] valueItems = StringUtils.split(value.toString(), "\s+");
                String stationName = table.get(valueItems[0]);// 天气记录根据stationId
                                                                // 获取stationName
                if (null != stationName)
                    context.write(new Text(stationName), value);
            }
    
        }
    
    
    
        public int run(String[] args) throws Exception {
            // TODO Auto-generated method stub
            Configuration conf = new Configuration();
    
            Path out = new Path(args[2]);
            FileSystem hdfs = out.getFileSystem(conf);// 创建输出路径
            if (hdfs.isDirectory(out)) {
                hdfs.delete(out, true);
            }
            Job job = Job.getInstance();// 获取一个job实例
            job.setJarByClass(MapJoinByDistributedCache.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            // 添加分布式缓存文件 station.txt
            job.addCacheFile(new URI(args[1]));
            job.setMapperClass(MapJoinMapper.class);
    
            job.setOutputKeyClass(Text.class);// 输出key类型
            job.setOutputValueClass(Text.class);// 输出value类型
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args0) throws Exception {
            String[] args = { "hdfs://sparks:9000/middle/reduceJoin/records.txt",
                    "hdfs://sparks:9000/middle/reduceJoin/station.txt",
                    "hdfs://sparks:9000/middle/reduceJoin/MapJoinByDistributedCache-out" };
    
            int ec = ToolRunner.run(new Configuration(),
                    new MapJoinByDistributedCache(), args);
            System.exit(ec);
        }
    }
    MapJoinByDistributedCache

    Map Join 实现方式二:数据库 join

    ● 使用场景:一张表在数据库、一张表很大。

    另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。

  • 相关阅读:
    KL散度(相对熵)和交叉熵的区别
    将模型从 PyTorch 导出到 ONNX 并使用 ONNX Runtime 运行
    Numpy Boolean Indexing Mask(Numpy 布尔索引掩码 )
    python PIL 图像处理库(Pillow)简介
    YOLO v3 网络结构和源码详解
    PyTorch 下使用 Tensorboard
    Python vars() 函数
    python 的 Tqdm 模块
    Pytorch 中的模式设置:model.eval() 和 model.train()
    Pytorch 中的 zero_grad 使用方法
  • 原文地址:https://www.cnblogs.com/LiCheng-/p/7353860.html
Copyright © 2011-2022 走看看