  • Notes from the trenches: 2. A pitfall when using MultipleInputs in Hadoop

    1. Background

    On a recent Hadoop project I used MultipleInputs to add several input files to a job and discovered that an identical path is only loaded once, which badly skewed the downstream statistics. This post records how the anomaly was tracked down and resolved.

    2. Reproducing the problem

    (1) Prepare a minimal input file named test with the content "i am ws"; its HDFS path is /work/justTest/test. (A sketch of creating it from Java follows.)
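
    For reference, the file can be created directly from Java instead of via the shell. The following is a minimal sketch using the HDFS FileSystem API; it assumes the cluster configuration is available on the classpath, and the path matches the one above:

      import java.io.OutputStream;
      import java.nio.charset.StandardCharsets;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      public class PrepareInput {
          public static void main(String[] args) throws Exception {
              FileSystem fs = FileSystem.get(new Configuration());
              // Write the single test line, overwriting any existing file.
              try (OutputStream out = fs.create(new Path("/work/justTest/test"), true)) {
                  out.write("i am ws".getBytes(StandardCharsets.UTF_8));
              }
          }
      }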

    (2) The source code below is a basic word count in which /work/justTest/test is passed as an input path twice:

      package com.ws.test;

      import java.io.IOException;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

      public class MutilInputTest {

          public static void main(String[] args) {
              testMultiInputs();
          }

          /**
           * Test entry point: registers the same input path twice,
           * once for each mapper.
           */
          public static void testMultiInputs() {

              Configuration conf = new Configuration();

              conf.set("mapreduce.job.queuename", "default");
              conf.setBoolean("mapreduce.map.output.compress", true);
              conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
              conf.setInt("mapreduce.task.timeout", 0);
              conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

              String input = "/work/justTest/test";
              try {
                  createMultiInputsTestJob(conf,
                          input, Test1Mapper.class,
                          input, Test2Mapper.class,
                          "/work/justTest/temp", 2, TestReduce.class)
                          .waitForCompletion(true);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          /**
           * Builds the two-input job.
           *
           * @param conf      job configuration
           * @param input1    first input path
           * @param mapper1   mapper for the first input
           * @param input2    second input path
           * @param mapper2   mapper for the second input
           * @param outputDir output directory (deleted first if it exists)
           * @param reduceNum number of reduce tasks
           * @param reducer   reducer class
           * @return the configured Job, or null if setup failed
           */
          static Job createMultiInputsTestJob(Configuration conf,
                  String input1, Class<? extends Mapper> mapper1,
                  String input2, Class<? extends Mapper> mapper2,
                  String outputDir,
                  int reduceNum, Class<? extends Reducer> reducer) {
              try {
                  Job job = new Job(conf);
                  job.setJobName("MultiInputsTest");
                  job.setJarByClass(MutilInputTest.class);

                  job.setNumReduceTasks(reduceNum);
                  job.setReducerClass(reducer);

                  job.setInputFormatClass(TextInputFormat.class);
                  MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, mapper1);
                  MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, mapper2);

                  // Clear any previous output so the job can be rerun.
                  Path outputPath = new Path(outputDir);
                  outputPath.getFileSystem(conf).delete(outputPath, true);

                  job.setOutputFormatClass(TextOutputFormat.class);
                  TextOutputFormat.setOutputPath(job, outputPath);

                  job.setMapOutputKeyClass(Text.class);
                  job.setMapOutputValueClass(Text.class);

                  job.setOutputKeyClass(Text.class);
                  job.setOutputValueClass(Text.class);

                  return job;
              } catch (Exception e) {
                  e.printStackTrace();
                  return null;
              }
          }

          /**
           * Base mapper: emits (word, "1") for every word and counts its input
           * under a per-type counter group.
           */
          static class Test1Mapper extends Mapper<LongWritable, Text, Text, Text> {
              Context context;

              String type;

              @Override
              protected void setup(Context context) throws IOException,
                      InterruptedException {
                  this.context = context;
                  this.type = getDataType();
                  super.setup(context);
              }

              @Override
              protected void map(LongWritable key, Text value, Context context)
                      throws IOException, InterruptedException {
                  // Split the line into words on single spaces.
                  String[] words = value.toString().split(" ");
                  for (String word : words) {
                      context.getCounter(this.type + "_map_total", "input").increment(1);
                      context.write(new Text(word), new Text("1"));
                  }
              }

              protected String getDataType() {
                  return "test1";
              }
          }

          /**
           * Same mapper logic, different counter group.
           */
          static class Test2Mapper extends Test1Mapper {
              @Override
              protected String getDataType() {
                  return "test2";
              }
          }

          /**
           * Reducer: sums each word's counts and records the total as a counter.
           */
          static class TestReduce extends Reducer<Text, Text, Text, Text> {
              @Override
              protected void reduce(Text key, Iterable<Text> values, Context context)
                      throws IOException, InterruptedException {
                  int total = 0;
                  for (Text value : values) {
                      total += Integer.parseInt(value.toString());
                  }
                  context.getCounter("reduce_total", key.toString() + "_" + total).increment(1);
              }
          }

      }

    (3) The job run log is as follows:

      18/08/12 21:33:57 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
      18/08/12 21:33:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
      18/08/12 21:33:59 INFO input.FileInputFormat: Total input paths to process : 1
      18/08/12 21:33:59 INFO mapreduce.JobSubmitter: number of splits:1
      18/08/12 21:34:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39623
      18/08/12 21:34:00 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39623
      18/08/12 21:34:00 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39623/
      18/08/12 21:34:00 INFO mapreduce.Job: Running job: job_1527582903778_39623
      18/08/12 21:34:06 INFO mapreduce.Job: Job job_1527582903778_39623 running in uber mode : false
      18/08/12 21:34:06 INFO mapreduce.Job:  map 0% reduce 0%
      18/08/12 21:34:12 INFO mapreduce.Job:  map 100% reduce 0%
      18/08/12 21:34:17 INFO mapreduce.Job:  map 100% reduce 50%
      18/08/12 21:34:22 INFO mapreduce.Job:  map 100% reduce 100%
      18/08/12 21:34:22 INFO mapreduce.Job: Job job_1527582903778_39623 completed successfully
      18/08/12 21:34:22 INFO mapreduce.Job: Counters: 53
          File System Counters
              FILE: Number of bytes read=64
              FILE: Number of bytes written=271730
              FILE: Number of read operations=0
              FILE: Number of large read operations=0
              FILE: Number of write operations=0
              HDFS: Number of bytes read=263
              HDFS: Number of bytes written=0
              HDFS: Number of read operations=9
              HDFS: Number of large read operations=0
              HDFS: Number of write operations=4
          Job Counters
              Launched map tasks=1
              Launched reduce tasks=2
              Rack-local map tasks=1
              Total time spent by all maps in occupied slots (ms)=14760
              Total time spent by all reduces in occupied slots (ms)=49344
              Total time spent by all map tasks (ms)=3690
              Total time spent by all reduce tasks (ms)=6168
              Total vcore-seconds taken by all map tasks=3690
              Total vcore-seconds taken by all reduce tasks=6168
              Total megabyte-seconds taken by all map tasks=15114240
              Total megabyte-seconds taken by all reduce tasks=50528256
          Map-Reduce Framework
              Map input records=1
              Map output records=3
              Map output bytes=14
              Map output materialized bytes=48
              Input split bytes=255
              Combine input records=0
              Combine output records=0
              Reduce input groups=3
              Reduce shuffle bytes=48
              Reduce input records=3
              Reduce output records=0
              Spilled Records=6
              Shuffled Maps =2
              Failed Shuffles=0
              Merged Map outputs=2
              GC time elapsed (ms)=183
              CPU time spent (ms)=3150
              Physical memory (bytes) snapshot=1009094656
              Virtual memory (bytes) snapshot=24295927808
              Total committed heap usage (bytes)=2306867200
          Shuffle Errors
              BAD_ID=0
              CONNECTION=0
              IO_ERROR=0
              WRONG_LENGTH=0
              WRONG_MAP=0
              WRONG_REDUCE=0
          File Input Format Counters
              Bytes Read=0
          File Output Format Counters
              Bytes Written=0
          reduce_total
              am_1=1
              i_1=1
              ws_1=1
          test2_map_total
              input=3

    Two things stand out in the log: 1) "Total input paths to process : 1" appears only once, i.e. Hadoop registered a single input path; 2) the map-side counters report only 3 input words in total and only the test2_map_total group is present, while the reduce-side counters show a count of 1 for every word.

    This raises the question: the same file was added as input twice, so why does Hadoop detect only one file?

    3. Root cause analysis

    Since only one file ever reaches the map and reduce phases, the problem must be introduced at job-construction time, so I examined the MultipleInputs source related to input paths:

      @SuppressWarnings("unchecked")
      public static void addInputPath(Job job, Path path,
          Class<? extends InputFormat> inputFormatClass,
          Class<? extends Mapper> mapperClass) {

        addInputPath(job, path, inputFormatClass);
        Configuration conf = job.getConfiguration();
        String mapperMapping = path.toString() + ";" + mapperClass.getName();
        String mappers = conf.get(DIR_MAPPERS);
        conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
            : mappers + "," + mapperMapping);

        job.setMapperClass(DelegatingMapper.class);
      }

      public static void addInputPath(Job job, Path path,
          Class<? extends InputFormat> inputFormatClass) {
        String inputFormatMapping = path.toString() + ";"
            + inputFormatClass.getName();
        Configuration conf = job.getConfiguration();
        String inputFormats = conf.get(DIR_FORMATS);
        conf.set(DIR_FORMATS,
            inputFormats == null ? inputFormatMapping : inputFormats + ","
                + inputFormatMapping);

        job.setInputFormatClass(DelegatingInputFormat.class);
      }

    The source shows that both DIR_FORMATS and DIR_MAPPERS are built up as comma-separated entries of the form "inputPath;className" (the InputFormat class or the Mapper class, respectively). Before the job runs, DelegatingInputFormat and DelegatingMapper parse these strings back into maps keyed by path, so duplicate paths collapse into a single entry and only the last MultipleInputs.addInputPath call for a given path takes effect. That is why only one input path was counted and only the Test2Mapper counters appeared. The corresponding fix is to pass in distinct paths.
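
    The collapse is easy to reproduce outside Hadoop. The sketch below is an illustration (not Hadoop's actual code) of how parsing the comma-separated "path;class" pairs into a path-keyed map loses duplicates, with the last registration winning:

      import java.util.HashMap;
      import java.util.Map;

      public class DirMappersDemo {
          public static void main(String[] args) {
              // A DIR_MAPPERS-style value after two addInputPath calls with the same path:
              String dirMappers = "/work/justTest/test;Test1Mapper,/work/justTest/test;Test2Mapper";

              // Parsing into a map keyed by path, as the delegating classes do,
              // collapses the duplicate: the second put() overwrites the first.
              Map<String, String> mapperByPath = new HashMap<>();
              for (String pair : dirMappers.split(",")) {
                  String[] parts = pair.split(";");
                  mapperByPath.put(parts[0], parts[1]);
              }

              // Prints a single entry: {/work/justTest/test=Test2Mapper}
              System.out.println(mapperByPath);
          }
      }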

    4. Solution

    Copy the same content into a second file so that the job receives two distinct input paths (a sketch of the copy follows).
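
    Assuming the original test file already exists, one way to prepare the two paths is an HDFS-to-HDFS copy; here is a minimal sketch using FileUtil.copy, with target names chosen to match the revised job below:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.FileUtil;
      import org.apache.hadoop.fs.Path;

      public class DuplicateInput {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(conf);
              Path src = new Path("/work/justTest/test");
              // deleteSource=false keeps the original; overwrite=true allows reruns.
              FileUtil.copy(fs, src, fs, new Path("/work/justTest/test1"), false, true, conf);
              FileUtil.copy(fs, src, fs, new Path("/work/justTest/test2"), false, true, conf);
          }
      }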

    (1) Only the following method changes:

      public static void testMultiInputs() {

          Configuration conf = new Configuration();

          conf.set("mapreduce.job.queuename", "default");
          conf.setBoolean("mapreduce.map.output.compress", true);
          conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
          conf.setInt("mapreduce.task.timeout", 0);
          conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

          String input = "/work/justTest/";
          try {
              // Two distinct paths, test1 and test2, with identical content.
              createMultiInputsTestJob(conf,
                      input + "test1", Test1Mapper.class,
                      input + "test2", Test2Mapper.class,
                      input + "temp", 2, TestReduce.class)
                      .waitForCompletion(true);
          } catch (Exception e) {
              e.printStackTrace();
          }
      }

    (2) The run log is shown below:

      18/08/12 21:58:15 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
      18/08/12 21:58:15 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
      18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
      18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
      18/08/12 21:58:16 INFO mapreduce.JobSubmitter: number of splits:2
      18/08/12 21:58:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39628
      18/08/12 21:58:17 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39628
      18/08/12 21:58:17 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39628/
      18/08/12 21:58:17 INFO mapreduce.Job: Running job: job_1527582903778_39628
      18/08/12 21:58:22 INFO mapreduce.Job: Job job_1527582903778_39628 running in uber mode : false
      18/08/12 21:58:22 INFO mapreduce.Job:  map 0% reduce 0%
      18/08/12 21:58:28 INFO mapreduce.Job:  map 100% reduce 0%
      18/08/12 21:58:34 INFO mapreduce.Job:  map 100% reduce 100%
      18/08/12 21:58:35 INFO mapreduce.Job: Job job_1527582903778_39628 completed successfully
      18/08/12 21:58:35 INFO mapreduce.Job: Counters: 55
          File System Counters
              FILE: Number of bytes read=66
              FILE: Number of bytes written=362388
              FILE: Number of read operations=0
              FILE: Number of large read operations=0
              FILE: Number of write operations=0
              HDFS: Number of bytes read=528
              HDFS: Number of bytes written=0
              HDFS: Number of read operations=12
              HDFS: Number of large read operations=0
              HDFS: Number of write operations=4
          Job Counters
              Launched map tasks=2
              Launched reduce tasks=2
              Data-local map tasks=1
              Rack-local map tasks=1
              Total time spent by all maps in occupied slots (ms)=27332
              Total time spent by all reduces in occupied slots (ms)=59792
              Total time spent by all map tasks (ms)=6833
              Total time spent by all reduce tasks (ms)=7474
              Total vcore-seconds taken by all map tasks=6833
              Total vcore-seconds taken by all reduce tasks=7474
              Total megabyte-seconds taken by all map tasks=27987968
              Total megabyte-seconds taken by all reduce tasks=61227008
          Map-Reduce Framework
              Map input records=2
              Map output records=6
              Map output bytes=28
              Map output materialized bytes=96
              Input split bytes=512
              Combine input records=0
              Combine output records=0
              Reduce input groups=3
              Reduce shuffle bytes=96
              Reduce input records=6
              Reduce output records=0
              Spilled Records=12
              Shuffled Maps =4
              Failed Shuffles=0
              Merged Map outputs=4
              GC time elapsed (ms)=272
              CPU time spent (ms)=4440
              Physical memory (bytes) snapshot=1346195456
              Virtual memory (bytes) snapshot=29357146112
              Total committed heap usage (bytes)=3084910592
          Shuffle Errors
              BAD_ID=0
              CONNECTION=0
              IO_ERROR=0
              WRONG_LENGTH=0
              WRONG_MAP=0
              WRONG_REDUCE=0
          File Input Format Counters
              Bytes Read=0
          File Output Format Counters
              Bytes Written=0
          reduce_total
              am_2=1
              i_2=1
              ws_2=1
          test1_map_total
              input=3
          test2_map_total
              input=3

    (3) The log shows the run now behaves as intended: both input paths are processed, both test1_map_total and test2_map_total report 3 input words, and every word reaches a reduce count of 2.

  • Original post: https://www.cnblogs.com/mengrennwpu/p/9463458.html