1. 尽早去除无用的数据
MapReduce Job的很大一部分开销在于磁盘IO和数据的网络传输,如果能尽早的去除无用的数据,减少数据量,会提升Pig的性能。
1). 尽早的使用Filter
使用Filter可以去除数据中无用的行(Record),尽早的Filter掉无用的数据,可以减少数据量,提升Pig性能。
2). 尽早的使用Project(Foreach Generate)
使用Foreach Generate可以去除数据中无用的列(Column),减少数据量,提升Pig性能。
2. 使用Combiner
Combiner可以对Map的结果进行combine,减少Shuffle的数据量。
在Pig中实现UDF时,应该尽可能地实现Algebraic接口(实现Algebraic接口的function可以对中间结果执行多次而不影响最终结果,比如Count, Sum等都是Algebraic function),这样的UDF就可能进行Combine,条件如下:
如果在Group之后的Foreach语句中,所有投影都是针对分组(Group)的列的表达式,或者是Algebraic UDF的表达式时,就可以使用Combiner(表达式包括Sum,Count,Distinct或者其他数学表达式等)。
3. Join优化
当进行Join时,最后一个表不会放入内存,而是以stream的方式进行处理,所以最好把最大的一个表放置到Join语句的最后。
Pig实现了以下三种定制的Join以进一步优化。
1) Replicated Join
当进行Join的一个表比较大,而其他的表都很小(能够放入内存)时,Replicated Join会非常高效。
Replicated Join会把所有的小表放置在内存当中,然后在Map中读取大表中的数据记录,和内存中存储的小表的数据进行Join,得到Join结果,无需Reduce。
可以在Join时使用 Using 'replicated'语句来触发Replicated Join,大表放置在最左端,其余小表(可以有多个)放置在右端。
2) Skewed Join
当进行Join的两个表中,一个表数据记录针对key的分布极其不均衡的时候,简单的使用Hash来分配Reduce端的key时,可能导致某些Reducer上的数据量特别大,降低整个集群的性能。
Skewed Join可以首先对左边的表的key统计其分布,然后决定Reduce端的key的分布,尽量使得Reduce端的数据分布比较均衡。
可以在Join时使用Using 'skewed'语句来触发Skewed Join,需要进行统计的表(亦即key可能分布不均衡的表)放置在左端。
3) Merge Join
当进行Join的两个表都已经是有序的时,可以使用Merge Join。
Join时,首先对右端的表进行一次采样,对采样的数据创建索引,记录(key, 文件名, 偏移[offset])。然后进行map,读取Join左边的表,对于每一条数据记录,根据前一步计算好的索引来查找数据,进行Join。
可以在Join时使用Using 'merge'语句来触发Merge Join,需要创建索引的表放置在右端。
另外,在进行Join之前,首先过滤掉key为Null的数据记录可以减少Join的数据量。
4. 使用压缩来提高性能
通过压缩Map/Reduce之间的数据,以及Job之间需要传输的数据,可以显著的减少需要存储在硬盘上的和需要传输的数据,提升Pig的性能。
1) 压缩Map/Reduce之间的数据
通过设置mapred.compress.map.output = true可以对Map的结果进行压缩,压缩的方法可以通过下面的语句来进行设置:mapred.map.output.compression.codec = org.apache.hadoop.io.compress.GzipCodec / com.hadoop.compression.lzo.LzopCodec。
Gzip的压缩效率比较高,但是比较消耗CPU,所以通常情况下可以使用Lzo来进行压缩。
2) 压缩Job之间的数据
通过设置pig.tmpfilecompression = true可以对Job之间的数据进行压缩,压缩的方法可以通过pig.tmpfilecompres sion.codec = org.apache.hadoop.io.compress.GzipCodec / com.hadoop.compression.lzo.LzopCodec来进行设置。
5. 设置Reduce的并发数
可以通过PARALLEL = n 来设置Reduce的并发数(Map的并发数不可以设置),可以启动Reduce的操作包括:
COGROUP, CROSS, DISTINCT, GROUP, JOIN (inner), JOIN (outer), 和 ORDER BY。
需要注意的是,PARALLEL并不是越大越好,这需要根据集群的配置来确定,比较合理的PARALLEL数 = 集群节点数*mapred.tasktracker.reduce.tasks.maximum。后者默认为2。
参考文献:
[1] Pig Performance and Efficency
[2] Alan Gates. Programming Pig.
[3] Pig Cookbook.