  • Extending FileInputFormat to understand how the FileInputFormat class works

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    
    import com.google.common.base.Charsets;
    
    public class MyFileinput extends FileInputFormat<LongWritable, Text> {
    
    	private static final PathFilter hiddenFileFilter = new PathFilter() {
    		public boolean accept(Path p) {
    			String name = p.getName();
    			return ((!(name.startsWith("_"))) && (!(name.startsWith("."))));
    		}
    	};
    	
    	// List the input files, filtering out names that start with "_" or "." (the filter can be customized)
    	protected List<FileStatus> listStatus(JobContext job) throws IOException {
    		System.out.println("*********************");
    		List<FileStatus> result = new ArrayList<FileStatus>();
    		Path[] dirs = getInputPaths(job);
    		System.out.println("dirs" + dirs);
    		System.out.println("dirs length = " + dirs.length);
    		for(Path p: dirs){
    			System.out.println("Path loop " + p);
    		}
    
    		if (dirs.length == 0) {
    			throw new IOException("No input paths specified in job");
    		}
    
    		TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
    				job.getConfiguration());
    
    		List<IOException> errors = new ArrayList<IOException>();
    
    		List<PathFilter> filters = new ArrayList<PathFilter>();
    		filters.add(hiddenFileFilter);
    		PathFilter jobFilter = getInputPathFilter(job);
    		if (jobFilter != null) {
    			filters.add(jobFilter);
    		}
    
    		// Combined filter: a path is accepted only if every registered filter accepts it (extensible)
    		PathFilter inputFilter = new MultiPathFilter(filters);
    
    		for (int i = 0; i < dirs.length; ++i) {
    			Path p = dirs[i];
    			FileSystem fs = p.getFileSystem(job.getConfiguration());
    			FileStatus[] matches = fs.globStatus(p, inputFilter);
    			System.out.println("matches=" + matches);
    			for(FileStatus match: matches){
    				System.out.println("loop matches" + match.getPath());
    			}
    			
    			if (matches == null)
    				errors.add(new IOException("Input path does not exist: " + p));
    			else if (matches.length == 0)
    				errors.add(new IOException("Input Pattern " + p
    						+ " matches 0 files"));
    			else {
    				for (FileStatus globStat : matches) {
    					System.out.println("globStat " + globStat);
    					if (globStat.isDirectory())
    						for (FileStatus stat : fs.listStatus(
    								globStat.getPath(), inputFilter)) {
    							result.add(stat);
    						}
    					else {
    						result.add(globStat);
    					}
    				}
    			}
    		}
    
    		if (!(errors.isEmpty())) {
    			throw new InvalidInputException(errors);
    		}
    		// LOG.info("Total input paths to process : " + result.size());
    		return result;
    	}
    	
    	// Compute the split size and return the list of input splits
    	public List<InputSplit> getSplits(JobContext job) throws IOException {
    		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    		long maxSize = getMaxSplitSize(job);
    		
    		System.out.print("minSize " + minSize);
    		System.out.print("maxSize " + maxSize);
    		
    		List<InputSplit> splits = new ArrayList<InputSplit>();
    		// Get the (already filtered) list of files under the input directories
    		List<FileStatus> files = listStatus(job);
    		for (FileStatus file : files) {
    			Path path = file.getPath();
    			long length = file.getLen();
    			System.out.println("path: " + path+ " file len = " + length);
    			if (length != 0L) {
    				// Look up the block locations of this file
    				FileSystem fs = path.getFileSystem(job.getConfiguration());
    				BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
    						0L, length);
    				
    				if (isSplitable(job, path)) {
    					long blockSize = file.getBlockSize();
    					System.out.println("blockSize:" + blockSize);
    					long splitSize = computeSplitSize(blockSize, minSize,
    							maxSize);
    					System.out.println("splitSize :" + splitSize);
    
    					long bytesRemaining = length;
    					System.out.println("bytesRemaining :" + bytesRemaining);
    					
    					System.out.println("bytesRemaining / splitSize = " + (double) bytesRemaining / splitSize);
    					// The slop factor is 1.1: a leftover chunk that is too small is not worth starting its own map task,
    					// so as long as what remains is no more than 1.1x the split size it all goes into one final split.
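    					// Example (assumed figures): with a 128 MB split size, a 260 MB file gives 260/128 ≈ 2.03 > 1.1,
    					// so a 128 MB split is cut first; the remaining 132 MB gives 132/128 ≈ 1.03 <= 1.1
    					// and is emitted as a single final split.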
    					while ((double) bytesRemaining / splitSize > 1.1D) {
    						int blkIndex = getBlockIndex(blkLocations, length
    								- bytesRemaining);
    						System.out.println("blkIndex :" + blkIndex);
    						
    						// Add the split to the split list
    						splits.add(makeSplit(path, length - bytesRemaining,
    								splitSize, blkLocations[blkIndex].getHosts()));
    
    						bytesRemaining -= splitSize;
    					}
    					
    					// Tail of the file
    					if (bytesRemaining != 0L) {
    						System.out.println("tail size = " + bytesRemaining);
    						int blkIndex = getBlockIndex(blkLocations, length
    								- bytesRemaining);
    						splits.add(makeSplit(path, length - bytesRemaining,
    								bytesRemaining,
    								blkLocations[blkIndex].getHosts()));
    					}
    				} else {
    					splits.add(makeSplit(path, 0L, length,
    							blkLocations[0].getHosts()));
    				}
    			} else {
    				// A 0-byte file still gets one (empty) split, so it still starts one map
    				splits.add(makeSplit(path, 0L, length, new String[0]));
    			}
    		}
    			
    		job.getConfiguration().setLong(
    				"mapreduce.input.fileinputformat.numinputfiles", files.size());
    		// LOG.debug("Total # of splits: " + splits.size());
    		return splits;
    	}
    
    	private static class MultiPathFilter implements PathFilter {
    		private List<PathFilter> filters;
    
    		public MultiPathFilter(List<PathFilter> filters) {
    			this.filters = filters;
    		}
    
    		public boolean accept(Path path) {
    			for (PathFilter filter : this.filters) {
    				if (!(filter.accept(path))) {
    					return false;
    				}
    			}
    			return true;
    		}
    	}
    	
    	// Reads the split's contents; by default records are read line by line
    	@Override
    	public RecordReader<LongWritable, Text> createRecordReader(
    			InputSplit split, TaskAttemptContext context) {
    		String delimiter = context.getConfiguration().get(
    				"textinputformat.record.delimiter");
    		
    		System.out.println("delimiter ==" + delimiter);
    		// null by default, in which case LineRecordReader falls back to the standard newline delimiters
    		byte[] recordDelimiterBytes = null;
    		if (null != delimiter)
    			recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    		
    		return new LineRecordReader(recordDelimiterBytes);
    	}
    }
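
    To exercise these overrides end to end, here is a minimal driver sketch. It is illustrative only (the MyFileinputDriver class name, the identity map-only job, and the path arguments are assumptions, not part of the original post); its purpose is to show which knobs feed getSplits() and createRecordReader().

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MyFileinputDriver {
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		// Read by createRecordReader(); when unset, LineRecordReader uses normal line endings
    		conf.set("textinputformat.record.delimiter", "\n");
    
    		Job job = Job.getInstance(conf, "myfileinput-demo");
    		job.setJarByClass(MyFileinputDriver.class);
    
    		// Plug in the custom InputFormat
    		job.setInputFormatClass(MyFileinput.class);
    
    		// These two values feed getMinSplitSize()/getMaxSplitSize() inside getSplits()
    		FileInputFormat.setMinInputSplitSize(job, 1L);
    		FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
    		// A custom PathFilter class could also be registered here; listStatus() picks it up
    		// via getInputPathFilter(job):
    		// FileInputFormat.setInputPathFilter(job, SomePathFilter.class);
    
    		// Identity mapper, map-only job, just to watch the splits and records flow through
    		job.setMapperClass(Mapper.class);
    		job.setNumReduceTasks(0);
    		job.setOutputKeyClass(LongWritable.class);
    		job.setOutputValueClass(Text.class);
    
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }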
    

    The main job of an InputFormat is to compute the input splits and to feed each map task the records of its split. The top-level contract shows exactly that:

    public abstract class InputFormat<K, V> {
        public abstract List<InputSplit> getSplits(JobContext paramJobContext)
                throws IOException, InterruptedException;

        public abstract RecordReader<K, V> createRecordReader(
                InputSplit paramInputSplit,
                TaskAttemptContext paramTaskAttemptContext) throws IOException,
                InterruptedException;
    }

    This is more or less apparent just from the two methods the top-level abstract class asks subclasses to provide.

    The simplest InputFormat implementation: all we still have to supply ourselves is the RecordReader (a whole-file reader sketch follows the class below).

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    
    import com.google.common.base.Charsets;
    
    public class MySimpleInformat<V> extends FileInputFormat<LongWritable, V>
    {    
    	protected boolean isSplitable(JobContext context, Path filename) {
    		// Whether the file may be split; returning false means one split per file
    		return false;
    	}    
        
        @Override
        public RecordReader<LongWritable, V> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            String delimiter = context.getConfiguration().get(
                    "textinputformat.record.delimiter");
             
            System.out.println("delimiter ==" + delimiter);
            // null by default, in which case LineRecordReader falls back to the standard newline delimiters
            byte[] recordDelimiterBytes = null;
            if (null != delimiter)
                recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
             
            return (RecordReader<LongWritable, V>) new LineRecordReader(recordDelimiterBytes);
        }
    }
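
    Since isSplitable() above returns false, each file arrives as a single split, and a natural RecordReader to pair with it is one that reads the whole file as a single record. The class below is a sketch under that assumption (WholeFileRecordReader is not from the original post); it could be returned from createRecordReader() in place of LineRecordReader.

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    // Emits exactly one record per (unsplittable) file: key = 0, value = the whole file contents
    public class WholeFileRecordReader extends RecordReader<LongWritable, Text> {
    	private FileSplit split;
    	private Configuration conf;
    	private final LongWritable key = new LongWritable(0);
    	private final Text value = new Text();
    	private boolean processed = false;
    
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context) {
    		this.split = (FileSplit) split;
    		this.conf = context.getConfiguration();
    	}
    
    	@Override
    	public boolean nextKeyValue() throws IOException {
    		if (processed) {
    			return false;
    		}
    		// Read the entire file backing this split into one Text value
    		byte[] contents = new byte[(int) split.getLength()];
    		Path file = split.getPath();
    		FileSystem fs = file.getFileSystem(conf);
    		FSDataInputStream in = null;
    		try {
    			in = fs.open(file);
    			IOUtils.readFully(in, contents, 0, contents.length);
    			value.set(contents, 0, contents.length);
    		} finally {
    			IOUtils.closeStream(in);
    		}
    		processed = true;
    		return true;
    	}
    
    	@Override
    	public LongWritable getCurrentKey() { return key; }
    
    	@Override
    	public Text getCurrentValue() { return value; }
    
    	@Override
    	public float getProgress() { return processed ? 1.0f : 0.0f; }
    
    	@Override
    	public void close() { /* stream already closed in nextKeyValue() */ }
    }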
    
  • Original post: https://www.cnblogs.com/chengxin1982/p/3849142.html