在《聊一聊Spark写文件的机制——如何保证数据一致性》一文中,我们分析了Spark写文件的机制,探讨了多个File Output Committer在性能与数据一致性上的权衡,以及针对AWS S3这样的对象存储的优化思路。文章结尾处,曾提到我们将会采用EMRFS S3-optimized Committer来解决Rename机制带来的性能与一致性问题。然而,最近在尝试使用这个Committer时,发现其并不如预期那么美好,存在诸多问题,这里将其总结下来,与诸位同道中人分享。
1. 前言
我们先来回顾下之前介绍过的三种Committer:FileOutputCommitter V1、FileOutputCommitter V2、S3A Committer,其基本代表了整体的演进趋势:
FileOutputCommitter V1,采用两次Commit的方式来保证较强的一致性,每次Commit都对应一次文件的Rename。每个Task先将数据写入到Task的临时目录下,写完后将其Rename到Job的临时目录下;所有Task都完成后,由Job负责将其临时目录下的所有文件Rename到正式目录下,此时文件对外可见。对于HDFS而已,Rename是一个十分高效的操作,然而对于S3这样的对象存储来说,则有着很大的代价。原因在于,S3本身并不是文件系统,不存在Rename操作,一个Rename操作需要分解为List + Copy + Delete操作。因此,对于S3而言,两次Rename有着非常大的性能开销。我们经常发现,Spark UI上看到各个Task已经结束了,但是Job就是迟迟不结束,有点像hang住了,其实就是在做第二次Rename。
FileOutputCommitter V2,在V1的基础上减去了第二次Rename,即每个Task先将数据写入到Task的临时目录下,写完后直接将其Rename到正式目录中;所有Task都完成后,Job只是写入一个_SUCCESS文件来标识已完成。显然,V2是牺牲一定的一致性来换取性能。因为,如果Spark Job在执行过程中失败了,就会出现部分成功的Task写入的文件对外可见,成为脏数据。
S3A Committer,最初由Netflix贡献给社区,采用S3 Multipart Upload机制替换了Rename机制。原因在于,对于S3而言,Rename不仅仅会带来性能问题,还可能因为S3的“最终一致性”特性而失败。社区版的这个Committer,会在每个Task中将数据先写入到本地磁盘,然后采用Multipart Upload方式上传到S3;所有Task都完成后,由Job统一向S3发送Complete信号,此时文件对外可见。
综合来看,使用Spark往S3写入文件时,应该尽量选择基于S3 Multipart Upload机制的Committer。在我们的系统中,主要采用AWS EMR来构建Spark集群,数据写入S3存储。EMR在5.19.0之后引入了EMRFS S3-optimized Committer,同样采用S3 Multipart Upload机制,因此我们会优先使用这个Committer。
本文所述均基于EMR5.29.0、Spark2.4.4。
2. AWS EMRFS S3-optimized Committer
EMRFS S3-optimized Committer,如AWS官方博客所言,其思想来自于S3A Committer,但是就目前的实现来看,坦白说,个人感觉比较鸡肋。一方面,这是官方出品,虽然不开源,但是相信其内部做了很多跟EMR集成的东西,因此我们通常会默认选择使用;另一方面,它有两个比较大的缺陷:
- 目前只能对部分符合条件的语法代码生效,比如只能是写Parquet文件,具体可以参考官方文档
- 它只是在FileOutputCommitter V2的基础上进行了改进,即将V2中的Rename机制替换为了S3 Multipart Upload机制,因此V2存在的数据一致性问题它也存在
对于第一个缺陷,我们在使用中经常战战兢兢,需要确认是否有效触发了这个Committer。以下面代码为例,我们在Spark Streaming中,每10分钟一个Batch,对数据进行加工处理后写入到S3中。该代码是否触发到了这个Committer呢?通过INFO Log分析来看,是有触发到的。下图上半部分是一个Executor的Log,下半部分是Driver的Log,读者可以参考这个来判断是否有效触发。
df.write.mode("append")
.partitionBy("ts_interval", "schema_version")
.option("path", s3_path)
.saveAsTable(table)
3. Job失败带来的数据不一致问题
这里重点探讨一下第二个缺陷,即Job失败带来的数据不一致问题,也是FileOutputCommitter V2存在的问题。如下图,假设一个Job有12个Task,执行过程中Task 0~2成功了,而其他失败了,进而导致Job失败了。此时,在S3上就可以看到前面三个Task写入的文件,当这个Job重做一次时,已经写入的文件的数据就会成为重复的数据。
针对这个问题,有哪些解法呢?目前,就我们接触到的而言,主要有三种方法,可以分别应用在不同的业务场景。
第一种,每次写入的目录都带有一个UUID,在整体文件写入成功后,将这个目录分发出去,给下游的Reader使用。比如下面的代码中,seq就扮演着这样的角色。但是,这样做还是会有数据残留在S3中的,只是暂时不会被下游Reader读到而已。
path = "s3://{bucket}/type={type}/ts_interval={ts_interval}/seq={uuid}"
.format(bucket=args["bucket"],
type=log_type,
ts_interval=ts_interval,
uuid=uuid.uuid1())
df.write.parquet(path)
第二种,采用overwrite写入的方式,该方式需要有一个UUID来标识某一批数据,保证该批数据在多次写入时UUID是不变的,而不同批次的数据的UUID是不同的。比如,在Spark Streaming中,每个Batch的数据,就可以使用Batch Timestamp来作为这个UUID。在下面的代码中,就可以达到这个效果,然而遗憾的是,AWS官方文档明确提出了目前这种写法无法触发到EMRFS S3-optimized Committer。
df.write.mode("overwrite")
.partitionBy("ts_interval", "schema_version", "ts_batch")
.option("path", s3_path)
.option("partitionOverwriteMode", "dynamic")
.saveAsTable(table)
第三种,保持写文件的方式不变,但是在Job失败后,捕获其异常,然后进行一次补偿,即删除掉多余的文件。我们知道,每次commitJob成功后,都会写入一个_SUCCESS文件来标识整体写入成功。如果在这个文件的Last Modified Time之后又有一些新的文件残留在S3上,我们就认为其实脏数据,将其删除。当然,随着数据量的积累,我们不可能检测所有的数据,不过对于数据实时上传的业务而言,只要检测最近一段时间内的数据文件就好了。
4. 残留数据问题
除了上述的问题之外,采用S3 Multipart Upload机制实现的Committer还会存在一些共性的数据残留问题,需要在实践中有所注意。残留的数据主要来自两方面:
- 每个Task的数据会先写入到本地磁盘,比如上面的“/mnt/s3/emrfs-4425809305170904769/0000000000”,如果Task中断,有可能会有数据残留在本地
- S3 Multipart Upload会先将上传的多个Part的文件放在S3的cache隐藏目录,如果Task中断,有可能会有数据残留在S3
对于第一方面,通过脚本监控相应的本地目录的磁盘大小并定期清理掉历史悠久的数据即可,避免磁盘被用爆了。
对于第二方面,残留在S3上的数据虽然对外不可见,但是会被收取存储费用的,因此需要进行相关清理,目前有两种方式:
- 设置Spark参数fs.s3.multipart.clean.enabled,该方式会启动一个异步进程来定期清理,会有一定的负载压力
- 在S3中配置相关Policy Lifecyle的属性即可,我们更倾向于这种方式,由S3来负责解决,没有额外开销
<LifecycleConfiguration>
<Rule>
<ID>sample-rule</ID>
<Prefix></Prefix>
<Status>Enabled</Status>
<AbortIncompleteMultipartUpload>
<DaysAfterInitiation>7</DaysAfterInitiation>
</AbortIncompleteMultipartUpload>
</Rule>
</LifecycleConfiguration>
以上便是当前我们在AWS EMRFS S3-optimized Committer,虽然存在诸多问题,我们还是尽量优先选择使用,毕竟是官方出品的,也期望其能做的越来越好。