  • Notes from the Trenches at Work: 2. A Pitfall in Hadoop's MultipleInputs

    1. Background

    Recently, while using MultipleInputs to add several input files to a Hadoop job, I found that a path passed in more than once is only loaded once, which severely skewed the downstream statistics. This post records how the anomaly was tracked down and resolved.

    2. Reproducing the Issue

    (1) Prepare a minimal input file named test containing the line "i am ws", uploaded to the HDFS path /work/justTest/test.
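    Just for completeness, here is a minimal sketch of producing that file programmatically (an assumed setup step, not shown in the original post; an hdfs dfs -put of a local file works just as well):

    package com.ws.test;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PrepareTestInput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // Write "i am ws" to the HDFS path used throughout this post
            // (second argument: overwrite if the file already exists).
            try (FSDataOutputStream out = fs.create(new Path("/work/justTest/test"), true)) {
                out.writeBytes("i am ws\n");
            }
        }
    }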

    (2) The source is a plain word-count implementation in which /work/justTest/test is added as an input path twice:

    package com.ws.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MutilInputTest {

        public static void main(String[] args) {
            testMultiInputs();
        }

        /**
         * Test entry point.
         */
        public static void testMultiInputs() {

            Configuration conf = new Configuration();

            conf.set("mapreduce.job.queuename", "default");
            conf.setBoolean("mapreduce.map.output.compress", true);
            conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
            conf.setInt("mapreduce.task.timeout", 0);
            conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

            String input = "/work/justTest/test";
            try {
                // The same path is deliberately passed twice to reproduce the issue.
                createMultiInputsTestJob(conf,
                        input, Test1Mapper.class,
                        input, Test2Mapper.class,
                        "/work/justTest/temp", 2, TestReduce.class)
                        .waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Builds the job.
         * @param conf job configuration
         * @param input1 first input path
         * @param mapper1 mapper for the first input
         * @param input2 second input path
         * @param mapper2 mapper for the second input
         * @param outputDir output directory (deleted first if it exists)
         * @param reduceNum number of reduce tasks
         * @param reducer reducer class
         * @return the configured Job, or null on failure
         */
        static Job createMultiInputsTestJob(Configuration conf,
                String input1, Class<? extends Mapper> mapper1,
                String input2, Class<? extends Mapper> mapper2,
                String outputDir,
                int reduceNum, Class<? extends Reducer> reducer) {
            try {
                Job job = Job.getInstance(conf);
                job.setJobName("MultiInputsTest");
                job.setJarByClass(MutilInputTest.class);

                job.setNumReduceTasks(reduceNum);
                job.setReducerClass(reducer);

                // Note: MultipleInputs.addInputPath replaces this with
                // DelegatingInputFormat, so this call is effectively a no-op.
                job.setInputFormatClass(TextInputFormat.class);
                MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, mapper1);
                MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, mapper2);

                Path outputPath = new Path(outputDir);
                outputPath.getFileSystem(conf).delete(outputPath, true);

                job.setOutputFormatClass(TextOutputFormat.class);
                TextOutputFormat.setOutputPath(job, outputPath);

                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                return job;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        /**
         * Base mapper: splits each line on spaces and emits (word, "1"),
         * counting inputs under a group named after getDataType().
         */
        static class Test1Mapper extends Mapper<LongWritable, Text, Text, Text> {
            Context context;

            String type;

            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                this.context = context;
                this.type = getDataType();
                super.setup(context);
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] words = value.toString().split(" ");
                for (String word : words) {
                    context.getCounter(this.type + "_map_total", "input").increment(1);
                    context.write(new Text(word), new Text("1"));
                }
            }

            protected String getDataType() {
                return "test1";
            }
        }

        /**
         * Mapper subclass: only the counter group name differs.
         */
        static class Test2Mapper extends Test1Mapper {
            @Override
            protected String getDataType() {
                return "test2";
            }
        }

        /**
         * Reducer: sums the "1"s per word and records the total in a counter.
         */
        static class TestReduce extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                int total = 0;
                for (Text value : values) {
                    total += Integer.parseInt(value.toString());
                }
                context.getCounter("reduce_total", key.toString() + "_" + total).increment(1);
            }
        }

    }

    (3) The job execution log:

    18/08/12 21:33:57 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
    18/08/12 21:33:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    18/08/12 21:33:59 INFO input.FileInputFormat: Total input paths to process : 1
    18/08/12 21:33:59 INFO mapreduce.JobSubmitter: number of splits:1
    18/08/12 21:34:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39623
    18/08/12 21:34:00 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39623
    18/08/12 21:34:00 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39623/
    18/08/12 21:34:00 INFO mapreduce.Job: Running job: job_1527582903778_39623
    18/08/12 21:34:06 INFO mapreduce.Job: Job job_1527582903778_39623 running in uber mode : false
    18/08/12 21:34:06 INFO mapreduce.Job:  map 0% reduce 0%
    18/08/12 21:34:12 INFO mapreduce.Job:  map 100% reduce 0%
    18/08/12 21:34:17 INFO mapreduce.Job:  map 100% reduce 50%
    18/08/12 21:34:22 INFO mapreduce.Job:  map 100% reduce 100%
    18/08/12 21:34:22 INFO mapreduce.Job: Job job_1527582903778_39623 completed successfully
    18/08/12 21:34:22 INFO mapreduce.Job: Counters: 53
        File System Counters
            FILE: Number of bytes read=64
            FILE: Number of bytes written=271730
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=263
            HDFS: Number of bytes written=0
            HDFS: Number of read operations=9
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=4
        Job Counters
            Launched map tasks=1
            Launched reduce tasks=2
            Rack-local map tasks=1
            Total time spent by all maps in occupied slots (ms)=14760
            Total time spent by all reduces in occupied slots (ms)=49344
            Total time spent by all map tasks (ms)=3690
            Total time spent by all reduce tasks (ms)=6168
            Total vcore-seconds taken by all map tasks=3690
            Total vcore-seconds taken by all reduce tasks=6168
            Total megabyte-seconds taken by all map tasks=15114240
            Total megabyte-seconds taken by all reduce tasks=50528256
        Map-Reduce Framework
            Map input records=1
            Map output records=3
            Map output bytes=14
            Map output materialized bytes=48
            Input split bytes=255
            Combine input records=0
            Combine output records=0
            Reduce input groups=3
            Reduce shuffle bytes=48
            Reduce input records=3
            Reduce output records=0
            Spilled Records=6
            Shuffled Maps =2
            Failed Shuffles=0
            Merged Map outputs=2
            GC time elapsed (ms)=183
            CPU time spent (ms)=3150
            Physical memory (bytes) snapshot=1009094656
            Virtual memory (bytes) snapshot=24295927808
            Total committed heap usage (bytes)=2306867200
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters
            Bytes Read=0
        File Output Format Counters
            Bytes Written=0
        reduce_total
            am_1=1
            i_1=1
            ws_1=1
        test2_map_total
            input=3

    Three things stand out in the log: 1) the FileInputFormat line reports "Total input paths to process : 1"; 2) the map-stage counters show only 3 input words in total, and only the test2 counter group appears; 3) the reduce-stage counters show a count of 1 for every word.

    This raises the question: why does Hadoop detect only one file when the same file was clearly added as input twice?

    3. Root Cause Analysis

    Since only one file is visible by the time the map and reduce stages run, the problem must be introduced at job-creation time. So I examined the MultipleInputs source that handles input paths:

    @SuppressWarnings("unchecked")
    public static void addInputPath(Job job, Path path,
        Class<? extends InputFormat> inputFormatClass,
        Class<? extends Mapper> mapperClass) {

      addInputPath(job, path, inputFormatClass);
      Configuration conf = job.getConfiguration();
      String mapperMapping = path.toString() + ";" + mapperClass.getName();
      String mappers = conf.get(DIR_MAPPERS);
      conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
         : mappers + "," + mapperMapping);

      job.setMapperClass(DelegatingMapper.class);
    }


    public static void addInputPath(Job job, Path path,
        Class<? extends InputFormat> inputFormatClass) {
      String inputFormatMapping = path.toString() + ";"
         + inputFormatClass.getName();
      Configuration conf = job.getConfiguration();
      String inputFormats = conf.get(DIR_FORMATS);
      conf.set(DIR_FORMATS,
         inputFormats == null ? inputFormatMapping : inputFormats + ","
             + inputFormatMapping);

      job.setInputFormatClass(DelegatingInputFormat.class);
    }

    The source shows that both DIR_FORMATS and DIR_MAPPERS are built up as comma-separated entries of the form "inputPath;className" (the InputFormat class name in the former, the Mapper class name in the latter). Before the job runs, the delegating classes parse these strings back into maps keyed by input path, so entries with the same path collapse into one, and only the configuration from the last MultipleInputs.addInputPath call for that path survives. That is why only Test2Mapper ran. The fix, therefore, is to register distinct paths.
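    To make the collapse concrete, here is a minimal standalone sketch (an illustration of the same last-write-wins, map-keyed-by-path behavior; it is not the actual Hadoop parsing code):

    import java.util.HashMap;
    import java.util.Map;

    public class DuplicatePathDemo {
        public static void main(String[] args) {
            // The same serialized form MultipleInputs writes into DIR_MAPPERS,
            // with /work/justTest/test registered twice.
            String dirMappers = "/work/justTest/test;Test1Mapper,/work/justTest/test;Test2Mapper";
            Map<String, String> mapperForPath = new HashMap<>();
            for (String entry : dirMappers.split(",")) {
                String[] parts = entry.split(";");
                mapperForPath.put(parts[0], parts[1]); // duplicate key: last value wins
            }
            // Prints {/work/justTest/test=Test2Mapper} -- only one input survives,
            // matching the log: one input path, and only test2 counters.
            System.out.println(mapperForPath);
        }
    }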

    4. Solution

    Pass the same file content to the job under different input paths.
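    If the data only exists once on HDFS, one option is simply to duplicate it under the two names used by the fixed code below (a hedged helper sketch, not from the original post):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class DuplicateInputFile {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path src = new Path("/work/justTest/test");
            // Copy the same content to two distinct paths so MultipleInputs
            // registers two separate inputs (deleteSource = false).
            FileUtil.copy(fs, src, fs, new Path("/work/justTest/test1"), false, conf);
            FileUtil.copy(fs, src, fs, new Path("/work/justTest/test2"), false, conf);
        }
    }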

    (1) Only the changed method is shown:

    public static void testMultiInputs() {

        Configuration conf = new Configuration();

        conf.set("mapreduce.job.queuename", "default");
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
        conf.setInt("mapreduce.task.timeout", 0);
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

        String input = "/work/justTest/";
        try {
            // Same content, but two distinct paths: test1 and test2.
            createMultiInputsTestJob(conf,
                    input + "test1", Test1Mapper.class,
                    input + "test2", Test2Mapper.class,
                    input + "temp", 2, TestReduce.class)
                    .waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    (2) The resulting run log:

    18/08/12 21:58:15 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
    18/08/12 21:58:15 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
    18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
    18/08/12 21:58:16 INFO mapreduce.JobSubmitter: number of splits:2
    18/08/12 21:58:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39628
    18/08/12 21:58:17 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39628
    18/08/12 21:58:17 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39628/
    18/08/12 21:58:17 INFO mapreduce.Job: Running job: job_1527582903778_39628
    18/08/12 21:58:22 INFO mapreduce.Job: Job job_1527582903778_39628 running in uber mode : false
    18/08/12 21:58:22 INFO mapreduce.Job:  map 0% reduce 0%
    18/08/12 21:58:28 INFO mapreduce.Job:  map 100% reduce 0%
    18/08/12 21:58:34 INFO mapreduce.Job:  map 100% reduce 100%
    18/08/12 21:58:35 INFO mapreduce.Job: Job job_1527582903778_39628 completed successfully
    18/08/12 21:58:35 INFO mapreduce.Job: Counters: 55
        File System Counters
            FILE: Number of bytes read=66
            FILE: Number of bytes written=362388
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=528
            HDFS: Number of bytes written=0
            HDFS: Number of read operations=12
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=4
        Job Counters
            Launched map tasks=2
            Launched reduce tasks=2
            Data-local map tasks=1
            Rack-local map tasks=1
            Total time spent by all maps in occupied slots (ms)=27332
            Total time spent by all reduces in occupied slots (ms)=59792
            Total time spent by all map tasks (ms)=6833
            Total time spent by all reduce tasks (ms)=7474
            Total vcore-seconds taken by all map tasks=6833
            Total vcore-seconds taken by all reduce tasks=7474
            Total megabyte-seconds taken by all map tasks=27987968
            Total megabyte-seconds taken by all reduce tasks=61227008
        Map-Reduce Framework
            Map input records=2
            Map output records=6
            Map output bytes=28
            Map output materialized bytes=96
            Input split bytes=512
            Combine input records=0
            Combine output records=0
            Reduce input groups=3
            Reduce shuffle bytes=96
            Reduce input records=6
            Reduce output records=0
            Spilled Records=12
            Shuffled Maps =4
            Failed Shuffles=0
            Merged Map outputs=4
            GC time elapsed (ms)=272
            CPU time spent (ms)=4440
            Physical memory (bytes) snapshot=1346195456
            Virtual memory (bytes) snapshot=29357146112
            Total committed heap usage (bytes)=3084910592
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters
            Bytes Read=0
        File Output Format Counters
            Bytes Written=0
        reduce_total
            am_2=1
            i_2=1
            ws_2=1
        test1_map_total
            input=3
        test2_map_total
            input=3

    (3) The log now matches the original intent: two input paths are processed (2 splits), both the test1 and test2 counter groups report 3 input words, and every word's reduce total is 2.
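    As a further safeguard, duplicate registrations can be rejected up front. The wrapper below is a hypothetical helper sketch, not part of the original fix:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

    public class SafeMultipleInputs {
        private final Set<String> seen = new HashSet<>();

        /** Fails fast instead of letting a duplicate path be silently dropped. */
        public void addInputPath(Job job, Path path,
                Class<? extends InputFormat> format,
                Class<? extends Mapper> mapper) {
            if (!seen.add(path.toString())) {
                throw new IllegalArgumentException(
                        "Duplicate input path passed to MultipleInputs: " + path);
            }
            MultipleInputs.addInputPath(job, path, format, mapper);
        }
    }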

  • Original post: https://www.cnblogs.com/mengrennwpu/p/9463458.html