Parquet列式存储
Apache Parquet是Hadoop生态系统中的列式存储格式,面向分析型业务,与数据处理框架、数据模型、编程语言无关。
● 优势
降低存储空间:按列存,能够更好地压缩数据,因为一列的数据一般都是同质的(homogenous)
提高IO效率:扫描(遍历/scan)的时候,可以只读其中部分列. 而且由于数据压缩的更好的缘故,IO所需带宽也会减小
降低上层应用延迟
查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
数据模型: Avro, Thrift, Protocol Buffers, POJOs
当时Twitter的日增数据量达到压缩之后的100TB+,存储在HDFS上,工程师会使用多种计算框架(例如MapReduce, Hive, Pig等)对这些数据做分析和挖掘;
日志结构是复杂的嵌套数据类型,例如一个典型的日志的schema有87列,嵌套了7层。所以需要一种列式存储格式,既能支持关系型数据(简单数据类型),又能支持复杂的嵌套类型的数据,同时能够适配多种数据处理框架。
还原嵌套结构
● 难点
处理嵌套的数据结构才是真正的挑战。
多个 field 可以形成一个 group,一个 field 可以重复出现(叫做 repeated field)。用 group 和 repeated field 的各种组合来描述。
● Definition Level
知道到底是从哪一级开始没定义的,这是还原整条记录所必须知道的。
不要让 definition level 太大,这很重要,目标是所用的比特越少越好。
● Repetition level
标示着新 List 出现的层级。
repetition level 告诉我们,在从列式表达,还原嵌套结构的时候,是在哪一级插入新值。
示意图:http://lastorder.me/tag/parquet.html
每一个基本类型的列,都要创建三个子列(R, D, Value)。
三个子列的总开销其实并不大. 因为两种 Levels的最大值,是由 schema 的深度决定的,并且通常只用几个 bit 就够用了(1个bit 就可表达1层嵌套,2个bit就可以表达3层嵌套了,3个bit就能够表达7层嵌套。
为了通过列是存储,还原重建这条嵌套结构的记录,写一个循环读列中的值。
R=0, D=2, Value = “555 987 6543”:
R = 0 这是一个新的 record. 从根开始按照schema 重建结构,直到 repetition level 达到 2
D = 2 是最大值,值是有定义的,所以此时将值插入.
R=1, D=1:
R = 1 level1 的 contact list 中一条新记录
D = 1 contacts 有定义,但 phoneNumber 没定义,所建一个空的 contacts 即可.
R=0, D=0:
R = 0 一条新 record. 可以重建嵌套结构,直到达到 definition level 的值.
D = 0 => contacts 是 null,所以最后拼装出来的是一个空的 Address Book
Parquet列式存储 替代 HDFS文件
提高查询性能、存储压缩
● 《Spark SQL下的Parquet使用最佳实践和代码实战》http://blog.csdn.net/sundujing/article/details/51438306
● 《操作技巧:将 Spark 中的文本转换为 Parquet 以提升性能》http://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html
● 《[Big Data]从Hadoop到Spark的架构实践》http://www.cnblogs.com/losbyday/p/5854618.html
● 《Spark SQL 下DateFrame的初步认识(3)》http://blog.csdn.net/erfucun/article/details/52086858
业界的主流公司在做大数据分析的时候,基本上都以Parquet方式存储数据。
Parquet文件是基于列式存储的,列之间是分开的,可以进行各种高效的优化。Spark底层一般都会接 Parquet文件。
Parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:
a.可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
b.压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
c.只读取需要的列,支持向量运算,能够获取更好的扫描性能。
数据处理流程
● 离线数据挖掘:HDFS(JSON) -> Spark/Hive(ETL) -> HDFS Parquet -> Spark SQL / ML / GraphX
● 实时即时分析:Kafka -> Spark Streaming -> HDFS(JSON) -> HDFS Parquet -> Spark SQL / ML / GraphX
● 实时流式计算:Kafka -> Spark Streaming -> Redis
Spark Streaming完成以下工作
1. 原始日志的保存。将Kafka中的原始日志以JSON格式无损的保存在HDFS中
2. 数据清洗和转换,清洗和标准化之后,转变为Parquet格式,存储在HDFS中,方便后续的各种数据计算任务
3. 定义好的流式计算任务,比如基于频次规则的标签加工等等,计算结果直接存储在MongoDB中
将文件转换为Parquet格式
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
# 上传行为日志文件至HDFS
hdfs dfs -mkdir /user/hadoop/test
hdfs dfs -put /home/hadoop/test/user_behavior.txt /user/hadoop/test/
# 启动Spark
cd /usr/local/spark
./bin/spark-shell
// 读取HDFS上的JSON文件
val df = sqlContext.read.json("hdfs://localhost:9000/user/hadoop/test/user_behavior.txt")
// 保存为parquet文件
df.saveAsParquetFile("hdfs://localhost:9000/user/hadoop/test/userBehavior.parquet")
// 读取parquet文件
val parquetData = sqlContext.parquetFile("hdfs://localhost:9000/user/hadoop/test/userBehavior.parquet")
parquetData.show()
parquetData.printSchema()
// 执行计算
df.groupBy("Behavior").count().show()
parquetData.groupBy("Behavior").count().show()