zoukankan      html  css  js  c++  java
  • Hadoop优化之数据压缩

    bBHadoop数据压缩

    概述

    运行hadoop程序时,I/O操作、网络数据传输、shuffle和merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,这个时候,使用数据压缩可以提高效率

    压缩策略和原则

    压缩是提高Hadoop运行效率的一种策略

    通过对Mapper、Reducer运行过程的数据进行压缩,减少磁盘IO,提高运行速度

    压缩原则

    1. 运算密集型的job,少用压缩

    2. IO密集型的job,多用压缩

    总结:当面对一些较大IO量的数据是,使用压缩会提高效率

    Hadoop支持的压缩编码

    压缩格式hadoop自带?算法文件扩展名是否可切片换成压缩格式后,原来的程序是否需要修改
    DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
    Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
    bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
    LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
    Snappy 是,直接使用 Snappy .snappy 和文本处理一样,不需要修改

    为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示。

    压缩格式对应的编码/解码器
    DEFLATE org.apache.hadoop.io.compress.DefaultCodec
    gzip org.apache.hadoop.io.compress.GzipCodec
    bzip2 org.apache.hadoop.io.compress.BZip2Codec
    LZO com.hadoop.compression.lzo.LzopCodec
    Snappy org.apache.hadoop.io.compress.SnappyCodec

    压缩性能的比较

    压缩算法原始文件大小压缩文件大小压缩速度解压速度
    gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
    bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
    LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

    压缩方式的选择

    GZIP压缩

    优点:压缩率较高,压缩解压速度快,hadoop和linux自带,方便
    缺点:不支持切片
    应用场景:因为GZIP压缩不支持切片,所以当每个文件压缩之后再130M以内,都可以考虑使用GZIP压缩格式,例如吧一天的日志压缩成GZIP

    BigZIP2压缩

    优点:支持Split;具有很高的压缩率,比Gzip压缩率都要,Hadoop自带,方便
    缺点:速度慢
    应用场景:适合对速度要求不高,但需要较高压缩率的时候。或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持Split,而且兼容之间的应用程序的时候

    LZO压缩

    优点:解压缩速度快,合理的压缩率。支持Split(需要建立索引),是Hadoop中最流行的压缩格式;可以在Linux下安装LZO命令,
    缺点:压缩率比Gzip要低一些;Hadoop本身不支持,需要安装;在应用中对LZO格式的文件要做一些特殊处理(为了支持Split要建索引,并且把InputFormat指定为LZO格式)
    应用场景:一个很大的文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,Lzo优点越明显

    Snaapy压缩

    优点:高速压缩速度和合理的压缩率
    缺点:不支持Split;压缩率比Gzip还要低;Hadoop本身不支持,需要安装
    应用场景:当MR作业中Map输出的数据较大时,作为Map到Reduce的中间数据的压缩格式;或者作为一个MR的输出到另外一个MR的输入
    MR流程图内,map-shuffle落盘之间适合使用此压缩算法

    压缩位置的选择

    压缩可以在MR作用的任意阶段启用

    • 在输入端启用压缩

      有大量数据并计划重复处理的情况下,应该考虑对数据进行压缩;
      这时候无序指定压缩编码,Hadoop能自动检测文件拓展名,如果拓展名能够匹配,就会匹配恰当的编码方式对文件进行压缩和解压,否则就不会进行压缩/解压
    • 在Mapper输出启用压缩

      当Map任务输出的中间数据量很大时,应在此阶段考虑此压缩技术,能显著提升Shuffle效率
      shuffle过程在Hadoop处理过程中资源消耗最多的环节,如果发现数据量大造成网络传输缓慢,应考虑使用压缩技术,可用于Mapper输出的快速编码器包括LZO和Snappy

    注意事项(建议Mapper输出阶段使用LZO压缩编码,会让Map阶段完成时间快4倍)

    LZO是供Hadoop压缩数据用的通用编码器,其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是LZO优先考虑的因素,而不是压缩率
    与Gzip编解码器相比,它的压缩速度是Gzip的5倍,而解压速度是Gzip的2倍。同一个文件用LZO压缩后比用Gzip压缩后大50%,但比压缩前小25%~50%。这对改善性能非常有利,Map阶段完成时间快4倍。
    • Reduce端

      在此阶段使用压缩可以降低磁盘使用量,在进行链条式作业时也同样有效

    压缩参数配置

    要在Hadoop中启用压缩,可以配置如下参数:

    参数默认值阶段建议
    io.compression.codecs (在core-site.xml中配置) 无,这个需要在命令行输入hadoop checknative查看 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
    mapreduce.map.output.compress(在mapred-site.xml中配置) false mapper输出 这个参数设为true启用压缩
    mapreduce.map.output.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec mapper输出 企业多使用LZO或Snappy编解码器在此阶段压缩数据
    mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) false reducer输出 这个参数设为true启用压缩
    mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
    mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

    数据压缩案例

    CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。

    要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream,将其以压缩格式写入底层的流

    相反,要想对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,从而从底层的流读取未压缩的数据。

    压缩数据据案例

    测试一下如下压缩方式:
    DEFLATE org.apache.hadoop.io.compress.DefaultCodec
    gzip org.apache.hadoop.io.compress.GzipCodec
    bzip2 org.apache.hadoop.io.compress.BZip2Codec
    package com.atguigu.mapreduce.compress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TestCompress {
       public static void main(String[] args) throws IOException {
           compress("D:\input\inputcompression\JaneEyre.txt"
    ,"org.apache.hadoop.io.compress.BZip2Codec");
           //decompress("D:\input\inputcompression\JaneEyre.txt.bz2");
      }
       //压缩
       private static void compress(String filename, String method) throws IOException {
           //1 获取输入流
           FileInputStream fis = new FileInputStream(new File(filename));

           //2 获取输出流
           //获取压缩编解码器codec
           CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
           CompressionCodec codec = factory.getCodecByName(method);

           //获取普通输出流,文件后面需要加上压缩后缀
           FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
           //获取压缩输出流,用压缩解码器对fos进行压缩
           CompressionOutputStream cos = codec.createOutputStream(fos);

           //3 流的对拷
           IOUtils.copyBytes(fis,cos,new Configuration());

           //4 关闭资源
           IOUtils.closeStream(cos);
           IOUtils.closeStream(fos);
           IOUtils.closeStream(fis);
      }
    //解压缩
       private static void decompress(String filename) throws IOException {
           //0 校验是否能解压缩
           CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
           CompressionCodec codec = factory.getCodec(new Path(filename));
           if (codec == null) {
               System.out.println("cannot find codec for file " + filename);
               return;
          }
           //1 获取输入流
           FileInputStream fis = new FileInputStream(new File(filename));
           CompressionInputStream cis = codec.createInputStream(fis);

           //2 获取输出流
           FileOutputStream fos = new FileOutputStream(new File(filename + ".decodec"));

           //3 流的对拷
           IOUtils.copyBytes(cis,fos,new Configuration());

           //4 关闭资源
           IOUtils.closeStream(fos);
           IOUtils.closeStream(cis);
           IOUtils.closeStream(fis);
      }
    }

    Map输出端采用压缩

    即使MR任务的输出输出文件都是未压缩格式的文件,也可以在Map和Reduce传输阶段使用压缩,因为它要写在磁盘并通过网络传输,用压缩可以提高很多性能而这些工作只要设置两个属性即可

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = new Configuration();

    // 开启map端输出压缩
    conf.setBoolean("mapreduce.map.output.compress", true);
    // 设置map端输出压缩方式
    conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);

    Job job = Job.getInstance(conf);

    job.setJarByClass(WordCountDriver.class);

    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    boolean result = job.waitForCompletion(true);

    System.exit(result ? 0 : 1);
    }
    }

    Reduce输出端采用压缩

    Reduce输出端需要设置驱动,两行代码搞定


    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.io.compress.Lz4Codec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf);

    job.setJarByClass(WordCountDriver.class);

    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 设置reduce端输出压缩开启
    FileOutputFormat.setCompressOutput(job, true);
    // 设置压缩的方式
       FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    //   FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    //   FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
       
    boolean result = job.waitForCompletion(true);

    System.exit(result?0:1);
    }
    }

     

  • 相关阅读:
    为什么单片机程序中会有延时程序加入
    20145238 《信息安全系统设计基础》课程总结
    20145238-荆玉茗 《信息安全系统设计》第14周学习总结
    20145238-荆玉茗 《信息安全系统设计基础》第13周学习总结
    20145238-荆玉茗 《信息安全系统设计基础》第十二周学习总结
    补:第五周实验楼实践
    20145224&20145238《信息安全系统设计基础》实验三
    20145238 《信息安全系统设计基础》第十一周学习总结
    20145238《信息安全系统设计基础》第十周学习总结
    20145224&20145238《信息安全系统设计基础》实验五
  • 原文地址:https://www.cnblogs.com/traveller-hzq/p/14038207.html
Copyright © 2011-2022 走看看