zoukankan      html  css  js  c++  java
  • MapReduce之自定义InputFormat

    在企业开发中,Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。
    自定义InputFormat步骤如下:

    • (1)自定义一个类继承FilelnputFormat
    • (2)自定义一个类继承RecordReader,实现一次读取一个完整文件,将文件名为key,文件内容为value。
    • (3)在输出时使用SequenceFileOutPutFormat输出合并文件。

    无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并

    1. 需求

    将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value

    (1)输入数据
    在这里插入图片描述
    (2)期望输出文件格式
    在这里插入图片描述

    2. 需求分析

    1. 自定义一个类继承FileInputFormat
      (1)重写isSplitable()方法,返回false,让文件不可切,整个文件作为1片
      (2)重写createRecordReader(),返回自定义的RecordReader对象

    2. 自定义一个类继承RecordReader
      在RecordReader中,nextKeyValue()是最重要的方法,返回当前读取到的key-value,如果读到返回true,调用Mapper的map()来处理,否则返回false

    3. 编写程序

    MyInputFormat.java

    /*
     * 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切
     * 
     * 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value
     */
    public class MyInputFormat extends FileInputFormat {
    
    	@Override
    	public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    		return new MyRecordReader();
    	}
    	
    	@Override
    	protected boolean isSplitable(JobContext context, Path filename) {
    		return false;
    	}
    }
    

    MyRecordReader.java

    /*
     * RecordReader从MapTask处理的当前切片中读取数据
     * 
     * XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象
     */
    public class MyRecordReader extends RecordReader {
    	
    	private Text key;
    	private BytesWritable value;
    	
    	private String filename;
    	private int length;
    	
    	private FileSystem fs;
    	private Path path;
    	
    	private FSDataInputStream is;
    	
    	private boolean flag=true;
    
    	// MyRecordReader在创建后,在进入Mapper的run()之前,自动调用
    	// 文件的所有内容设置为1个切片,切片的长度等于文件的长度
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    
    		FileSplit fileSplit=(FileSplit) split;
    		
    		filename=fileSplit.getPath().getName();
    		
    		length=(int) fileSplit.getLength();
    		
    		path=fileSplit.getPath();
    		
    		//获取当前Job的配置对象
    		Configuration conf = context.getConfiguration();
    		
    		//获取当前Job使用的文件系统
    		fs=FileSystem.get(conf);
    		
    		 is = fs.open(path);
    		
    	}
    
    	// 读取一组输入的key-value,读到返回true,否则返回false
    	// 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true
    	// 第二次调用nextKeyValue()返回false
    	@Override
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		
    		if (flag) {
    			
    			//实例化对象
    			if (key==null) {
    				key=new Text();
    			}
    			
    			if (value==null) {
    				value=new BytesWritable();
    			}
    			
    			//赋值
    			//将文件名封装到key中
    			key.set(filename);
    			
    			// 将文件的内容读取到BytesWritable中
    			byte [] content=new byte[length];
    			
    			IOUtils.readFully(is, content, 0, length);
    			
    			value.set(content, 0, length);
    			
    			flag=false;
    			
    			return true;
    			
    		}
    		return false;
    	}
    
    	//返回当前读取到的key-value中的key
    	@Override
    	public Object getCurrentKey() throws IOException, InterruptedException {
    		return key;
    	}
    
    	//返回当前读取到的key-value中的value
    	@Override
    	public Object getCurrentValue() throws IOException, InterruptedException {
    		return value;
    	}
    
    	//返回读取切片的进度
    	@Override
    	public float getProgress() throws IOException, InterruptedException {
    		return 0;
    	}
    
    	// 在Mapper的输入关闭时调用,清理工作
    	@Override
    	public void close() throws IOException {
    		if (is != null) {
    			IOUtils.closeStream(is);
    		}	
    		if (fs !=null) {
    			fs.close();
    		}
    	}
    }
    

    CustomIFMapper.java

    public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
    
    }
    

    CustomIFReducer.java

    public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{
    
    }
    

    CustomIFDriver.java

    public class CustomIFDriver {
    	
    	public static void main(String[] args) throws Exception {
    		
    		Path inputPath=new Path("e:/mrinput/custom");
    		Path outputPath=new Path("e:/mroutput/custom");
    		
    		//作为整个Job的配置
    		Configuration conf = new Configuration();
    		//保证输出目录不存在
    		FileSystem fs=FileSystem.get(conf);
    		
    		if (fs.exists(outputPath)) {
    			fs.delete(outputPath, true);
    		}
    		
    		// 创建Job
    		Job job = Job.getInstance(conf);
    
    		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
    		job.setMapperClass(CustomIFMapper.class);
    		job.setReducerClass(CustomIFReducer.class);
    		
    		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
    		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(BytesWritable.class);
    		
    		// 设置输入目录和输出目录
    		FileInputFormat.setInputPaths(job, inputPath);
    		FileOutputFormat.setOutputPath(job, outputPath);
    		
    		// 设置输入和输出格式
    		job.setInputFormatClass(MyInputFormat.class);
    		job.setOutputFormatClass(SequenceFileOutputFormat.class);
    		
    		// ③运行Job
    		job.waitForCompletion(true);
    			
    	}
    }
    
  • 相关阅读:
    drf 反序列化
    drf学习 第一天 序列化器
    flex学习之作用在items上的属性
    flex学习之align-content
    flex学习之flex-wrap
    flex学习之align-itmes
    flex弹性盒子中jstify-content
    将主机变为服务器,ssh连接出现access denied
    为什么用tensor不用array?
    深度学习之Epoch、Batch、Iteration
  • 原文地址:https://www.cnblogs.com/sunbr/p/13339671.html
Copyright © 2011-2022 走看看