  • Subclassing FileInputFormat to understand how FileInputFormat works

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    
    import com.google.common.base.Charsets;
    
    public class MyFileinput extends FileInputFormat<LongWritable, Text> {
    
    	private static final PathFilter hiddenFileFilter = new PathFilter() {
    		public boolean accept(Path p) {
    			String name = p.getName();
    			return ((!(name.startsWith("_"))) && (!(name.startsWith("."))));
    		}
    	};
    	
    	// Walk the input paths and skip files whose names start with "_" or "." (the filter can be customized)
    	protected List<FileStatus> listStatus(JobContext job) throws IOException {
    		System.out.println("*********************");
    		List<FileStatus> result = new ArrayList<FileStatus>();
    		Path[] dirs = getInputPaths(job);
    		System.out.println("dirs = " + java.util.Arrays.toString(dirs));
    		System.out.println("dirs length = " + dirs.length);
    		for(Path p: dirs){
    			System.out.println("Path loop " + p);
    		}
    
    		if (dirs.length == 0) {
    			throw new IOException("No input paths specified in job");
    		}
    
    		TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
    				job.getConfiguration());
    
    		List<IOException> errors = new ArrayList<IOException>();

    		List<PathFilter> filters = new ArrayList<PathFilter>();
    		filters.add(hiddenFileFilter);
    		PathFilter jobFilter = getInputPathFilter(job);
    		if (jobFilter != null) {
    			filters.add(jobFilter);
    		}
    
    		// The combined filter; more PathFilters can be chained in
    		PathFilter inputFilter = new MultiPathFilter(filters);
    
    		for (int i = 0; i < dirs.length; ++i) {
    			Path p = dirs[i];
    			FileSystem fs = p.getFileSystem(job.getConfiguration());
    			FileStatus[] matches = fs.globStatus(p, inputFilter);
    			System.out.println("matches=" + matches);
    			for(FileStatus match: matches){
    				System.out.println("loop matches" + match.getPath());
    			}
    			
    			if (matches == null)
    				errors.add(new IOException("Input path does not exist: " + p));
    			else if (matches.length == 0)
    				errors.add(new IOException("Input Pattern " + p
    						+ " matches 0 files"));
    			else {
    				for (FileStatus globStat : matches) {
    					System.out.println("globStat " + globStat);
    					if (globStat.isDirectory())
    						for (FileStatus stat : fs.listStatus(
    								globStat.getPath(), inputFilter)) {
    							result.add(stat);
    						}
    					else {
    						result.add(globStat);
    					}
    				}
    			}
    		}
    
    		if (!(errors.isEmpty())) {
    			throw new InvalidInputException(errors);
    		}
    		// LOG.info("Total input paths to process : " + result.size());
    		return result;
    	}
    	
    	// Compute the split size and return the list of splits
    	public List<InputSplit> getSplits(JobContext job) throws IOException {
    		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    		long maxSize = getMaxSplitSize(job);
    		
    		System.out.print("minSize " + minSize);
    		System.out.print("maxSize " + maxSize);
    		
    		List splits = new ArrayList();
    		// 获取输入目录下的文件列表(过滤文件)
    		List<FileStatus> files = listStatus(job);
    		for (FileStatus file : files) {
    			Path path = file.getPath();
    			long length = file.getLen();
    			System.out.println("path: " + path+ " file len = " + length);
    			if (length != 0L) {
    				// Look up the block locations for this file
    				FileSystem fs = path.getFileSystem(job.getConfiguration());
    				BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
    						0L, length);
    				
    				if (isSplitable(job, path)) {
    					long blockSize = file.getBlockSize();
    					System.out.println("blockSize:" + blockSize);
    					long splitSize = computeSplitSize(blockSize, minSize,
    							maxSize);
    					System.out.println("splitSize :" + splitSize);
    
    					long bytesRemaining = length;
    					System.out.println("bytesRemaining :" + bytesRemaining);
    					
    					System.out.println(bytesRemaining / splitSize);
    					// SPLIT_SLOP is 1.1D: a remainder of up to 1.1x the split
    					// size still goes into the last split, so a tiny tail does
    					// not spawn an extra map task. The cast to double is needed
    					// because plain long division would drop the fraction.
    					while (((double) bytesRemaining) / splitSize > 1.1D) {
    						int blkIndex = getBlockIndex(blkLocations, length
    								- bytesRemaining);
    						System.out.println("blkIndex :" + blkIndex);
    						
    						// Append the split to the list
    						splits.add(makeSplit(path, length - bytesRemaining,
    								splitSize, blkLocations[blkIndex].getHosts()));
    
    						bytesRemaining -= splitSize;
    					}
    					
    					// Tail of the file
    					if (bytesRemaining != 0L) {
    						System.out.println("tail split size = " + bytesRemaining);
    						int blkIndex = getBlockIndex(blkLocations, length
    								- bytesRemaining);
    						splits.add(makeSplit(path, length - bytesRemaining,
    								bytesRemaining,
    								blkLocations[blkIndex].getHosts()));
    					}
    				} else {
    					splits.add(makeSplit(path, 0L, length,
    							blkLocations[0].getHosts()));
    				}
    			} else {
    				// A zero-length file still yields one (empty) split, so one map runs for it
    				splits.add(makeSplit(path, 0L, length, new String[0]));
    			}
    		}
    			
    		job.getConfiguration().setLong(
    				"mapreduce.input.fileinputformat.numinputfiles", files.size());
    		// LOG.debug("Total # of splits: " + splits.size());
    		return splits;
    	}
    
    	private static class MultiPathFilter implements PathFilter {
    		private List<PathFilter> filters;
    
    		public MultiPathFilter(List<PathFilter> filters) {
    			this.filters = filters;
    		}
    
    		public boolean accept(Path path) {
    			for (PathFilter filter : this.filters) {
    				if (!(filter.accept(path))) {
    					return false;
    				}
    			}
    			return true;
    		}
    	}
    	
    	// Reads the file contents; by default one record per line
    	@Override
    	public RecordReader<LongWritable, Text> createRecordReader(
    			InputSplit split, TaskAttemptContext context) {
    		String delimiter = context.getConfiguration().get(
    				"textinputformat.record.delimiter");
    		
    		System.out.println("delimiter ==" + delimiter);
    		// 默认为空
    		byte[] recordDelimiterBytes = null;
    		if (null != delimiter)
    			recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    		
    		return new LineRecordReader(recordDelimiterBytes);
    	}
    }
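
    The split loop is easier to follow with concrete numbers. The sketch below replays the same arithmetic outside of Hadoop (the 300 MB file and 128 MB block size are made-up example values). It yields two full 128 MB splits plus a 44 MB tail, and illustrates why the cast to double in the loop condition matters: with plain long division, 44 MB / 128 MB is 0.

    public class SplitMathDemo {
    	// Mirrors FileInputFormat.computeSplitSize()
    	static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    		return Math.max(minSize, Math.min(maxSize, blockSize));
    	}

    	public static void main(String[] args) {
    		long length = 300L * 1024 * 1024;    // hypothetical 300 MB file
    		long blockSize = 128L * 1024 * 1024; // typical HDFS block size
    		long minSize = 1L;
    		long maxSize = Long.MAX_VALUE;

    		long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 128 MB
    		long bytesRemaining = length;
    		while (((double) bytesRemaining) / splitSize > 1.1D) {
    			System.out.println("split offset=" + (length - bytesRemaining)
    					+ " length=" + splitSize);
    			bytesRemaining -= splitSize;
    		}
    		if (bytesRemaining != 0L) { // 44 MB tail, well under 1.1 * 128 MB
    			System.out.println("tail offset=" + (length - bytesRemaining)
    					+ " length=" + bytesRemaining);
    		}
    	}
    }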
    

    The main job of an InputFormat is twofold: compute the input splits, and read the contents of a split as records for the map tasks.

    public abstract class InputFormat<K, V> {
        public abstract List<InputSplit> getSplits(JobContext context)
                throws IOException, InterruptedException;

        public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException;
    }

    That division of labor is already visible in the two abstract methods the top-level base class asks subclasses to provide.
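
    To see both methods in action, a driver wires the custom format into a job roughly as follows (a sketch: the job name and the argument-based paths are placeholders, MyFileinput is the class defined above, and MyFilter is a hypothetical extra PathFilter):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MyFileinputDriver {
    	public static void main(String[] args) throws Exception {
    		Job job = Job.getInstance(new Configuration(), "myfileinput-demo");
    		job.setJarByClass(MyFileinputDriver.class);

    		// The framework calls MyFileinput.getSplits() at submission time
    		// and createRecordReader() inside each map task
    		job.setInputFormatClass(MyFileinput.class);

    		// An additional job-level filter would be picked up by
    		// getInputPathFilter() inside listStatus(), e.g.:
    		// FileInputFormat.setInputPathFilter(job, MyFilter.class);

    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));

    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }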

    The simplest InputFormat implementation: all we have to supply ourselves is the RecordReader.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    
    import com.google.common.base.Charsets;
    
    public class MySimpleInformat<V> extends FileInputFormat<LongWritable, V>
    {
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // Never split: the whole file becomes a single split, hence one map
            return false;
        }
        
        @Override
        public RecordReader<LongWritable, V> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            String delimiter = context.getConfiguration().get(
                    "textinputformat.record.delimiter");
             
            System.out.println("delimiter ==" + delimiter);
            // 默认为空
            byte[] recordDelimiterBytes = null;
            if (null != delimiter)
                recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
             
            return (RecordReader<LongWritable, V>) new LineRecordReader(recordDelimiterBytes);
        }
    }
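
    Both createRecordReader() implementations above consult textinputformat.record.delimiter, so the record boundary does not have to be a newline. Setting it in the driver configuration is enough (a sketch; the "##" delimiter is just an example value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class DelimiterDemo {
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		// LineRecordReader now ends records at "##" instead of '\n'
    		conf.set("textinputformat.record.delimiter", "##");

    		Job job = Job.getInstance(conf, "delimiter-demo");
    		job.setInputFormatClass(MySimpleInformat.class);
    		// ... remaining job setup as usual ...
    	}
    }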
    
  • Original post: https://www.cnblogs.com/chengxin1982/p/3849142.html