zoukankan      html  css  js  c++  java
  • hadoop2.x之IO:基于文件的数据结构

    备注

    二进制文件广义上讲是所有文件(在物理上所有文件都是二进制编码)。狭义上是指文本文件以外的文件
    。而文本文件又是指ASCII或unicode编码的文件,二者在物理上没有本质的区别,只是逻辑上的概念。所以二进制文件在这里指的是所有文件。

    Hadoop主要处理日志文件,其中每一行文本代表一条日志记录。在MapReduce的数据处理中,处理结果是用key-value的格式传递给下一过程的。我们可以看到,在前面的"气象数据集"的例子中 ,Mapper类的参数是以key-value的形式传入的,并且是逐行传入。Hadoop提供了一个用来为二进制键值对提供持久数据结构的类SequenceFile。他主要解决了两个问题:

    1. 将二进制文件(例如日志)转化为键值对的方式存储。
    2. 可以作为小文件的容器,可以将小文件包装起来,作为一个大文件处理。使HDFS和MapReduce效率更高。

    1. SequenceFile写操作

    通过静态方法个SequenceFile.createWriter()方法可以创建一个SequenceFile对象,并返回一个个SequenceFile.Writer实例,该示例可以通过append()方法在文件末尾追加键值对。

    例如:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Writer;
    import org.apache.hadoop.io.SequenceFile.Writer.Option;
    import org.apache.hadoop.io.Text;
    
    
    public class SequenceFileWriterDemo{
    	
    	private static final String[] DATAS = new String[]{
    		"Hello World","きょうはいいてんきですね","It's a Nice Day Today","今天天气很好啊","[Tomcat 2.x] xxx xxx, xxx"
    	};
    	
    	public static void main(String[] args) throws Exception {
    		String uri = args[0];
    		Path path = new Path(uri);
    		Configuration conf = new Configuration();
    		
    		IntWritable key = new IntWritable();
    		Text value = new Text();
    		SequenceFile.Writer writer = null;
    		try {
    			Option[] options = new Option[]{
    				Writer.file(path),
    				Writer.keyClass(key.getClass()),
    				Writer.valueClass(value.getClass())
    			};
    			writer = SequenceFile.createWriter(conf,options);
    
    			for (int i = 0; i < DATAS.length; i++) {
    				key.set(i + 1);
    				value.set(DATAS[i].getBytes());
    				System.out.println("["+writer.getLength()+"]"+"	"+key+"	"+value);
    				writer.append(key, value);
    			}
    			writer.close();
    		} catch (Exception e) {
    			IOUtils.closeStream(writer);
    		}
    		
    	}
    	
    
    

    运行结果:

    [grid@tiny01 ~]$ hadoop SequenceFileWriterDemo file:///home/grid/result.seq
    [128]   1       Hello World
    [160]   2       きょうはいいてんきですね
    [213]   3       It's a Nice Day Today
    [255]   4       今天天气很好啊
    [297]   5       [Tomcat 2.x] xxx xxx, xxx
    

    由于这是二进制文件我们无法直接读取,下面会介绍读取方式。

    2. SequenceFile读操作

    上面我们将数据存储到了result.seq中,现在我们就要取出他来。SequenceFile使用next()函数来遍历文件的,如果读到文件末尾则返回false。

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Reader;
    import org.apache.hadoop.io.SequenceFile.Reader.Option;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;
    
    
    public class SequenceFileReaderDemo{
    	
    	public static void main(String[] args) throws Exception {
    		String uri = args[0];
    		Path path = new Path(uri);
    		Configuration conf = new Configuration();
    		
    		SequenceFile.Reader reader = null;
    		try {
    			Option[] options = new Option[]{
    				Reader.file(path),
    			};
    			reader = new SequenceFile.Reader(conf,options);
    			
    			Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    			Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    			
    			long position = reader.getPosition();
    			while (reader.next(key, value)) {
    				System.out.println("["+position+"]"+"	"+key+"	"+value);
    				position = reader.getPosition();
    			}
    			reader.close();
    		} catch (Exception e) {
    			IOUtils.closeStream(reader);
    		}
    		
    	}
    	
    }
    

    运行结果:

    [grid@tiny01 ~]$ hadoop SequenceFileReaderDemo file:///home/grid/result.seq
    [128]   1       Hello World
    [160]   2       きょうはいいてんきですね
    [213]   3       It's a Nice Day Today
    [255]   4       今天天气很好啊
    [297]   5       [Tomcat 2.x] xxx xxx, xxx
    

    我们就可以按行读取了。

    3.MapFile

    下面我们要介绍一下MapFile。MapFile是已经排过序的SequenceFile,在上面的程序中我们虽然给二进制文件添加了key,但是我们没有排序和创建索引。而MapFile则在SequenceFile的基础上排序并创建了索引。我们可以看看上面的程序中如果将key变为随机值后创建的文件的结果:

    key.set((int)(Math.random()*1000));
    

    写入后读取的文件([hadoop fs -text 文件名]是Hadoop自带的SequenceFile读取程序)

    grid@tiny01 input]$ hadoop SequenceFileWriterDemo2 file:///home/grid/result2.seq
    
    [128]   615     Hello World
    [160]   156     きょうはいいてんきですね
    [213]   460     It's a Nice Day Today
    [255]   823     今天天气很好啊
    [297]   288     [Tomcat 2.x] xxx xxx, xxx
    
    [grid@tiny01 input]$ hadoop fs -text file:///home/grid/result2.seq
    615     Hello World
    156     きょうはいいてんきですね
    460     It's a Nice Day Today
    823     今天天气很好啊
    288     [Tomcat 2.x] xxx xxx, xxx
    
    

    我们可以看见是没有排序的。

    4. MapFile的写操作

    实际上MapFile的写操作和SequenceFile相同,先新建一个MapFile.Writer实例,再调用append()方法写入文件。

    需要注意的是MapFile的键必须是WritableComparable类型的实例。值必须是Writable类型的实例。

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.MapFile.Writer;
    import org.apache.hadoop.io.SequenceFile.Writer.Option;
    import org.apache.hadoop.io.Text;
    
    
    public class MapFileWriterDemo{
    	
    	private static final String[] DATAS = new String[]{
    		"Hello World",
    		"きょうはいいてんきですね",
    		"It's a Nice Day Today",
    		"今天天气很好啊",
    		"[Tomcat 2.x] xxx xxx, xxx"
    	};
    	
    	public static void main(String[] args) throws Exception {
    		String uri = args[0];
    		Path path = new Path(uri);
    		Configuration conf = new Configuration();
    		
    		IntWritable key = new IntWritable();
    		Text value = new Text();
    		MapFile.Writer writer = null;
    		try {
    			Option[] options = new Option[]{
    				Writer.keyClass(key.getClass()),
    				Writer.valueClass(value.getClass())
    			};
    			writer = new MapFile.Writer(conf, path, options);
    
    			for (int i = 0; i < DATAS.length * 100; i++) {
    				key.set(i + 1);
    				value.set(DATAS[i%DATAS.length].getBytes());
    				writer.append(key, value);
    			}
    			writer.close();
    		} catch (Exception e) {
    			IOUtils.closeStream(writer);
    			e.printStackTrace();
    		}
    		
    	}
    	
    }
    

    注意使用:SequenceFile.Writer.Option

    在hadoop下运行:

    [grid@tiny01 ~]$ hadoop MapFileWriterDemo file:///home/grid/result3.map
    17/08/11 02:28:59 INFO compress.CodecPool: Got brand-new compressor [.deflate]
    17/08/11 02:28:59 INFO compress.CodecPool: Got brand-new compressor [.deflate]
    

    运行后创建一个名为result3.map的文件夹,下面会有两个文件:index和data都是SequenceFile文件。

    [grid@tiny01 result3.map]$ pwd
    /home/grid/result3.map
    [grid@tiny01 result3.map]$ ll
    总用量 28
    -rw-r--r--. 1 grid grid 21328 8月  11 02:44 data
    -rw-r--r--. 1 grid grid   229 8月  11 02:44 index
    [grid@tiny01 result3.map]$ hadoop fs -text file:///home/grid/result3.map/data | head -10
    1       Hello World
    2       きょうはいいてんきですね
    3       It's a Nice Day Today
    4       今天天气很好啊
    5       [Tomcat 2.x] xxx xxx, xxx
    6       Hello World
    7       きょうはいいてんきですね
    8       It's a Nice Day Today
    9       今天天气很好啊
    10      [Tomcat 2.x] xxx xxx, xxx
    
    [grid@tiny01 result3.map]$ hadoop fs -text file:///home/grid/result3.map/index
    1       128
    129     5545
    257     10970
    385     16417
    
    

    解释一下
    MapFile是包含data和index两个文件的文件夹。这两个文件都是SequenceFile,data中包含所有数据,index包含一部分键和数据行的偏移量的映射。在上面的例子中,每隔128个键会创建一个索引。记录下该键的偏移量。
    我们可以设置间隔。当间隔大的时候我们需要更少的内存,但是由于间隔大所以访问效率较低;当间隔小的时候,我们需要更多的内存但是访问效率就会提高。

    5. MapFile的读操作

    对于MapFile的遍历和上面的SequenceFile中是相同的,因此不作介绍了。

    这里介绍MapFile特有的get()方法,用于随机访问文件中的数据:

    public Writable get(WritableComparable key ,Writable val )
    

    如果目标条目存在则返回目标条目,否则返回null

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.SequenceFile.Reader;
    import org.apache.hadoop.io.SequenceFile.Reader.Option;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    
    
    public class MapFileGetReaderDemo{
    	
    	public static void main(String[] args) throws Exception {
    		int key = Integer.parseInt(args[0]);
    		String uri = args[1];
    		
    		Path path = new Path(uri);
    		Configuration conf = new Configuration();
    		
    		MapFile.Reader reader = null;
    		try {
    			Option[] options = new Option[]{
    				Reader.file(path),
    			};
    			reader = new MapFile.Reader(path, conf,options);
    			
    			Writable writable = reader.get(new IntWritable(key), new Text());
    			System.out.println(writable.toString());
    			
    			reader.close();
    		} catch (Exception e) {
    			IOUtils.closeStream(reader);
    		}
    		
    	}
    	
    }
    

    运行结果:

    [grid@tiny01 ~]$ hadoop MapFileGetReaderDemo 20 file:///home/grid/result3.map 
    
    [Tomcat 2.x] xxx xxx, xxx
    

    6. 参考资料

    [1] Hadoop:The Definitive Guide,Third Edition, by Tom White. Copyright 2013 Tom White,978-1-449-31152-0

  • 相关阅读:
    实用工具分享
    美国西储大学轴承数据解读
    CSDN去广告插件
    [教程]SPSS for Mac 安装教程
    [教程]Ubuntu 安装 Docker CE
    [教程]Windows操作系统下安装Ubuntu虚拟机
    Chrome视频解析插件
    [软件]MATLAB小波包的分解与重构
    [信号处理]奈奎斯特采样定理
    [软件]LabVIEW编程实例:如何通过TCP协议进行数据通信
  • 原文地址:https://www.cnblogs.com/erygreat/p/7352514.html
Copyright © 2011-2022 走看看