zoukankan      html  css  js  c++  java
  • MapReduce自定义InputFormat,RecordReader

    MapReduce默认的InputFormat是TextInputFormat,其key是行的字节偏移量,value是该行文本。自定义InputFormat需要继承FileInputFormat,并重写createRecordReader方法,如果需要还可以重写isSplitable()来设置是否切片。重写了createRecordReader还需要自定义RecordReader:InputFormat规定了key,value是什么类型,而RecordReader则是具体的读取逻辑。下面的例子是合并小文件:RecordReader输出的key是NullWritable,value是文件的二进制字节;Mapper最终输出的k是文件路径,v是文件二进制字节

    1.InputFormat

     1 /**
     2  * 自定义InputFormat规定读取文件的k,v
     3  * @author tele
     4  *
     5  */
     6 public class MyInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
     7     /**
     8      * 设置不切片,把小文件作为一个整体
     9      */
    10     @Override
    11     protected boolean isSplitable(JobContext context, Path filename) {
    12         return false;
    13     }
    14     
    15     @Override
    16     public RecordReader<NullWritable,BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
    17             throws IOException, InterruptedException {
    18         MyRecordReader recordReader = new MyRecordReader();
    19         recordReader.initialize(split, context);
    20         return recordReader;
    21     }
    22 }

    2.RecordReader

     1 /**
     2  * recordreader用于读取文件内容,输出文件内容即可,文件路径信息保存在split中
     3  * @author tele
     4  *
     5  */
     6 public class MyRecordReader extends RecordReader<NullWritable,BytesWritable> {
     7     FileSplit split;
     8     BytesWritable value = new BytesWritable();
     9     boolean flag = false;
    10     Configuration conf;
    11     int count = 0;
    12     
    13     /**
    14      * 初始化
    15      */
    16     @Override
    17     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    18         this.split = (FileSplit) split;
    19         conf = context.getConfiguration();    conf = context.getConfiguration();
    20     }
    21 
    22     /**
    23      * 业务逻辑处理,这个方法用来判断是否还有文件内容需要读取,会进入两次,第一次读取内容存入value中,返回true,第二次调用返回false
    24      * 只要返回true,就会调用getCurrentKey().getCurrentValue()把内容返回给map
    25      * 
    26      */
    27     @Override
    28     public boolean nextKeyValue() throws IOException, InterruptedException {
    29         count++;
    30         if(!flag) {
    31             //获取fs
    32             FileSystem fs = FileSystem.get(conf);
    33             //开启流
    34             Path path = this.split.getPath();
    35             FSDataInputStream fsDataInputStream = fs.open(path);
    36             long length = this.split.getLength();
    37             byte[] buf = new byte[(int) length];
    38             
    39             //读取
    40             IOUtils.readFully(fsDataInputStream, buf, 0,buf.length);
    41             value.set(buf, 0, buf.length);
    42             
    43             //关闭流
    44             IOUtils.closeStream(fsDataInputStream);
    45             flag = true;
    46         }else {
    47             flag = false;
    48         }
    49         return flag;
    50     }
    51 
    52     @Override
    53     public NullWritable getCurrentKey() throws IOException, InterruptedException {
    54         return NullWritable.get();
    55     }
    56 
    57     @Override
    58     public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    59         return value;
    60     }
    61 
    62     @Override
    63     public float getProgress() throws IOException, InterruptedException {
    64         return flag?1:0;
    65     }
    66 
    67     @Override
    68     public void close() throws IOException {
    69         
    70     }
    71 }

    3.Mapper

     1 /**
     2  * 把结果输出到SequenceFileOutPutFormat中,输出的key是文件路径,value为文件内容
     3  * @author tele
     4  *
     5  */
     6 public class InputformatMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable/*Text*/> {
     7     Text k = new Text();      
     8 
     9     @Override
    10     protected void map(NullWritable key, BytesWritable value,
    11             Mapper<NullWritable, BytesWritable, Text, BytesWritable/*Text*/>.Context context)
    12             throws IOException, InterruptedException {
    13         FileSplit split = (FileSplit) context.getInputSplit();
    14         Path path = split.getPath();
    15         
    16         k.set(path.toString());
    17         
    18     /*    String result = new String(value.getBytes(),0,value.getLength());
    19         context.write(k,new Text(result));*/
    20         
    21         context.write(k, value);
    22     }
    23 }

    4.Driver(由于输出的是字节,需要指定OutputFormat为SequenceFileOutputFormat)

     1 /**
     2  * 驱动
     3  * @author tele
     4  *
     5  */
     6 public class InputformatDriver {
     7     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
     8         //1.获得job实例
     9         Configuration conf = new Configuration();
    10         Job job = Job.getInstance(conf);
    11         
    12         //2.关联class
    13         job.setJarByClass(InputformatDriver.class);
    14         job.setMapperClass(InputformatMapper.class);
    15         
    16         
    17         //4.设置format
    18         job.setInputFormatClass(MyInputFormat.class);
    19         //使用SequenceFileOutputFormat作为输出格式
    20         job.setOutputFormatClass(SequenceFileOutputFormat.class);
    21         
    22         //5.数据类型
    23         job.setOutputKeyClass(Text.class);
    24         job.setOutputValueClass(BytesWritable.class);
    25         
    26     //    job.setOutputValueClass(Text.class);
    27 
    28         //6.设置输入与输出路径
    29         FileInputFormat.setInputPaths(job,new Path(args[0]));
    30         FileOutputFormat.setOutputPath(job,new Path(args[1]));
    31         
    32         //7.提交
    33         boolean result = job.waitForCompletion(true);
    34         System.exit(result?0:1);
    35     }
    36 }

     

  • 相关阅读:
    设计模式-转载
    Java-类和对象基础练习
    Java-单例模式(singleton)-转载
    java-面向对象练习2
    Java-面向对象基础练习
    Java-字符串练习
    Java-数组练习5
    Java-数组练习4
    JAVA-初步认识-常用对象API(String类-常见功能-获取-1)
    JAVA-初步认识-常用对象API(String类-构造函数)
  • 原文地址:https://www.cnblogs.com/tele-share/p/9688174.html
Copyright © 2011-2022 走看看