1. 小文件的产生原因
定义: 当一个文件的大小小于 HDFS 的块大小(默认128MB)就认定为小文件,否则就是大文件
-
批处理,离线计算, 会有小文件的产生;
-
数据处理时,把数据源搬迁到 HDFS,如果数据源本身就是有很多小文件;
-
MapReduce作业 和 Spark作业时,没有设置好 Reduce Task个数,或者spark最后一层Task的数量。
2. 小文件的危害
-
HDFS不适合大量小文件的存储,因namenode将文件系统的元数据存放在内存中,因此存储的文件数目受限于 namenode的内存大小。HDFS中每个文件、目录、数据块 占用150Bytes。如果存放的文件数目过多的话会占用很大的内存甚至撑爆内存;
-
HDFS适用于高吞吐量,而不适合低时间延迟的访问。如果同时存入大量的小文件会花费很长的时间;
-
流式读取的方式,不适合多用户写入,以及任意位置写入。如果访问小文件,则必须从一个DataNode跳转到另外一个DataNode,这样大大降低了读取性能;
-
小文件过多,尽管hadoop集群配置了HA ,一旦原先NameNode挂掉, 备用的NameNode也会因小文件多,导致Block多,NameNode启动是把FsImage磁盘数据和Edits操作日志加载到内存的过程,Block越多,读取耗时也越长,启动就会很慢;
3. 解决方案
处理指导原则:
-
在写入HDFS前进行处理 (优先考虑,此处只考虑了mr产生的,暂不考虑基于tez,spark等其他写入hdfs的途径)
-
- 输入合并,在Map前合并小文件;
-
- 输出合并,在Reduce写入磁盘前合并小文件;
-
-
在写入hdfs后进行处理
-
- HDFS中的小文件合并
-
具体操作方法:
对于小文件合并,一般的做法是编写脚本、程序完成合并。
3.1 通过参数调节,设置map/reduce的数量
#设置map输入合并小文件的相关参数:
//每个Map最大输入大小(这个值决定了合并后文件的数量,调大可以减小Mapper数),默认128M
set mapred.max.split.size=1024*1024*512;
//一个节点上split的至少的大小(小于这个值会进行合并,这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=1024*1024*512;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
//设置hive输入端进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
# 减少reduce的数量
-- 除非知道数据量的分布,一般不直接设置
set mapred.reduce.tasks=10;
-- 每个reduce处理的数据量,实际上可能每个reduce 处理的数据量会超过它,默认1G 增加该参数,可以减小reduce数量
set hive.exec.reducers.bytes.per.reducer=1073741824
# 设置map输出和reduce输出进行合并的相关参数:
//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000
//ps:理论上设置了输出端小文件合并,会减少小文件的数量,可惜实际中即使设置了这些参数,对tdw分区表并不起作用
// hadoop v2.x 后一些参数名发生了变化
mapred.min.split.size => mapreduce.input.fileinputformat.split.minsize。
mapred.max.split.size => mapreduce.input.fileinputformat.split.maxsize。
mapred.min.split.size.per.rack => mapreduce.input.fileinputformat.split.minsize.per.rack。
mapred.min.split.size.per.node => mapreduce.input.fileinputformat.split.minsize.per.node。
dfs.block.size => dfs.blocksize
mapred.map.tasks => mapreduce.job.maps
3.2 hadoop 自带的小文件处理方法
1) Hadoop Archive 小文件归档
// 创建归档文件 把 input目录下所有文件 归档到 output/input.har, 原先input下的文件依然存在,需要手动删除才能释放原先在namenode中元数据占用的内存
hadoop archive -archiveName input.har -p /user/kuncai/input /user/kuncai/output
// 查看归档文件
hadoop fs -ls har:///user/kuncai/output3/input.har
// 解压归档文件
hadoop fs -cp har:///user/kuncai/output3/input.har/* /user/kuncai/
ps: 文件是许多记录组成的,那么可以通过调用 HDFS 的
sync()
方法(和append
方法结合使用),每隔一定时间生成一个大文件。另外:hive中的数据,也可以通过sort by 和 distrubute by 重建表,建表时减少reduce的数量,从而减少小文件;
2) SequenceFile 合并小文件;
原理: 使用文件名作为 key,文件内容作为 value ,把多个文件数据格式转换后统一输出;
通过自定义MR中的 Mapper,Driver,Reducer方法;
具体代码实现可参考此链接: https://examples.javacodegeeks.com/enterprise-java/apache-hadoop/hadoop-sequence-file-example/
3) CombineFileInputFormat<K,V> 小文件合并
核心思想:
根据一定的规则,将 HDFS 上多个小文件合并到一个 InputSplit 中,然后会启用一个 Map 来处理这里面的文件,以此减少MR整体作业的运行时间
具体代码实现可以参考此链接: https://blog.csdn.net/u011007180/article/details/52333387
4) HBase
如果你产生很多小文件,根据访问模式的不同,应该进行不同类型的存储。HBase 将数据存储在 Map Files(带索引的 SequenceFile)中,如果你需要随机访问来执行 MapReduce 流式分析,这是一个不错的选择。如果延迟是一个问题,那么还有很多其他选择 - 参见Richard Jones对键值存储的调查。
Other参考链接:
Hadoop 大量小文件问题的优化 https://cloud.tencent.com/developer/article/1482598
Hive-小文件优化实战 https://www.jianshu.com/p/8f0ce9eb0d0b