  • Hadoop (2): Custom Input and Output Formats

    Hadoop provides a fairly rich set of data input and output formats that cover most designs, but sometimes a custom input or output format is needed.

    A data input format describes the input specification of a MapReduce job. The MapReduce framework relies on the input format to validate that specification (for example, checking the input directories), to split the input files into InputSplits, and to read records line by line from each split and turn them into the key/value pairs consumed by the Map phase. Hadoop provides many input formats, such as TextInputFormat and KeyValueTextInputFormat, and each input format has a corresponding RecordReader, e.g. LineRecordReader and KeyValueLineRecordReader. To write a custom input format, you mainly implement createRecordReader() and getSplits() in InputFormat, and getCurrentKey(), getCurrentValue(), nextKeyValue() and so on in RecordReader.

    For example:

     

    package com.rpc.nefu;

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;

    // A custom input format extends the abstract class FileInputFormat
    public class ZInputFormat extends FileInputFormat<IntWritable, IntWritable> {

        @Override // supply the RecordReader
        public RecordReader<IntWritable, IntWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new ZRecordReader();
        }

        // the custom RecordReader
        public static class ZRecordReader extends RecordReader<IntWritable, IntWritable> {

            private LineReader in;       // input stream
            private boolean more = true; // whether more records may follow

            private IntWritable key = null;
            private IntWritable value = null;

            // these three track the current position within the file
            private long start;
            private long end;
            private long pos;

            // private Log LOG = LogFactory.getLog(ZRecordReader.class); // optional logging

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                FileSplit inputsplit = (FileSplit) split;
                start = inputsplit.getStart();        // start offset of this split
                end = start + inputsplit.getLength(); // end offset of this split
                final Path file = inputsplit.getPath();

                // open the file
                FileSystem fs = file.getFileSystem(context.getConfiguration());
                FSDataInputStream fileIn = fs.open(inputsplit.getPath());

                // seek to this split; a freshly opened file is positioned at offset 0
                fileIn.seek(start);

                in = new LineReader(fileIn, context.getConfiguration());

                if (start != 0) {
                    // Unless this is the first split, discard the first (partial)
                    // line: the reader of the previous split reads one line past
                    // its own end, so re-reading it here would duplicate data.
                    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
                }
                pos = start;
            }

            private int maxBytesToConsume(long pos) {
                return (int) Math.min(Integer.MAX_VALUE, end - pos);
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                // advance to the next key/value pair
                // tip: avoid console output here, this method is on the hot path
                if (key == null) {
                    key = new IntWritable();
                }
                if (value == null) {
                    value = new IntWritable();
                }

                // Stop only once we have consumed past the end of the split, so a
                // line crossing the boundary still belongs to this split; the next
                // split's reader skips its first line (see initialize()).
                if (pos > end) {
                    more = false;
                    return false;
                }

                Text nowline = new Text(); // holds the current line
                int readsize = in.readLine(nowline);
                pos += readsize; // advance the current position

                if (readsize == 0) {
                    // end of file reached
                    key = null;
                    value = null;
                    more = false;
                    return false;
                }

                String[] keyandvalue = nowline.toString().split(",");

                // skip the header line, whose first field is "CITING"
                if (keyandvalue[0].endsWith("\"CITING\"")) {
                    readsize = in.readLine(nowline);
                    pos += readsize;
                    if (readsize == 0) {
                        more = false; // end of file reached
                        return false;
                    }
                    keyandvalue = nowline.toString().split(",");
                }

                // parse the two integer fields into key and value
                key.set(Integer.parseInt(keyandvalue[0]));
                value.set(Integer.parseInt(keyandvalue[1]));

                return true;
            }

            @Override
            public IntWritable getCurrentKey() throws IOException, InterruptedException {
                return key;
            }

            @Override
            public IntWritable getCurrentValue() throws IOException, InterruptedException {
                return value;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                // progress through the current split
                if (end == start) {
                    return 0.0f;
                } else if (!more) {
                    return 1.0f;
                } else {
                    // cast to float, otherwise the long division truncates to 0
                    return Math.min(1.0f, (pos - start) / (float) (end - start));
                }
            }

            @Override
            public void close() throws IOException {
                // close the input stream
                if (in != null) {
                    in.close();
                }
            }
        }
    }
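
    A custom input format is wired into the driver just like a built-in one. A minimal fragment (the job name and input path are illustrative):

        // Driver fragment, assuming the usual org.apache.hadoop.mapreduce.Job setup.
        Job job = Job.getInstance(new Configuration(), "zinput-demo"); // illustrative name
        job.setInputFormatClass(ZInputFormat.class);   // use the custom input format
        job.setMapOutputKeyClass(IntWritable.class);   // matches ZRecordReader's key type
        job.setMapOutputValueClass(IntWritable.class); // matches ZRecordReader's value type
        FileInputFormat.addInputPath(job, new Path("/input/cite")); // illustrative path

    A second example: an input format for an inverted index that tags every line with the file it came from.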
    package reverseIndex;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    public class FileNameLocInputFormat extends FileInputFormat<Text, Text> {

        @Override
        public RecordReader<Text, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new FileNameLocRecordReader();
        }

        // Wraps the stock LineRecordReader and turns each record key into
        // "(fileName@offset)" so downstream code knows where a line came from.
        public static class FileNameLocRecordReader extends RecordReader<Text, Text> {

            String fileName;
            LineRecordReader line = new LineRecordReader();

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                line.initialize(split, context);
                FileSplit inputsplit = (FileSplit) split;
                fileName = inputsplit.getPath().getName();
            }

            @Override
            public Text getCurrentKey() throws IOException, InterruptedException {
                // LineRecordReader's key is the byte offset of the current line
                return new Text("(" + fileName + "@" + line.getCurrentKey() + ")");
            }

            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                return line.getCurrentValue();
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                // delegate to the wrapped reader
                return line.nextKeyValue();
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                return line.getProgress();
            }

            @Override
            public void close() throws IOException {
                line.close();
            }
        }
    }
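
    With this format a mapper receives keys such as (a.txt@0) and the line text as the value, which is just what an inverted index needs. A minimal sketch of such a mapper (the class name and tokenization are illustrative):

        // Hypothetical inverted-index mapper over FileNameLocInputFormat records:
        // emits (word, "(fileName@offset)") pairs.
        public static class IndexMapper extends Mapper<Text, Text, Text, Text> {
            private final Text word = new Text();

            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                for (String token : value.toString().split("\\s+")) {
                    if (!token.isEmpty()) {
                        word.set(token);
                        context.write(word, key); // the key already carries file name and offset
                    }
                }
            }
        }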
    

    Hadoop also has many built-in output formats and RecordWriters. An output format validates the job's output specification and writes the job's result data.

    A custom output format:

     

    // Route each record to a file named after the first letter of its key;
    // multiformat (defined below) does the heavy lifting.
    public static class AlphaOutputFormat extends multiformat<Text, IntWritable> {

        @Override
        protected String generateFileNameForKeyValue(Text key,
                IntWritable value, Configuration conf) {
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            } else {
                return "other.txt";
            }
        }
    }
    
    // set the custom output format on the job
    job.setOutputFormatClass(AlphaOutputFormat.class);
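
    Because multiformat (defined below) honors FileOutputFormat's standard compression settings, per-file compressed output can also be switched on in the driver. A sketch using the stock FileOutputFormat helpers:

        // Optional: emit gzip-compressed files; these are standard FileOutputFormat
        // settings that multiformat reads back via getCompressOutput().
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    The multiformat base class used above is defined as follows: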
    
    package com.rpc.nefu;

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ReflectionUtils;

    public abstract class multiformat<K extends WritableComparable<?>, V extends Writable>
            extends FileOutputFormat<K, V> {
        private MultiRecordWriter writer = null;

        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
                InterruptedException {
            if (writer == null) {
                writer = new MultiRecordWriter(job, getTaskOutputPath(job));
            }
            return writer;
        }

        private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
            Path workPath = null;
            OutputCommitter committer = super.getOutputCommitter(conf);
            if (committer instanceof FileOutputCommitter) {
                workPath = ((FileOutputCommitter) committer).getWorkPath();
            } else {
                Path outputPath = super.getOutputPath(conf);
                if (outputPath == null) {
                    throw new IOException("Undefined job output-path");
                }
                workPath = outputPath;
            }
            return workPath;
        }

        /** Determine the output file name (including extension) from key, value, and conf. */
        protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

        public class MultiRecordWriter extends RecordWriter<K, V> {
            /** cache of RecordWriters, one per output file name */
            private HashMap<String, RecordWriter<K, V>> recordWriters = null;
            private TaskAttemptContext job = null;
            /** output directory */
            private Path workPath = null;

            public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
                super();
                this.job = job;
                this.workPath = workPath;
                recordWriters = new HashMap<String, RecordWriter<K, V>>();
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
                while (values.hasNext()) {
                    values.next().close(context);
                }
                this.recordWriters.clear();
            }

            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                // resolve the target file name, then reuse (or create) its writer
                String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
                RecordWriter<K, V> rw = this.recordWriters.get(baseName);
                if (rw == null) {
                    rw = getBaseRecordWriter(job, baseName);
                    this.recordWriters.put(baseName, rw);
                }
                rw.write(key, value);
            }

            // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
            private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                    throws IOException, InterruptedException {
                Configuration conf = job.getConfiguration();
                boolean isCompressed = getCompressOutput(job);
                String keyValueSeparator = ",";
                RecordWriter<K, V> recordWriter = null;
                if (isCompressed) {
                    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                            GzipCodec.class);
                    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                    Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                    FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                    recordWriter = new lineRecordWrite<K, V>(new DataOutputStream(codec
                            .createOutputStream(fileOut)), keyValueSeparator);
                } else {
                    Path file = new Path(workPath, baseName);
                    FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                    recordWriter = new lineRecordWrite<K, V>(fileOut, keyValueSeparator);
                }
                return recordWriter;
            }
        }
    }
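
    The listing references a lineRecordWrite class that is not shown. It plays the role of the LineRecordWriter nested inside Hadoop's TextOutputFormat (which is protected and therefore commonly copied). A minimal sketch of a nested class that would satisfy the two call sites above, omitting the NullWritable handling of the stock writer:

        // Minimal line-oriented RecordWriter, modeled on TextOutputFormat.LineRecordWriter.
        // Add as a nested class of multiformat; requires: import java.nio.charset.StandardCharsets;
        public static class lineRecordWrite<K, V> extends RecordWriter<K, V> {
            private final DataOutputStream out;
            private final byte[] separator;

            public lineRecordWrite(DataOutputStream out, String keyValueSeparator) {
                this.out = out;
                this.separator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public synchronized void write(K key, V value) throws IOException {
                // one "key<separator>value" line per record
                out.write(key.toString().getBytes(StandardCharsets.UTF_8));
                out.write(separator);
                out.write(value.toString().getBytes(StandardCharsets.UTF_8));
                out.write('\n');
            }

            @Override
            public synchronized void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        }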
    