  • MR Example: Inverted Index && MultipleInputs

    This example uses the MultipleInputs class to build an inverted index over multiple input paths. See also: MR multi-path input.

    package test0820;
    
    import java.io.IOException;
    import java.lang.reflect.Method;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WC0826 {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(WC0826.class);      
    
            job.setMapperClass(IIMapper.class);
            job.setCombinerClass(IICombiner.class);
            job.setReducerClass(IIReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            // Use the MultipleInputs class to register several input paths
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class);
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);

            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        //map
        public static class IIMapper extends Mapper<LongWritable, Text, Text, Text> {

            String fileName;

            /**
             * Helper that must be added when MultipleInputs is used and the
             * mapper needs the file name of the current split
             */
            private Path getFilePath(Context context) throws IOException {

                InputSplit split = context.getInputSplit();
                Class<? extends InputSplit> splitClass = split.getClass();

                FileSplit fileSplit = null;
                if (splitClass.equals(FileSplit.class)) {
                    fileSplit = (FileSplit) split;
                } else if (splitClass.getName()
                        .equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {

                    // begin reflection hackery...
                    try {
                        Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                        getInputSplitMethod.setAccessible(true);
                        fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
                    } catch (Exception e) {
                        // wrap and re-throw error
                        throw new IOException(e);
                    }
                    // end reflection hackery
                }
                return fileSplit.getPath();
            }

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // get the file name of the current split
                fileName = getFilePath(context).getName();
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] splited = value.toString().split(" ");
                for (String word : splited) {
                    // key is "word@fileName", value is a count of 1
                    Text word_fileName = new Text(word + "@" + fileName);
                    context.write(word_fileName, new Text("1"));
                }
            }
        }

        //combiner
        public static class IICombiner extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> v2s, Context context)
                    throws IOException, InterruptedException {
                Long sum = 0L;
                String value = new String();
                String[] splited = key.toString().split("@");
                for (Text vl : v2s) {
                    sum += Long.parseLong(vl.toString());
                    value = splited[1] + "@" + sum.toString();
                }
                // key becomes the bare word, value becomes "fileName@count"
                context.write(new Text(splited[0]), new Text(value));
            }
        }

        //reduce
        public static class IIReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> v2s, Context context)
                    throws IOException, InterruptedException {
                String value = new String();
                for (Text text : v2s) {
                    value = text.toString() + ":" + value;
                }
                // strip the trailing ":"
                context.write(key, new Text(value.substring(0, value.length() - 1)));
            }
        }
    }
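
    To make the output format concrete, here is a hypothetical run (the file names, contents, and the single-reduce-task assumption below are illustrative, not from the original post). The mapper emits "word@file" -> 1, the combiner collapses this to word -> "file@count", and the reducer joins the per-file entries with ":":

        a.txt:  hello world hello
        b.txt:  hello hadoop

        expected output (one reduce task; order of the file entries may vary)
        hadoop    b.txt@1
        hello     b.txt@1:a.txt@2
        world     a.txt@1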

    Problem 01: With input paths specified via the MultipleInputs class, calling getInputSplit() in the setup() method to obtain the FileName of the current split throws the following exception:

    Error: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit

    Cause 01: The FileSplit is in fact the member variable inputSplit inside a TaggedInputSplit, and the TaggedInputSplit class is not public (it has default, package-private visibility), so the wrapped split cannot be accessed directly.

    Solution 01:

    • Option 1: Create a TaggedInputSplit class in the current project with the same package and name and declare it public, so it shadows the visibility of the original class. The wrapped split can then be retrieved directly:
    (FileSplit)((TaggedInputSplit)context.getInputSplit()).getInputSplit(); 
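    For illustration, this cast would typically live in the mapper's setup(); a minimal sketch, assuming the project now contains a public copy of org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit as described above:

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // works only because the project-local TaggedInputSplit copy is declared public
        FileSplit fileSplit = (FileSplit) ((TaggedInputSplit) context.getInputSplit()).getInputSplit();
        fileName = fileSplit.getPath().getName();
    }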
    • Option 2: Use reflection, as in the helper below:
    /**
     * Reflection-based workaround:
     * helper that must be added when MultipleInputs is used and the mapper needs the FileName
     */
    private Path getFilePath(Context context) throws IOException {
        
        InputSplit split = context.getInputSplit();
        Class<? extends InputSplit> splitClass = split.getClass();
        
        FileSplit fileSplit = null;
        if (splitClass.equals(FileSplit.class)) {
            fileSplit = (FileSplit) split;
        } else if (splitClass.getName().
                equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
            
            // begin reflection hackery...
            try {
                Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                getInputSplitMethod.setAccessible(true);
                fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
            } catch (Exception e) {                    
                // wrap and re-throw error
                throw new IOException(e);
            }
            // end reflection hackery
        }
        return fileSplit.getPath();
    }

    Problem 02:

    map<Object,Text,Text,IntWritable>
    combiner<Text,IntWritable,Text,Text>
    reduce<Text,Text,Text,Text>

    With these type parameters the job fails at runtime. A Combiner is written against the same Reducer interface, but its output is written back into the map output stream and later consumed by the real Reducer, so its input and output key/value types must both equal the declared map output types; here the Combiner emits <Text,Text> while the map output value class is IntWritable.
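
    For comparison, a layout that does work keeps the Combiner's input and output types identical to the map output types. A minimal sketch, assuming a hypothetical word-count-style job whose map output is <Text, IntWritable> (SumCombiner is an illustrative name; org.apache.hadoop.io.IntWritable must be imported):

    // Combiner input and output are both <Text, IntWritable>, the same as the map
    // output types, so the framework may apply it zero or more times without
    // breaking the Reducer's input.
    public static class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }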

    A Combiner aggregates values on the map side, which reduces the amount of data transferred from map to reduce and speeds up the shuffle. Keep in mind that an MR job computing an average cannot use its averaging logic as a Combiner, because an average of partial averages is generally not the overall average (see the sketch below).
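
    If a Combiner is still wanted for an averaging job, the safe pattern is to have the mapper emit partial "sum,count" pairs and let the Reducer do the final division. For example, avg(1,2)=1.5 and avg(3,4,5,6)=4.5 average to 3.0, while the true average of 1..6 is 3.5. A minimal sketch under that assumption (AvgCombiner and the "sum,count" text encoding are illustrative, not from the original post):

    // Each incoming value is a "sum,count" pair produced by the mapper; the combiner
    // merges the pairs but keeps the same encoding, so the reducer can still compute
    // the exact sum/count no matter how many times the combiner ran.
    public static class AvgCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0L;
            long count = 0L;
            for (Text v : values) {
                String[] parts = v.toString().split(",");
                sum += Long.parseLong(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }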
