zoukankan      html  css  js  c++  java
  • parquet列式文件实战

    前言

    列式文件,顾名思义就是按列存储到文件,和行式存储文件对应。保证了一列在一个文件中是连续的。下面从parquet常见术语,核心schema和文件结构来深入理解。最后通过java api完成write和read。

     

    术语

    block

    parquet层面和row group是一个意思

    row group

    逻辑概念,用于对row进行分区。由数据集中每个column的column chunk组成。是读写过程中的缓存单元

    column chunk

    某个column的所有数据被称为column chunk,存在于row group,并保证在文件中是连续的

    page

    多个column chunk之间用page分开,也就是说一个page只会包含一个column的数据,一个page是一个独立的单元(可以被编码或者压缩)

    dictionary page

    每个page之前都可以选择是否需要dictionary page。dictionary page记录了该page所有不同的值。这可以增强处理速度提高压缩率。

    总结

    一个文件由多个row group组成,一个row group包括了多个column chunk,一个column chunck就是某个column的所有数据集, 被分割成多个page,一个page是最小的处理单元,可以被编码或者压缩。

    schema

    每种文件都有自己特有的规则,像csv文件,是用分隔符分隔开的一个个列。parquet文件也有自己独特的schema格式。

    这是一个parquet文件的schema例子,对应的api是MessageType

    message person{
      required binary name (UTF8);
      required int age;
      repeated group family{
        required binary father (UTF8);
        required binary mother (UTF8);
        optional binary sister (UTF8);
      }
    }

    message

    固定声明,就像结构体中的struct一样。

    person

    message name,可以粗暴的理解为表名,因为里面都是field。

    optional,required,repeated

    这是三种field的关键字,分别表示可选,必选,可重复选

    可选和必选类似数据库中的nullable,可重复选是为了支持复杂的嵌套结构。

    field类型

    目前parquet支持int32,int64,int96(有些系统会把时间戳存成int96如老版本hive),float,double,boolean,binary,fixed_len_byte_array。

    参考类org.apache.parquet.schema. PrimitiveType.PrimitiveTypeName

    UTF8

    field的原始类型(Original Type),可以辅助field的type进行细粒度的类型判断。

    参考类 org.apache.parquet.schema. OriginalType

    group

    嵌套结构声明,类似json对象

    schema&数据

    schema有了,那如何把schema和数据关联起来,也就是说可以通过schema构建或者解析出相应的数据。那就引出了嵌套关系,definition level和repetitional level。用于定位数据到底出现在嵌套中(如果有嵌套的话)的哪一层。值得注意的是,嵌套关系是针对列而言的,不同列有各自的嵌套关系。

    definition level

    optional字段定位,如果实际没有数据就为0,有数据就为1。涉及到嵌套optional,那么可以这么理解,如果从某一层开始没有该数据,那么该层之前肯定是有数据的,该层之后肯定没有数据。举个简单的例子

    message ExampleDefinitionLevel {
      optional group a {
        optional group b {
          optional string c;
        }
      }
    }

    这个schema对应的definition level所有的可能性如表所示

     

    repetition level

    repeated字段定位,如果在嵌套中某一层出现了值,那么就记录该层。那一个例子来说:

    message AddressBook {
      required string owner;
      repeated string ownerPhoneNumbers;
      repeated group contacts {
        required string name;
        optional string phoneNumber;
      }
    }

    针对不同的列,defnition level和repetition level的最大值如表

     

    文件结构

    结构图

     

    详细

    一个parquet文件由3部分组成,header,blocks,footer。类似一般文档中的页眉,正文,页脚。

    header

    只包含4个字节的魔数,PAR1

    blocks

    block定义参考“术语”

    footer

    记录了该parquet文件正文所有metadata,

    文件物理格式

    通过 cat -v 查看一个parquet,会看到很多的non-printable字符,比如:^U^@^U^P^U^P,^U^B^U^@^

    这些字符其实是可以和ascii互相映射,比如^@就是ascii中的0,详细可以看这篇文档

    https://docstore.mik.ua/orelly/unix/upt/ch25_07.htm

    其实就是八进制的ascii,小于100的+100,大于100的减100。

    所有的列,包括嵌套结构,例如test.c1和test.c2属于两个列,都是连续存储在parquet文件中。

    参考资料

    // twitter对parquet的概述

    https://blog.twitter.com/engineering/en_us/a/2013/announcing-parquet-10-columnar-storage-for-hadoop.html

    // parquet的github

    https://github.com/apache/parquet-format

    // 很详细的parquet文件解析

    http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format

    coding

    public static MessageType getMessageTypeFromCode(){
        MessageType messageType =
                Types.buildMessage()
                .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("id")
                .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
                .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
                .requiredGroup()
                  .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test1")
                  .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test2")
                  .named("group1")
                .named("trigger");
        return messageType;
    }
    
    public static void writeParquet(String name){
    
        // 1. 声明parquet的messageType
        MessageType messageType = getMessageTypeFromCode();
        System.out.println(messageType.toString());
    
        // 2. 声明parquetWriter
        Path path = new Path("/tmp/etl/"+ name);
        Configuration configuration = new Configuration();
        GroupWriteSupport.setSchema(messageType, configuration);
        GroupWriteSupport writeSupport = new GroupWriteSupport();
    
        // 3. 写数据
        ParquetWriter<Group> writer = null;
        try {
            writer = new ParquetWriter<Group>(path,
                    ParquetFileWriter.Mode.CREATE,
                    writeSupport,
                    CompressionCodecName.UNCOMPRESSED,
                    128*1024*1024,
                    5*1024*1024,
                    5*1024*1024,
                    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                    ParquetWriter.DEFAULT_WRITER_VERSION,
                    configuration);
            Random random = new Random();
    
            for(int i=0; i<10; i++){
                // 4. 构建parquet数据,封装成group
                Group group = new SimpleGroupFactory(messageType).newGroup();
                group.append("name", i+"@qq.com")
                        .append("id",i+"@id")
                        .append("age",i)
                .addGroup("group1")
                        .append("test1", "test1"+i)
                        .append("test2","test2"+i);
                writer.write(group);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(writer != null){
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    
    public static void readParquet(String name){
        // 1. 声明readSupport
        GroupReadSupport groupReadSupport = new GroupReadSupport();
        Path path = new Path("/tmp/etl/"+name);
    
        // 2.通过parquetReader读文件
        ParquetReader<Group>reader = null;
        try {
            reader = ParquetReader.builder(groupReadSupport, path).build();
            Group group = null;
            while ((group = reader.read()) != null){
                System.out.println(group);
            }
    
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(reader != null){
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 相关阅读:
    JMeter学习-027-JMeter参数文件(脚本分发)路径问题:jmeter.threads.JMeterThread: Test failed! java.lang.IllegalArgumentException: File distributed.csv must exist and be readable解决方法
    JMeter学习-026-JMeter 分布式(远程)参数化测试实例
    JMeter学习-025-JMeter 命令行(非GUI)模式详解(三)-测试图形化 HTML 报表(dashboard)生成
    JMeter学习-024-JMeter 命令行(非GUI)模式详解(二)-执行代理设置
    JMeter学习-023-JMeter 命令行(非GUI)模式详解(一)-执行、输出结果及日志、简单分布执行脚本
    JMeter学习-022-JMeter 分布式测试(性能测试大并发、远程启动解决方案)
    JMeter学习-021-JMeter 定时器(Synchronizing Timer)之集合点应用
    JMeter学习-020-JMeter 监听器之【聚合报告】错误率、吞吐量、传输速率实例计算
    Selenium2学习-038-firefox、webdriver版本不对称问题解决:org.openqa.selenium.firefox.NotConnectedException: Unable to connect to host 127.0.0.1 on port 7055
    JMeter学习-019-JMeter 监听器之【聚合报告】界面字段解析及计算方法概要说明
  • 原文地址:https://www.cnblogs.com/ulysses-you/p/7985240.html
Copyright © 2011-2022 走看看