zoukankan      html  css  js  c++  java
  • 用Hadoop AVRO进行大量小文件的处理(转)

    使用 使用使用 使用 HDFS 保存大量小文件的缺点:
    1.Hadoop NameNode 在内存中保存所有文件的“元信息”数据。据统计,每一个文件需要消耗 NameNode600 字节内存。如果需要保存大量的小文件会对NameNode 造成极大的压力。
    2.如果采用 Hadoop MapReduce 进行小文件的处理,那么 Mapper 的个数就会跟小文件的个数成线性相关(备注:FileInputFormat 默认只对大于 HDFS Block Size的文件进行划分)。如果小文件特别多,MapReduce 就会在消耗大量的时间进行Map 进程的创建和销毁。
    为了解决大量小文件带来的问题,我们可以将很多小文件打包,组装成一个大文件。 Apache Avro 是语言独立的数据序列化系统。 Avro 在概念上分为两部分:模式(Schema)和数据(一般为二进制数据)。Schema 一般采用 Json 格式进行描述。Avro 同时定义了一些自己的数据类型如表所示:

    Avro基础数据类型

    类型

    描述

    模式

    null 

    The absence of a value

     "null"

    boolean

    A binary value

    "boolean"

    int

    32位带符号整数

    "int"

    long

    64位带符号整数

    "long"

    float

    32位单精度浮点数

    "float"

    double

    64位双精度浮点数

    "double"

    bytes

    byte数组

    "bytes"

    string

    Unicode字符串

    "string"

    类型

    描述

    模式

    array

    An ordered collection of objects. All objects in a particular array must have the same schema.

    {

    "type": "array",

    "items": "long"

    }

    map

    An unordered collection of key-value pairs. Keys must be strings and values may be any type, although within a particular map, all values must have the same schema.

    {

    "type": "map",

    "values": "string"

    }

    record

    A collection of named fields of any type.

    {

    "type": "record",

    "name": "WeatherRecord",

    "doc": "A weather reading.",

    "fields": [

    {"name": "year", "type": "int"},

    {"name": "temperature", "type": "int"},

    {"name": "stationId", "type": "string"}

    ]

    }

    enum

    A set of named values.

    {

    "type": "enum",

    "name": "Cutlery",

    "doc": "An eating utensil.",

    "symbols": ["KNIFE", "FORK", "SPOON"]

    }

    fixed

    A fixed number of 8-bit unsigned bytes.

    {

    "type": "fixed",

    "name": "Md5Hash",

    "size": 16

    }

    union

    A union of schemas. A union is represented by a JSON

    array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union.

    [

    "null",

    "string",

    {"type": "map", "values": "string"}

    ]

    Avro复杂数据类型

     通过上图所示,通过程序可以将本地的小文件进行打包,组装成一个大文件在HDFS中进行保存,本地的小文件成为Avro的记录。具体的程序如下面的代码所示:

    [java] view plaincopy
     
    1. public class Demo {  
    2.     public static final String FIELD_CONTENTS = "contents";  
    3.     public static final String FIELD_FILENAME = "filename";  
    4.     public static final String SCHEMA_JSON = "{"type": "record","name": "SmallFilesTest", "  
    5.             + ""fields": ["  
    6.             + "{"name":""  
    7.             + FIELD_FILENAME  
    8.             + "","type":"string"},"  
    9.             + "{"name":""  
    10.             + FIELD_CONTENTS  
    11.             + "", "type":"bytes"}]}";  
    12.     public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);  
    13.   
    14.     public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {  
    15.         DataFileWriter<Object> writer = new  DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100);  
    16.         writer.setCodec(CodecFactory.snappyCodec());  
    17.         writer.create(SCHEMA, outputStream);  
    18.         for (Object obj : FileUtils.listFiles(srcPath, null, false)){  
    19.             File file = (File) obj;  
    20.             String filename = file.getAbsolutePath();  
    21.             byte content[] = FileUtils.readFileToByteArray(file);  
    22.             GenericRecord record = new GenericData.Record(SCHEMA);  
    23.             record.put(FIELD_FILENAME, filename);  
    24.             record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));  
    25.             writer.append(record);  
    26.             System.out.println(file.getAbsolutePath() + ":"+ DigestUtils.md5Hex(content));  
    27.         }  
    28.         IOUtils.cleanup(null, writer);  
    29.         IOUtils.cleanup(null, outputStream);  
    30.     }  
    31.   
    32.     public static void main(String args[]) throws Exception {  
    33.         Configuration config = new Configuration();  
    34.         FileSystem hdfs = FileSystem.get(config);  
    35.         File sourceDir = new File(args[0]);  
    36.         Path destFile = new Path(args[1]);  
    37.         OutputStream os = hdfs.create(destFile);  
    38.         writeToAvro(sourceDir, os);  
    39.     }  
    40. }  
    [java] view plaincopy
     
      1. public class Demo {  
      2.     private static final String FIELD_FILENAME = "filename";  
      3.     private static final String FIELD_CONTENTS = "contents";  
      4.   
      5.     public static void readFromAvro(InputStream is) throws  IOException {  
      6.         DataFileStream<Object> reader = new DataFileStream<Object>(is,new GenericDatumReader<Object>());  
      7.         for (Object o : reader) {  
      8.             GenericRecord r = (GenericRecord) o;  
      9.             System.out.println(r.get(FIELD_FILENAME)+ ":"+DigestUtils.md5Hex(((ByteBuffer)r.get(FIELD_CONTENTS)).array()));  
      10.         }  
      11.         IOUtils.cleanup(null, is);  
      12.         IOUtils.cleanup(null, reader);  
      13.     }  
      14.   
      15.     public static void main(String... args) throws Exception {  
      16.         Configuration config = new Configuration();  
      17.         FileSystem hdfs = FileSystem.get(config);  
      18.         Path destFile = new Path(args[0]);  
      19.         InputStream is = hdfs.open(destFile);  
      20.         readFromAvro(is);  
      21.     }  
      22. }  
  • 相关阅读:
    第3章 MFC框架程序剖析
    第2章 掌握C++
    第1章 Windows程序内部运行机制
    【MFC】画线
    使用RegSetValueEx修改注册表时遇到的问题(转)
    读书笔记
    POJ 1182[并查集]
    读书笔记
    HihoCoder 1532 : 最美和弦
    HihoCode 1531 : 德国心脏病
  • 原文地址:https://www.cnblogs.com/fillPv/p/5010827.html
Copyright © 2011-2022 走看看