一、目的
a. 减小磁盘占用
b. 加速网络IO
二、几个常用压缩算法
是否可切分:是指压缩后的文件能否支持在任意位置往后读取数据。
各种压缩格式特点:
压缩算法都需要权衡 空间/时间 ;压缩率越高,就需要更多的压缩解压缩时间;压缩时有9个级别来控制:1为优化压缩速度,9为优化压缩率(如 gzip -1 file);
相比之下 gzip是一个空间/时间都比较适中的压缩算法;bzip2特点是压缩率高,且可切分;LZO/LZ4/Snappy压缩速度快,比gzip快一个数量级,且LZ4和Snappy又比LZO快很多;
三、使用方法
1、codec
在Hadoop中,一个对CompressionCodec接口的实现代表一个codec,例如GzipCodec封装了gzip算法;
2、在代码利用CompressionCodec来实现
public class StreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class<?> codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); CompressionOutputStream out = codec.createOutputStream(System.out); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } }
可利用CompressionCodecFactory 来识别压缩文件的算法,并可获取对应的codec:CompressionCodec codec = factory.getCodec(inputPath);主要利用文件名的后缀来识别判断。
3、使用native来实现压缩
可通过Java系统的java.library.path来设置原生代码库,bin目录下的hadoop脚本可能设置,也可手动去设置。
hadoop会默认去搜索是否有原生代码库,如有则会加载使用。也可心通过hadoop.native.lib去禁用使用原生代码库。
4、若大量使用codec,可使用CodecPool来优化创建和销毁codec的开销。
四、选择压缩算法
不支付切分的压缩算法,若在压缩后大于HDFS上一人块的大小,那在HDFS分被开存储;MapReduce在处理它时不会以split设置的大小去切分它,它会识别出不可切分后,会把整个文件都读取过来进行处理;总的来说这样并不好,牺牲了数据的本地性,大量时间花费在网络IO上。
选择优先从上到下:(主要考虑文件大小 )
1、大量小文件:使用容器文件格式,如SequenceFile、RCFile或Avro数据文件,所有这些都支持压缩和切分;通常与一个快速压缩工具使用,如LZO,LZ4,Snappy;
2、大文件:bzip2是个不错的选择,尽管非常慢;或者索引过的LZO;
3、压缩后和block差不多大(略小于block),可使用LZOLZ4Snappy;
五、在MapReduce中使用压缩
1、mapred.output.compress属性设为true,mapred.output.compression.codec设置为要使用的压缩的codec的类名;
2、在代码中设置:
FileOutputFormat.setCompressOutput( job, true );
FileOutputFormat.setOutputCompressorClass( job, GzipCodec.class );
3、如果是SequenceFile,可设置mapred.output.compression.type来限制压缩格式。默认是RECORD,可选NONE,BLOCK;
或使用SequenceFileOutputFormat类中静态方法putCompressionType()来设置。
4、只对map输入进行压缩:
map结果存储在节点的本地,并通过网络传输到reducer,这个过程可使用LZO,LZ4,Snappy来获取性能的提升;
或在作业中使用:
//新API Configuration conf = new Configuration(); conf.setBoolean("mapred.compress.map.output", true); conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class); Job job = new Job(conf); //旧API conf.setCompressMapOutput(true); conf.setMapOutputCompressorClass(GzipCodec.class);