zoukankan      html  css  js  c++  java
  • mapreduce学习工程之五---map端join连接

    实验环境 win7 hadoop2.7.3本地模式

    实验数据:订单数据orders.txt,商品数据pdts.txt

    order.txt

    1001    pd001    300
    1002    pd002    20
    1003    pd003    40
    1004    pd002    50

    pdts.txt

    pd001    apple
    pd002    xiaomi
    pd003    cuizi

    实验解决的问题:解决mapreduce连接过程中的数据倾斜的问题,典型应用场景如下:在电商平台中,买小米手机和买苹果手机的订单数量很多,买锤子手机的订单数量很少,如

    果根据传统的mapreduce方法,3个reduce的数据将不均衡。比如接受小米的reduce接收到的数据会很多,接受锤子数据的reduce接收到的数据就会很少

    实验解决的思路:采用map端连接,直接将排序过程在map中执行,将商品信息加载在map信息中,引入mapreduce的输入缓存机制

    代码如图所示:

    package com.tianjie.mapsidejoin;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MapSideJoin {
    
        static class MapSideJoinMappe extends Mapper<LongWritable, Text, Text, NullWritable>{
            
            
            //map 商品的订单信息k v key为商品编号,v为商品名称
            Map<String,String>     pdInfoMap = new HashMap<String, String>();
            Text ktext = new Text();
            
            
            /*setup 函数用来加载文件到hadoop缓存中
             * */
            protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                
                //打开输入文本文件的路径,获得一个输入流
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
                String line;
                while(StringUtils.isNotEmpty(line = br.readLine())){
                    //获得商品信息表 k为商品编号,value为商品名称
                    String[] split = line.split("	");
                    pdInfoMap.put(split[0], split[1]);
                    
                }
                
            }
            /*
             * hadoop 的缓冲机制*/
            
            
            /*
             * map 函数的输入key value ,其中默认输入为TextInputFormat,
             *     key 为输入文本的偏移量,value为输入文本的值
             *     Text,NullWriable为map文件输入的值
             *     */
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                
                //获得文本文件的一行
                String orderline  = value.toString();
                //将文本文件按照制表符切分
                String[] fields = orderline.split("	");
                //更具商品编号,获得商品名称
                String pdName = pdInfoMap.get(fields[1]);
                //获得商品的名字,将商品名称追加在文本文件中
                ktext.set(orderline+"	"+pdName);
                //将新的文本文件写出
                context.write(ktext, NullWritable.get());
            }
            
        }
            
        
        public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
            
            //得到hadoop的一个配置参数
            Configuration conf = new Configuration();
            //获取一个job实例
            Job job = Job.getInstance(conf);
            //加载job的运行类
            job.setJarByClass(MapSideJoin.class);
            
            //加载mapper的类
            job.setMapperClass(MapSideJoinMappe.class);
            //设置mapper类的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            //设置文件输入的路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            
            //设置文件的输出路径
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(args[1]);
            if(fs.isDirectory(path)){
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //指定需要缓冲一个文件到所有maptask运行节点工作目录
            //job.addArchiveToClassPath(""); 缓存jar包到task运行节点的classpath中
            //job.addFileToClassPath(file); 缓存普通文件到task运行节点的classpath中
            //job.addCacheArchive(uri);      缓存压缩包文件到task运行节点的工作目录中
            
            //1:缓存普通文件到task运行节点的工作目录
            job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt")); 
            
            //2:指定map端的加入逻辑不需要reduce阶段,设置reducetask数量为0
            job.setNumReduceTasks(0);
            
            //提交job任务,等待job任务的结束
            boolean res =job.waitForCompletion(true);
            System.exit(res?1:0);
            
            }
    }

    需要注意的点有:

    1:采用map端连接时,可以不适用reduce,这个时候可以设置reducetask 的数量为0:

    2:程序运行的结果:

  • 相关阅读:
    Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十六)Structured Streaming:WARN clients.NetworkClient: Error while fetching metadata with correlation id 1 : {my-topic=LEADER_NOT_AVAILABLE}
    Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。
    Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十四)Structured Streaming:Encoder
    Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十三)Structured Streaming遇到问题:Set(TopicName-0) are gone. Some data may have been missed
    Structured Streaming编程向导
    Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数
    Linux:磁盘挂载
    Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
    Spark参数设置的方式
    mydumper安装、原理介绍
  • 原文地址:https://www.cnblogs.com/fengdashen/p/6610953.html
Copyright © 2011-2022 走看看