  • Overriding the FileInputFormat class in Hadoop to process integers stored in binary format

    I recently started working with MapReduce and noticed that most examples online process text data, where the default TextInputFormat is enough when reading the input. That class covers a good share of text-processing scenarios, but it is no longer suitable when the input consists of structured records stored in binary form.

    This article uses a simple application as an example: computing the frequency of integers stored in binary format. Other applications, such as sorting, can be built on the same foundation. The main difficulty is handling the input data. Following "Hadoop: The Definitive Guide" (3rd edition), you need to subclass FileInputFormat and implement the following three methods:

    class MyInputFormat extends FileInputFormat<Type1, Type2> {
    	/*
    	 * Can the current file be split? Return "true" to allow splitting,
    	 * "false" to process the file as a single split.
    	 */
    	protected boolean isSplitable(JobContext context, Path path) {
    		
    	}
    	
    	/*
    	 * The MapReduce client calls this method to obtain all the splits, which are then
    	 * handed to the MapReduce framework. Note that a split does not contain the actual
    	 * data; it only describes a slice of it. Concretely, each split records the path of
    	 * the file it belongs to, the split's starting offset within that file, the split's
    	 * length, and the list of hosts holding the underlying data. Implementing this method
    	 * is a matter of filling in that information.
    	 * */
    	public List<InputSplit> getSplits(JobContext job) throws IOException {
    	}
    
    	/*
    	 * A RecordReader produces the key-value sequence that is fed to the map function. It is
    	 * given two parameters: a split and the task context. The Mapper.run method shows how
    	 * the MapReduce framework drives the map phase:
    	 * public void run(Context context) throws IOException, InterruptedException {
    	 * 		setup(context);
    	 * 		// nextKeyValue() asks the RecordReader for the next key-value pair; once the
    	 * 		// current split is exhausted it returns false and the loop ends.
    	 *		while (context.nextKeyValue()) {
    	 *			map(context.getCurrentKey(), context.getCurrentValue(), context);
    	 *		}
    	 *		cleanup(context);
    	 * }
    	 **/
    	public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
    			throws IOException, InterruptedException {
    	}
    }

    The RecordReader subclass has to implement the following methods:

    public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
    	/*
    	 * Close the input stream.
    	 * */
    	public void close() {}
    
    	/*
    	 * Report the processing progress.
    	 **/
    	public float getProgress() {}
    
    	/*
    	 * Return the current key.
    	 * */
    	public LongWritable getCurrentKey() throws IOException,
    	InterruptedException {}
    
    	/* Return the current value.
    	 * */
    	public IntWritable getCurrentValue() throws IOException,InterruptedException {}
    
    	/*
    	 * Initialization: open the file stream and set the start position and length
    	 * according to the split information.
    	 * */
    	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    			throws IOException, InterruptedException {}
    
    	/*
    	 * Produce the next key-value pair.
    	 **/
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    	}
    }

    Below is the code for the three files. First, BinInputFormat.java:

    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.examples.BinRecordReader;
    
    class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> {
    	
    	private static final double SPLIT_SLOP=1.1;
    	
    	/*
    	 * Can the current file be split? Return "true" to allow splitting,
    	 * "false" to process the file as a single split.
    	 */
    	@Override
    	protected boolean isSplitable(JobContext context, Path path) {
    		return true;
    	}
    	
    	/*
    	 * The MapReduce client calls this method to obtain all the splits, which are then
    	 * handed to the MapReduce framework. Note that a split does not contain the actual
    	 * data; it only describes a slice of it. Concretely, each split records the path of
    	 * the file it belongs to, the split's starting offset within that file, the split's
    	 * length, and the list of hosts holding the underlying data. Implementing this method
    	 * is a matter of filling in that information.
    	 * */
    	@Override
    	public List<InputSplit> getSplits(JobContext job) throws IOException {
    		Configuration conf = job.getConfiguration();
    		List<InputSplit> splits = new ArrayList<InputSplit>();
    		long minSplitSize = conf.getLong("mapred.min.split.size", 1);
    		// Fall back to sensible defaults when these properties are not set;
    		// a default of 1 would collapse splitSize to a single byte.
    		long maxSplitSize = conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
    		long blockSize = conf.getLong("dfs.block.size", 64 * 1024 * 1024);
    		long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
    		FileSystem fs = FileSystem.get(conf);
    		String path = conf.get(INPUT_DIR);
    		FileStatus[] files = fs.listStatus(new Path(path));
    
    		for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
    			FileStatus file = files[fileIndex];
    			System.out.println("input file: " + file.getPath().toString());
    			long length = file.getLen();
    			FileSystem fsin = file.getPath().getFileSystem(conf);
    		    BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
    		    if ((length != 0) && isSplitable(job, file.getPath())) {
    		        long bytesRemaining = length;
    		        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    		          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    		          splits.add(new FileSplit(file.getPath(), length-bytesRemaining, splitSize,
    		                                   blkLocations[blkIndex].getHosts()));
    		          bytesRemaining -= splitSize;
    		        }
    		        
    		        if (bytesRemaining != 0) {
    		          splits.add(new FileSplit(file.getPath(), length-bytesRemaining, bytesRemaining,
    		                     blkLocations[blkLocations.length-1].getHosts()));
    		        }
    		      } else if (length != 0) {
    		        splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
    		      } else {
    		        //Create empty hosts array for zero length files
    		        splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
    		      }
    		}
    		return splits;
    	}
    	
    	/*
    	 * A RecordReader produces the key-value sequence that is fed to the map function. It is
    	 * given two parameters: a split and the task context. The Mapper.run method shows how
    	 * the MapReduce framework drives the map phase:
    	 * public void run(Context context) throws IOException, InterruptedException {
    	 * 		setup(context);
    	 * 		// nextKeyValue() asks the RecordReader for the next key-value pair; once the
    	 * 		// current split is exhausted it returns false and the loop ends.
    	 *		while (context.nextKeyValue()) {
    	 *			map(context.getCurrentKey(), context.getCurrentValue(), context);
    	 *		}
    	 *		cleanup(context);
    	 * }
    	 **/
    	@Override
    	public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
    			throws IOException, InterruptedException {
    		BinRecordReader reader = new BinRecordReader();
    		reader.initialize(split,context);
    		return reader;
    	}
    }
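
    One caveat worth noting about the getSplits implementation above: splitSize comes straight from the configuration, so it is not necessarily a multiple of four bytes, and an integer record could straddle a split boundary. The test file used later in this post (400 bytes) fits in a single split, so it is not an issue here, but for larger inputs the split size can be aligned to the record length first. The following is a minimal sketch under that assumption; the class and method names are introduced here for illustration and are not part of the original code:

    package org.apache.hadoop.examples;
    
    /*
     * Hypothetical helper: rounds a proposed split size down to a multiple of the
     * 4-byte record length so that no integer straddles a split boundary.
     */
    public class RecordAlignment {
    	public static final int RECORD_LENGTH = 4;	// one int per record
    
    	public static long alignSplitSize(long splitSize) {
    		long aligned = (splitSize / RECORD_LENGTH) * RECORD_LENGTH;
    		return Math.max(aligned, RECORD_LENGTH);	// never shrink to zero
    	}
    }

    With this helper, splitSize in getSplits would be computed as RecordAlignment.alignSplitSize(Math.max(minSplitSize, Math.min(maxSplitSize, blockSize))).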

    Next, BinRecordReader.java:

    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.RecordReader;
    
    /**
     * Reads fixed-width binary records (4-byte integers) from a file split and
     * returns (byte offset, integer value) key-value pairs.
     */
    public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
    	private FSDataInputStream inputStream = null;
    	private long start,end,pos;
    	private Configuration conf = null;
    	private FileSplit fileSplit = null;
    	private LongWritable key = new LongWritable();
    	private IntWritable value = new IntWritable();
    	private boolean processed = false;
    	public BinRecordReader() throws IOException {
    	}
    
    	/*
    	 * Close the input stream.
    	 * */
    	public void close() {
    		try {
    			if(inputStream != null)
    				inputStream.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/*
    	 * Report the processing progress. This implementation only distinguishes
    	 * "not finished" (0.0) from "finished" (1.0).
    	 **/
    	public float getProgress() {
    		return ((processed == true)? 1.0f : 0.0f);
    	}
    
    	/*
    	 * Return the current key (the byte offset of the current record).
    	 * */
    	public LongWritable getCurrentKey() throws IOException,
    	InterruptedException {
    		return key;
    	}
    
    	/* Return the current value (the integer read at the current offset).
    	 * */
    	public IntWritable getCurrentValue() throws IOException,InterruptedException {
    		return value;
    	}
    
    	/*
    	 * Initialization: open the file stream and set the start position and length
    	 * according to the split information.
    	 * */
    	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    			throws IOException, InterruptedException {
    		fileSplit = (FileSplit)inputSplit;
    		conf = context.getConfiguration();
    
    		this.start = fileSplit.getStart();
    		this.end = this.start + fileSplit.getLength();
    
    		try{
    			Path path = fileSplit.getPath();
    			FileSystem fs = path.getFileSystem(conf);
    			this.inputStream = fs.open(path);
    			inputStream.seek(this.start);
    			this.pos = this.start;
    		}	catch(IOException e)	{
    			e.printStackTrace();
    		}
    	}
    
    	/*
    	 * Produce the next key-value pair: the key is the current byte offset and the value
    	 * is the 4-byte integer at that offset. readInt interprets the bytes as big-endian,
    	 * so Integer.reverseBytes is applied to recover the little-endian integers written
    	 * by the C program shown later.
    	 **/
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		if(this.pos < this.end) {
    			key.set(this.pos);
    			value.set(Integer.reverseBytes(inputStream.readInt()));
    			this.pos = inputStream.getPos();
    			return true;
    		} else {
    			processed = true;
    			return false;
    		}
    	}
    }
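
    The getProgress method above only ever reports 0% or 100%. If finer-grained progress reporting is wanted (for example in the job web UI), a proportional value can be derived from the start, end, and pos fields the reader already maintains. A minimal sketch of such a replacement method for BinRecordReader:

    	public float getProgress() {
    		if (start == end) {
    			return 0.0f;
    		}
    		// Fraction of the split's byte range consumed so far.
    		return Math.min(1.0f, (pos - start) / (float) (end - start));
    	}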
    

    Finally, the driver, IntCount.java:

    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.examples.BinInputFormat;
    
    public class IntCount {
    	public static class TokenizerMapper extends Mapper<LongWritable, IntWritable, Text, IntWritable>{
    
    		private final static IntWritable one = new IntWritable(1); 
    		private Text intNum = new Text();                             
    
    		public void map(LongWritable key, IntWritable value, Context context
    				) throws IOException, InterruptedException {
    			intNum.set(Integer.toString(value.get()));                              
    			context.write(intNum, one);                            
    		}
    	}
    
    	public static class IntSumReducer 
    	extends Reducer<Text,IntWritable,Text,IntWritable> { 
    		private IntWritable result = new IntWritable();              
    
    		public void reduce(Text key, Iterable<IntWritable> values, 
    				Context context
    				) throws IOException, InterruptedException {
    			int sum = 0;                                                 
    			for (IntWritable val : values) {
    				sum += val.get();                                         
    
    			}
    			result.set(sum);                                                                              
    			context.write(key, result);                                
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		System.out.println("testing1");
    		Configuration conf = new Configuration();
    		String[] newArgs = new String[]{"hdfs://localhost:9000/read","hdfs://localhost:9000/data/wc_output21"};
    		String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
    		if (otherArgs.length != 2) {
    			System.err.println("Usage: intcount <in> <out>");
    			System.exit(2);
    		}
    		Job job = new Job(conf, "IntCount");                
    		job.setJarByClass(IntCount.class);
    		job.setMapperClass(TokenizerMapper.class);  
    		job.setCombinerClass(IntSumReducer.class);  
    		job.setReducerClass(IntSumReducer.class); 
    		// use the custom input format
    		job.setInputFormatClass(BinInputFormat.class);
    		job.setOutputKeyClass(Text.class);     
    		job.setOutputValueClass(IntWritable.class);   
    		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }

    Next, we generate a test file of binary integers with a short C program:

    #include<stdio.h>
    int main(){
    	FILE * fp = fopen("tmpfile","wb");
    	int i,j;
    	for(i=0;i<10;i++) {
    		for(j=0;j<10;j++)
    			fwrite(&j,sizeof(int),1,fp);
    	}
    	fclose(fp);
    	return 0;
    }
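
    For reference, the same test file can also be produced from Java. The sketch below is not part of the original post (the file name tmpfile is kept from the C version); it writes the integers in little-endian order via java.nio.ByteBuffer, which is exactly why the reader above needs Integer.reverseBytes after readInt:

    package org.apache.hadoop.examples;
    
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    
    public class GenerateBinaryInts {
    	public static void main(String[] args) throws IOException {
    		// 10 passes over the values 0..9, 4 bytes each: 400 bytes in total,
    		// matching the output of the C program above.
    		ByteBuffer buffer = ByteBuffer.allocate(10 * 10 * 4);
    		buffer.order(ByteOrder.LITTLE_ENDIAN);	// same byte order as fwrite on x86
    		for (int i = 0; i < 10; i++) {
    			for (int j = 0; j < 10; j++) {
    				buffer.putInt(j);
    			}
    		}
    		FileOutputStream out = new FileOutputStream("tmpfile");
    		out.write(buffer.array());
    		out.close();
    	}
    }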

    Copy the generated file into /read/ on HDFS, then launch the IntCount MapReduce job and inspect the output. Since the test file contains each of the values 0 through 9 ten times, the expected result is a count of 10 for every value.