本案例采用 MultipleInputs类 实现多路径输入的倒排索引。解读:MR多路径输入
package test0820; import java.io.IOException; import java.lang.reflect.Method; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WC0826 { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WC0826.class); job.setMapperClass(IIMapper.class); job.setCombinerClass(IICombiner.class); job.setReducerClass(IIReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //使用MultipleInputs类指定多路径输入
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class); FileOutputFormat.setOutputPath(job, new Path(args[2])); System.exit(job.waitForCompletion(true)? 0:1); } //map public static class IIMapper extends Mapper<LongWritable, Text, Text, Text>{ String fileName; /** * 使用 MultipleInputs 获得 FileName 必须添加的类 */ private Path getFilePath(Context context) throws IOException { InputSplit split = context.getInputSplit(); Class<? extends InputSplit> splitClass = split.getClass(); FileSplit fileSplit = null; if (splitClass.equals(FileSplit.class)) { fileSplit = (FileSplit) split; } else if (splitClass.getName(). equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) { // begin reflection hackery... try { Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit"); getInputSplitMethod.setAccessible(true); fileSplit = (FileSplit) getInputSplitMethod.invoke(split); } catch (Exception e) { // wrap and re-throw error throw new IOException(e); } // end reflection hackery } return fileSplit.getPath(); } @Override protected void setup(Context context) throws IOException, InterruptedException { //get file name fileName = getFilePath(context).getName(); } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splited = value.toString().split(" "); for(String word : splited){ Text word_fileName=new Text(word+"@"+fileName); context.write(word_fileName,new Text("1")); } } } //combiner public static class IICombiner extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> v2s, Context context) throws IOException, InterruptedException { Long sum = 0L; String value=new String(); String[] splited = key.toString().split("@"); for(Text vl :v2s){ sum += Long.parseLong(vl.toString()); value = splited[1]+"@"+sum.toString(); } context.write(new Text(splited[0]), new 
Text(value)); } } //reduce public static class IIReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> v2s, Context context) throws IOException, InterruptedException { String value=new String(); for(Text text : v2s){ value = text.toString()+":"+value; } //去掉最后的":" context.write(key, new Text(value.substring(0, value.length()-1))); } } }
出现问题01:使用MultipleInputs类指定输入路径时,如果在setup()方法中调用getInputSplit()并把结果直接强制转换为FileSplit来获取当前split对应的FileName,会抛出类型转换异常(ClassCastException):
Error: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit
问题原因01:使用MultipleInputs时,getInputSplit()返回的是TaggedInputSplit,真正的FileSplit是它的成员变量inputSplit;而TaggedInputSplit类并不是public的(没有访问修饰符,即包级私有 package-private),所以在包外不能直接转换成该类型来获得对应的信息。
解决方案01:
- 第一种方法:在当前项目中新建对应的TaggedInputSplit类,并声明为public。即覆盖掉原有TaggedInputSplit类的声明类型。然后通过以下代码就可以正确调用:
(FileSplit)((TaggedInputSplit)reporter.getInputSplit()).getInputSplit();
- 第二种方法:通过反射机制。代码如下:
/** * 反射机制 * 使用 MultipleInputs 获得 FileName 必须添加的类 */ private Path getFilePath(Context context) throws IOException { InputSplit split = context.getInputSplit(); Class<? extends InputSplit> splitClass = split.getClass(); FileSplit fileSplit = null; if (splitClass.equals(FileSplit.class)) { fileSplit = (FileSplit) split; } else if (splitClass.getName(). equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) { // begin reflection hackery... try { Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit"); getInputSplitMethod.setAccessible(true); fileSplit = (FileSplit) getInputSplitMethod.invoke(split); } catch (Exception e) { // wrap and re-throw error throw new IOException(e); } // end reflection hackery } return fileSplit.getPath(); }
出现问题02:
map<Object,Text,Text,IntWritable>
combiner<Text,IntWritable,Text,Text>
reduce<Text,Text,Text,Text>
这样设置,作业会出现异常。原因是:Combiner运行在map端,它的输出还要作为Reducer的输入,而且框架可能执行它零次、一次或多次;因此Combiner的输入键值类型和输出键值类型都必须与map的输出类型(也就是Reducer的输入类型)保持一致。
Combiner实现对map端value的聚合,减少map到reduce间的数据传输,加快shuffle速度。牢记:求平均值的MR不能直接把Reducer当作Combiner使用,因为各部分平均值的平均值并不等于总体平均值。