zoukankan      html  css  js  c++  java
  • Map Join实战案例

                        Map Join实战案例

                                         作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

    一.Map Join概述

      使用场景
        Map Join适用于一张表十分小、一张表很大的场景。
    
      优点:
        使用Map Join要比Reduce Join速度快的原因就是较少了Shuffle过程;
        思考:在Reduce端处理过多的表,非常容易产生数据倾斜,怎么办?
          在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
        
      具体办法:采用DistributedCache
        (1)在Mapper的setup阶段,将文件读取到缓存集合中。
        (2)在驱动函数中加载缓存。

    二.Map Join案例

    1>.测试数据

      参考连接:
        https://www.cnblogs.com/yinzhengjie2020/p/12783256.html

    2>.实战案例代码

    package cn.org.yinzhengjie.mapjoin;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.*;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    public class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
    
        //定义字典用户缓存文件数据
        private Map<String,String>  pMap = new HashMap<>();
    
        private Text k = new Text();
    
        //在各个MapTask开始运行之前做的初始化操作,仅运行一次
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
    
            //获取到缓存文件列表
            URI[] cacheFiles = context.getCacheFiles();
    
            //由于我们仅配置了一个缓存文件,因此默认取数组的第一个就可以拿到我们缓存的文件对象
            String path = cacheFiles[0].getPath().toString();
    
            //获取包装流
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8"));
    
            String line;
    
            while (StringUtils.isNotEmpty((line = bufferedReader.readLine()))){
                String[] fields = line.split("	");
                pMap.put(fields[0],fields[1]);
            }
    
            //关闭流(释放资源)
            IOUtils.closeStream(bufferedReader);
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            String[] fields = value.toString().split("	");
    
            //获取缓存数据
            String pname = pMap.get(fields[1]);
    
            if (pname == null) {
                pname = "NULL";
            }
    
            k.set(fields[0] + "	" + pname + "	" + fields[2]);
            context.write(k, NullWritable.get());
        }
    }
    MapJoinMapper.java
    package cn.org.yinzhengjie.mapjoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.net.URI;
    
    public class MapJoinDriver {
        public static void main(String[] args) throws ClassNotFoundException, InterruptedException, IOException, IOException {
            //获取一个Job实例
            Job job = Job.getInstance(new Configuration());
    
            //设置我们的当前Driver类路径(classpath)
            job.setJarByClass(MapJoinDriver.class);
    
            //设置自定义的Mapper类路径(classpath)
            job.setMapperClass(MapJoinMapper.class);
    
            //由于咱们使用的是Map Join,压根不需要使用Reduce,因此我们不设置Reducer的类路径(classpath),但需要显示设置Reduce Task的数量为0
            job.setNumReduceTasks(0);
    
            //在驱动函数中加载缓存,缓存普通文件到Task运行节点。
            job.addCacheFile(URI.create("file:///E:/yinzhengjie/MapJoin/cache/pd.txt"));
    
            //设置输入数据
            FileInputFormat.setInputPaths(job,new Path(args[0]));
    
            //设置输出数据
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            //提交我们的Job,返回结果是一个布尔值
            boolean result = job.waitForCompletion(true);
    
            //如果程序运行成功就打印"Task executed successfully!!!"
            if(result){
                System.out.println("Task executed successfully!!!");
            }else {
                System.out.println("Task execution failed...");
            }
    
            //如果程序是正常运行就返回0,否则就返回1
            System.exit(result ? 0 : 1);
        }
    }
    MapJoinDriver.java

    3>.运行MapJoinDriver.java代码

      参数设置:E:yinzhengjieMapJoininput E:yinzhengjieMapJoinoutput

  • 相关阅读:
    时间操作
    2021.10.9数据结构实验课作业
    2021.9.28数据结构实验课作业
    2021.9.15数据结构实验课作业
    2021.9.7数据结构实验课作业
    苹果ios开发,CocoaPods安装方法-2021/9/11
    写组件库文档常用的技术
    document.execCommand 的用法
    手写JSON.stringify
    Object常用方法
  • 原文地址:https://www.cnblogs.com/yinzhengjie2020/p/12811796.html
Copyright © 2011-2022 走看看