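1. Background
Recently, while using MultipleInputs to add multiple input files in a Hadoop project, I found that an identical path is loaded only once, which badly distorted the downstream statistics jobs. This post records how the problem was tracked down and how it was fixed.
2. Reproducing the problem
(1) Prepare a minimal input file named test whose content is "i am ws", and store it on HDFS at /work/justTest/test.
(2) The source code is shown below. It is essentially a wordCount implementation in which /work/justTest/test is added as an input path twice: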
    package com.ws.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MutilInputTest {

        public static void main(String[] args) {
            testMultiInputs();
        }

        /**
         * Test entry point: the same path is passed in twice, once per mapper.
         */
        public static void testMultiInputs() {

            Configuration conf = new Configuration();

            conf.set("mapreduce.job.queuename", "default");
            conf.setBoolean("mapreduce.map.output.compress", true);
            conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
            conf.setInt("mapreduce.task.timeout", 0);
            conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

            String input = "/work/justTest/test";
            try {
                createMultiInputsTestJob(conf,
                        input, Test1Mapper.class,
                        input, Test2Mapper.class,
                        "/work/justTest/temp", 2, TestReduce.class)
                        .waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Builds the job: two input paths, each bound to its own mapper,
         * one shared reducer and one output directory.
         */
        static Job createMultiInputsTestJob(Configuration conf,
                String input1, Class<? extends Mapper> mapper1,
                String input2, Class<? extends Mapper> mapper2,
                String outputDir,
                int reduceNum, Class<? extends Reducer> reducer) throws IOException {
            // Job.getInstance replaces the deprecated new Job(conf); errors are
            // propagated to the caller instead of being swallowed as a null job.
            Job job = Job.getInstance(conf);
            job.setJobName("MultiInputsTest");
            job.setJarByClass(MutilInputTest.class);

            job.setNumReduceTasks(reduceNum);
            job.setReducerClass(reducer);

            // MultipleInputs.addInputPath overrides this with DelegatingInputFormat.
            job.setInputFormatClass(TextInputFormat.class);
            MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, mapper1);
            MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, mapper2);

            // Remove any previous output so the job can be rerun.
            Path outputPath = new Path(outputDir);
            outputPath.getFileSystem(conf).delete(outputPath, true);

            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, outputPath);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            return job;
        }

        /**
         * Mapper: emits (word, "1") and counts every word under "<type>_map_total".
         */
        static class Test1Mapper extends Mapper<LongWritable, Text, Text, Text> {
            Context context;

            String type;

            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                this.context = context;
                this.type = getDataType();
                super.setup(context);
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Split on a single space; "i am ws" yields three words.
                String[] words = value.toString().split(" ");
                for (String word : words) {
                    context.getCounter(this.type + "_map_total", "input").increment(1);
                    context.write(new Text(word), new Text("1"));
                }
            }

            protected String getDataType() {
                return "test1";
            }
        }

        /**
         * Second mapper: same logic, different counter group.
         */
        static class Test2Mapper extends Test1Mapper {
            @Override
            protected String getDataType() {
                return "test2";
            }
        }

        /**
         * Reducer: sums the occurrences of each word and reports the result
         * as a counter named "<word>_<total>" in the "reduce_total" group.
         */
        static class TestReduce extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                int total = 0;
                for (Text value : values) {
                    total += Integer.parseInt(value.toString());
                }
                context.getCounter("reduce_total", key.toString() + "_" + total).increment(1);
            }
        }

    }
(3) The job execution log is as follows:
    18/08/12 21:33:57 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
    18/08/12 21:33:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    18/08/12 21:33:59 INFO input.FileInputFormat: Total input paths to process : 1
    18/08/12 21:33:59 INFO mapreduce.JobSubmitter: number of splits:1
    18/08/12 21:34:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39623
    18/08/12 21:34:00 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39623
    18/08/12 21:34:00 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39623/
    18/08/12 21:34:00 INFO mapreduce.Job: Running job: job_1527582903778_39623
    18/08/12 21:34:06 INFO mapreduce.Job: Job job_1527582903778_39623 running in uber mode : false
    18/08/12 21:34:06 INFO mapreduce.Job: map 0% reduce 0%
    18/08/12 21:34:12 INFO mapreduce.Job: map 100% reduce 0%
    18/08/12 21:34:17 INFO mapreduce.Job: map 100% reduce 50%
    18/08/12 21:34:22 INFO mapreduce.Job: map 100% reduce 100%
    18/08/12 21:34:22 INFO mapreduce.Job: Job job_1527582903778_39623 completed successfully
    18/08/12 21:34:22 INFO mapreduce.Job: Counters: 53
        File System Counters
            FILE: Number of bytes read=64
            FILE: Number of bytes written=271730
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=263
            HDFS: Number of bytes written=0
            HDFS: Number of read operations=9
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=4
        Job Counters
            Launched map tasks=1
            Launched reduce tasks=2
            Rack-local map tasks=1
            Total time spent by all maps in occupied slots (ms)=14760
            Total time spent by all reduces in occupied slots (ms)=49344
            Total time spent by all map tasks (ms)=3690
            Total time spent by all reduce tasks (ms)=6168
            Total vcore-seconds taken by all map tasks=3690
            Total vcore-seconds taken by all reduce tasks=6168
            Total megabyte-seconds taken by all map tasks=15114240
            Total megabyte-seconds taken by all reduce tasks=50528256
        Map-Reduce Framework
            Map input records=1
            Map output records=3
            Map output bytes=14
            Map output materialized bytes=48
            Input split bytes=255
            Combine input records=0
            Combine output records=0
            Reduce input groups=3
            Reduce shuffle bytes=48
            Reduce input records=3
            Reduce output records=0
            Spilled Records=6
            Shuffled Maps =2
            Failed Shuffles=0
            Merged Map outputs=2
            GC time elapsed (ms)=183
            CPU time spent (ms)=3150
            Physical memory (bytes) snapshot=1009094656
            Virtual memory (bytes) snapshot=24295927808
            Total committed heap usage (bytes)=2306867200
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters
            Bytes Read=0
        File Output Format Counters
            Bytes Written=0
        reduce_total
            am_1=1
            i_1=1
            ws_1=1
        test2_map_total
            input=3
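The log shows two things: 1) the third line reports "Total input paths to process : 1"; 2) the map-phase counters report a total of 3 input words and only the test2 counter group appears, while the reduce-phase counters show a count of 1 for every word.
This raises the question: why does Hadoop detect only one input when the same file was clearly added twice?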
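For comparison, the counts one would expect if the line really were processed twice can be worked out locally. The sketch below is only a plain-Java stand-in for the map and reduce steps (no Hadoop involved); the class name ExpectedCounts and the method countWords are hypothetical names introduced for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class ExpectedCounts {

    // Counts the words of one line as if the line were read timesRead times.
    // Hypothetical helper: it mimics "emit (word, 1), then sum per word".
    static Map<String, Integer> countWords(String line, int timesRead) {
        Map<String, Integer> totals = new TreeMap<>();
        for (int i = 0; i < timesRead; i++) {
            for (String word : line.split(" ")) {
                totals.merge(word, 1, Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // What the job actually reported: every word counted once.
        System.out.println(countWords("i am ws", 1)); // prints {am=1, i=1, ws=1}
        // What two inputs should have produced: every word counted twice.
        System.out.println(countWords("i am ws", 2)); // prints {am=2, i=2, ws=2}
    }
}
```

The job output above corresponds to the first case, which confirms that one of the two inputs was dropped rather than merely miscounted.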
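3. Root cause analysis
Since only one file is left by the time the map and reduce phases run, the problem must be introduced when the job is created. I therefore looked at the MultipleInputs source code that handles input paths: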
    @SuppressWarnings("unchecked")
    public static void addInputPath(Job job, Path path,
        Class<? extends InputFormat> inputFormatClass,
        Class<? extends Mapper> mapperClass) {

      addInputPath(job, path, inputFormatClass);
      Configuration conf = job.getConfiguration();
      String mapperMapping = path.toString() + ";" + mapperClass.getName();
      String mappers = conf.get(DIR_MAPPERS);
      conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
          : mappers + "," + mapperMapping);

      job.setMapperClass(DelegatingMapper.class);
    }


    public static void addInputPath(Job job, Path path,
        Class<? extends InputFormat> inputFormatClass) {
      String inputFormatMapping = path.toString() + ";"
          + inputFormatClass.getName();
      Configuration conf = job.getConfiguration();
      String inputFormats = conf.get(DIR_FORMATS);
      conf.set(DIR_FORMATS,
          inputFormats == null ? inputFormatMapping : inputFormats + ","
              + inputFormatMapping);

      job.setInputFormatClass(DelegatingInputFormat.class);
    }
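The source shows that both DIR_FORMATS and DIR_MAPPERS are built as comma-separated entries of the form "input path;InputFormat class name" or "input path;Mapper class name". Both addInputPath calls are therefore recorded in the configuration string. The loss happens afterwards, before the job runs: when these entries are read back they are indexed by path (judging from the Hadoop source, MultipleInputs.getInputFormatMap and getMapperTypeMap put them into a map keyed by Path), so entries with the same path collapse into one, and only the configuration from the last MultipleInputs.addInputPath call survives. This matches the log, in which only the test2 counter group appears. The fix is therefore to pass in different paths.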
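The overwrite can be reproduced without a cluster. The following is a minimal sketch that uses plain strings and a HashMap in place of Hadoop's Configuration and Path; the class DuplicatePathDemo and the methods append and parseMappings are hypothetical names that only imitate the "path;className" format shown above, not Hadoop's actual API.

```java
import java.util.HashMap;
import java.util.Map;

public class DuplicatePathDemo {

    // Imitates the concatenation in addInputPath: entries are joined with ",".
    static String append(String existing, String path, String className) {
        String mapping = path + ";" + className;
        return existing == null ? mapping : existing + "," + mapping;
    }

    // Imitates reading the property back into a map keyed by path
    // (assumption: this mirrors how the entries are indexed before the job runs).
    static Map<String, String> parseMappings(String property) {
        Map<String, String> byPath = new HashMap<>();
        for (String mapping : property.split(",")) {
            String[] parts = mapping.split(";");
            // A later entry with the same path replaces the earlier one.
            byPath.put(parts[0], parts[1]);
        }
        return byPath;
    }

    public static void main(String[] args) {
        String mappers = null;
        mappers = append(mappers, "/work/justTest/test", "Test1Mapper");
        mappers = append(mappers, "/work/justTest/test", "Test2Mapper");

        // The property string still holds both entries.
        System.out.println(mappers);
        // The map holds one: prints {/work/justTest/test=Test2Mapper}
        System.out.println(parseMappings(mappers));
    }
}
```

Both entries are present in the string, but only the Test2Mapper entry remains once the string is turned into a path-keyed map, which is the same pattern as the missing test1_map_total counter in the log.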
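4. Solution
Provide the same input content under different paths, that is, as separate files (test1 and test2 in the code below).
(1) The part of the code that changes is shown below: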
    public static void testMultiInputs() {

        Configuration conf = new Configuration();

        conf.set("mapreduce.job.queuename", "default");
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
        conf.setInt("mapreduce.task.timeout", 0);
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

        // Base directory; it already ends with "/", so child names are appended directly.
        String input = "/work/justTest/";
        try {
            createMultiInputsTestJob(conf,
                    input + "test1", Test1Mapper.class,
                    input + "test2", Test2Mapper.class,
                    input + "temp", 2, TestReduce.class)
                    .waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
(2) The run log is as follows:
    18/08/12 21:58:15 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
    18/08/12 21:58:15 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
    18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
    18/08/12 21:58:16 INFO mapreduce.JobSubmitter: number of splits:2
    18/08/12 21:58:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39628
    18/08/12 21:58:17 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39628
    18/08/12 21:58:17 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39628/
    18/08/12 21:58:17 INFO mapreduce.Job: Running job: job_1527582903778_39628
    18/08/12 21:58:22 INFO mapreduce.Job: Job job_1527582903778_39628 running in uber mode : false
    18/08/12 21:58:22 INFO mapreduce.Job: map 0% reduce 0%
    18/08/12 21:58:28 INFO mapreduce.Job: map 100% reduce 0%
    18/08/12 21:58:34 INFO mapreduce.Job: map 100% reduce 100%
    18/08/12 21:58:35 INFO mapreduce.Job: Job job_1527582903778_39628 completed successfully
    18/08/12 21:58:35 INFO mapreduce.Job: Counters: 55
        File System Counters
            FILE: Number of bytes read=66
            FILE: Number of bytes written=362388
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=528
            HDFS: Number of bytes written=0
            HDFS: Number of read operations=12
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=4
        Job Counters
            Launched map tasks=2
            Launched reduce tasks=2
            Data-local map tasks=1
            Rack-local map tasks=1
            Total time spent by all maps in occupied slots (ms)=27332
            Total time spent by all reduces in occupied slots (ms)=59792
            Total time spent by all map tasks (ms)=6833
            Total time spent by all reduce tasks (ms)=7474
            Total vcore-seconds taken by all map tasks=6833
            Total vcore-seconds taken by all reduce tasks=7474
            Total megabyte-seconds taken by all map tasks=27987968
            Total megabyte-seconds taken by all reduce tasks=61227008
        Map-Reduce Framework
            Map input records=2
            Map output records=6
            Map output bytes=28
            Map output materialized bytes=96
            Input split bytes=512
            Combine input records=0
            Combine output records=0
            Reduce input groups=3
            Reduce shuffle bytes=96
            Reduce input records=6
            Reduce output records=0
            Spilled Records=12
            Shuffled Maps =4
            Failed Shuffles=0
            Merged Map outputs=4
            GC time elapsed (ms)=272
            CPU time spent (ms)=4440
            Physical memory (bytes) snapshot=1346195456
            Virtual memory (bytes) snapshot=29357146112
            Total committed heap usage (bytes)=3084910592
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters
            Bytes Read=0
        File Output Format Counters
            Bytes Written=0
        reduce_total
            am_2=1
            i_2=1
            ws_2=1
        test1_map_total
            input=3
        test2_map_total
            input=3
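(3) The log shows that the result now matches the original goal: two splits are processed, test1_map_total and test2_map_total each report 3 input words, and every word is counted twice in the reduce phase.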
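Because the job completes successfully even when a path is silently dropped, it may be worth failing fast when the same path is registered twice. The sketch below is one possible guard, written in plain Java so it can run anywhere; InputPathGuard, register and normalize are hypothetical names, and the string-based normalization is a simplification that does not reproduce Hadoop's own Path handling (schemes, relative paths and so on are ignored).

```java
import java.util.HashSet;
import java.util.Set;

public class InputPathGuard {

    private final Set<String> seen = new HashSet<>();

    // Crude normalization (assumption, not Hadoop's Path logic):
    // collapse repeated slashes and drop a trailing slash.
    static String normalize(String path) {
        String collapsed = path.replaceAll("/+", "/");
        if (collapsed.length() > 1 && collapsed.endsWith("/")) {
            collapsed = collapsed.substring(0, collapsed.length() - 1);
        }
        return collapsed;
    }

    // Returns the path unchanged the first time it is seen,
    // and throws if an equivalent path was registered before.
    String register(String path) {
        if (!seen.add(normalize(path))) {
            throw new IllegalArgumentException("input path added twice: " + path);
        }
        return path;
    }

    public static void main(String[] args) {
        InputPathGuard guard = new InputPathGuard();
        System.out.println(guard.register("/work/justTest/test1"));
        System.out.println(guard.register("/work/justTest/test2"));
        try {
            guard.register("/work/justTest/test1");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In a job builder like the one above, each input string could be passed through register before it is wrapped in a Path and handed to MultipleInputs.addInputPath, so that a duplicate surfaces as an exception at submission time instead of as skewed counters afterwards.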