zoukankan      html  css  js  c++  java
  • MR案例:定制InputFormat

    数据输入格式

    InputFormat类用于描述MR作业的输入规范,主要功能:输入规范检查(比如输入文件目录的检查)、对数据文件进行输入切分和从输入分块中将数据记录逐一读取出来、并转化为Map的输入键值对细节详见解读:标准输入/输出格式

    Hadoop中最常用的数据输入格式包括:TextInputFormat 和 KeyValueInputFormat。

      1). TextInputFormat 是系统默认的数据输入格式,可以将文件的每一行解析成一个键值对。其中,Key是当前行在整个文件中的字节偏移量,而Value就是该行的内容。默认的RecordReader是LineRecordReader

      2). KeyValueInputFormat是将一个按照<key,value>格式存放的文本文件逐行读出,并自动解析生成相应的key和value。默认是KeyValueLineRecordReader。 

    定制数据输入格式 

    用户可以从基类 InputFormat 和 RecordReader 开始定制过程,主要实现 InputFormat 中的 createRecordReader() 和 getSplits() 两个抽象方法,而 RecordReader 中则需要实现 gerCurrentKey() 和 getCurrentValue() 几个抽象方法。

    需求:为了能更细粒的记录每个单词在文档中出现时的行位置信息FileName@LineOffset。 

    • 方法一:基于默认的TextInputFormat和LineRecordReader 
    public static class IIMapper extends Mapper<Text, Text, Text, Text>{
            @Override
    
    //输出key:word 输出value:FileName@LineOffset protected void map(Text key, Text value,Context context) throws IOException, InterruptedException {
        //得到输入文件的文件名FileName(优化:应在setup方法中获取)     FileSplit fileSplit = (FileSplit)context.getInputSplit(); String name = fileSplit.getPath().getName();
        //组装拼接Value: FileName@LineOffset     Text fileName_lineOffset=new Text(name+"@"+key.toString());
    String[] splited
    = value.toString().split(" "); for(String word : splited){ context.write(new Text(word), fileName_lineOffset); } } }
    • 方法二:基于 TextInputFormat 和 LineRecordReader 定制 FileNameInputFormat FileNameRecordReader 
    package invertedIndex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    
    public class FileNameRecordReader extends RecordReader<Text, Text> {
    
        
        //成员变量
        String fileName;
    
    //实例化一个LineRecordReader实例 LineRecordReader lrr=new LineRecordReader(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //调用LineRecordReader类的初始化方法 lrr.initialize(split, context); //获取当前InputSplit的文件名 fileName=((FileSplit)split).getPath().getName(); } @Override public Text getCurrentKey() throws IOException, InterruptedException { //调用LineRecordReader类的方法,拼接key //其中lrr.getCurrentKey()返回:当前行在整个文本文件中的字节偏移量 return new Text("("+fileName+"@"+lrr.getCurrentKey().toString()+")"); } @Override public Text getCurrentValue() throws IOException, InterruptedException { //调用LineRecordReader类的方法 return lrr.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return lrr.nextKeyValue(); } @Override public float getProgress() throws IOException, InterruptedException { return lrr.getProgress(); } @Override public void close() throws IOException { lrr.close(); } } package invertedIndex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class FileNameInputFormat extends FileInputFormat<Text, Text>{ @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileNameRecordReader fnrr = new FileNameRecordReader(); //调用FileNameRecordReader的初始化方法 fnrr.initialize(split, context); return fnrr; } }
    •  使用自定义的 FileNameInputFormat FileNameRcordReader : 
    package invertedIndex;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class InvertedIndex {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(InvertedIndex.class);
    
    //设置数据输入格式【使用自定义的InputFormat】 job.setInputFormatClass(FileNameInputFormat.class);
    job.setMapperClass(FFMapper.
    class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
    job.setNumReduceTasks(
    0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job,
    new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(
    true); } public static class FFMapper extends Mapper<Text, Text, Text, Text>{ @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    //分词 StringTokenizer st = new StringTokenizer(value.toString());
    for(;st.hasMoreTokens();){
    //key:单词word value:FileName+偏移量 context.write(new Text(st.nextToken()), key); } } } }

     输出结果为:key:单词,value:FileName@偏移量

    read (data1@0)

    file (data1@0)

    read (data1@11)

    data (data1@11)  

    数据输出格式 

    数据输出格式(OutputFormat)用于描述MR作业的数据输出规范。主要功能:输出规范检查(如检查输出目录是否存在),以及提供作业结果数据输出功能。 

    Hadoop默认的数据输出格式是TextOutputFormat,可以将结果以【key+ +value的形式逐行输出。默认的RecordWriter是LineRecordWriter。 

  • 相关阅读:
    最短路
    Codeforces Round #607 (Div. 2) C. Cut and Paste
    第三次训练赛
    训练赛
    day27-反射
    day26-网络编程
    tcp文件上传--多个客户端
    tcp图片上传
    tcp文件上传优化
    tcp文件上传
  • 原文地址:https://www.cnblogs.com/skyl/p/4732290.html
Copyright © 2011-2022 走看看