zoukankan      html  css  js  c++  java
  • MR案例:MR和Hive中使用Lzo压缩

    在MapReduce中使用lzo压缩

     1).首先将数据文件在本地使用lzop命令压缩。具体配置过详见配置hadoop集群的lzo压缩

    //压缩lzop,解压缩lzop -d
    [root@ncst word]# lzop words.txt 
    [root@ncst word]# ls
    words.txt  words.txt.lzo

     2).将lzo文件上传到hdfs

    [root@ncst word]# hadoop fs -put words.txt.lzo /test/in/words/
    [root@ncst word]# hadoop fs -ls /test/in/words
    Found 1 items
    -rw-r--r--   1 root supergroup        115 2015-08-28 21:13 /test/in/words/words.txt.lzo

     3).给Lzo文件建立索引Index(两种方式)

    //单机版
    [root@ncst word]# hadoop jar
    > /usr/local/hadoop/hadoop-2.2.0/share/hadoop/common/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar > com.hadoop.compression.lzo.LzoIndexer > /test/in/words
    //集群版本
    [root@ncst word]# hadoop jar 
    > /usr/local/hadoop/hadoop-2.2.0/share/hadoop/common/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar 
    > com.hadoop.compression.lzo.DistributedLzoIndexer 
    > /test/in/words
    //索引文件以.index结尾
    [root@ncst word]# hadoop fs -ls /test/in/words
    Found 2 items
    -rw-r--r--   1 root supergroup        115 2015-08-28 21:13 /test/in/words/words.txt.lzo
    -rw-r--r--   1 root supergroup          8 2015-08-28 21:28 /test/in/words/words.txt.lzo.index

     4).编写MapReduce程序(需要添加的额外包hadoop-lzo-0.4.13.jar)

    package test0820;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import com.hadoop.compression.lzo.LzopCodec;
    import com.hadoop.mapreduce.LzoTextInputFormat;
    
    public class WordCount0826 {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
    //GenericOptionsParser类的getRemainingArgs()方法作用是从命令行获取配置信息
    String[] otherArgs
    = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf); job.setJarByClass(WordCount0826.class); job.setMapperClass(IIMapper.class); job.setReducerClass(IIReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(VLongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(VLongWritable.class);

    //配置输入类型为Lzo job.setInputFormatClass(LzoTextInputFormat.class);
    //配置reduce结果压缩以及压缩格式 FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileInputFormat.addInputPath(job,
    new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0:1); } //map public static class IIMapper extends Mapper<LongWritable, Text, Text, VLongWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splited = value.toString().split(" "); for(String word : splited){ context.write(new Text(word),new VLongWritable(1L)); } } } //reduce public static class IIReducer extends Reducer<Text, VLongWritable, Text, VLongWritable>{ @Override protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context) throws IOException, InterruptedException { long sum=0; for(VLongWritable vl : v2s){ sum += vl.get(); } context.write(key, new VLongWritable(sum)); } } }

     5).运行hadoop jar

    [root@ncst test]# hadoop jar myjar.jar /test/in/words/ /test/out/0828/02

      如若未在程序中配置输入和输出都为Lzo格式,可以在命令行通过 -D 开头的参数进行配置

    hadoop jar myjar.jar 
     -D mapred.reduce.tasks=2 
     -D mapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat 
     -D mapred.output.compress=true 
     -D mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec 
     /test/in/words /test/out/0828/001

     6).查看结果文件

    [root@ncst test]# hadoop fs -ls /test/out/0828/001
    Found 3 items
    -rw-r--r--   1 root supergroup          0 2015-08-28 21:35 /test/out/0828/001/_SUCCESS
    -rw-r--r--   1 root supergroup         61 2015-08-28 21:35 /test/out/0828/001/part-r-00000.lzo
    -rw-r--r--   1 root supergroup         87 2015-08-28 21:35 /test/out/0828/001/part-r-00001.lzo

     7).查看结果Lzo文件的内容(两种方式)

    //以fs -get方式下载下来,再解压
    [root@ncst test]# hadoop fs -get /test/out/0828/001 ~/out
    [root@ncst test]# cd ~/out
    
    //lzop -d *.lzo 是解压lzo文件的命令
    [root@ncst out]# lzop
    -d part-* [root@ncst out]# ls part-r-00000 part-r-00000.lzo part-r-00001 part-r-00001.lzo _SUCCESS
    //利用fs -text 结合Linux的重定向 > 命令
    [root@ncst test]# hadoop fs -text /test/out/0828/001/part-* > out.txt
    
    //查看out.txt结果
    [root@ncst test]# more out.txt hello
    3 man 2 women 2 word 1 world 2

    总结:

    • lzo文件需要建立索引才能支持分块(split)。如果没有索引,lzo文件也是可以处理的,MapReduce会根据后缀名 “.lzo” 来对lzo文件解压,并且InputFormat也不需要特别指定,但是不支持分块,整个lzo文件只用一个map来处理。
    • 对于输入文件为添加了索引的Lzo压缩文件,如若不在代码中指定 job.setInputFormatClass(LzoTextInputFormat.class); 则Mapreduce程序将会把索引文件.index也当作是数据文件!LzoTextInputFormat类需要引入相应的(hadoop-lzo-0.4.13.jar)包,如果你是使用Maven管理依赖,则需要在pom.xml文件中添加以下属性。
    <dependency> 
        <groupId>com.hadoop.gplcompression</groupId> 
        <artifactId>hadoop-lzo</artifactId> 
        <version>0.4.19</version> 
    </dependency>

    在Streaming程序中使用lzo压缩

    把InputFormat设置为DeprecatedLzoTextInputFormat,还要设置参数 stream.map.input.ignoreKey=true。这样map的key值(key值是行在文件中的偏移量,value值是每行的文本)就不会传入reduce程序中,这个key值我们是不需要的。

    hadoop jar 
    /opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar 
    -file /home/pyshell/map.py 
    -file /home/pyshell/red.py  
    -mapper /home/pyshell/map.py 
    -reducer /home/pyshell/red.py 
    -input /aojianlog/20120304/gold/gold_38_3.csv.lzo 
    -output /aojianresult/gold38 
    -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat    //没有此选项,map作业也不会分片
    -jobconf mapred.output.compress=true    //指定reduce输出压缩
    -jobconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec    //没有此选项,reduce作业输出文件的格式为.lzo_deflate 

    在hive中使用lzo压缩

    hadoop集群启用了Lzo压缩,就需要在Hive建表的时候指定压缩时所使用的编解码器,否则Hive无法正确读取数据。

     1).Gzip和Bzip2由于是hadoop默认支持的,所以无需指定特殊的编解码器,只要指定Text类型即可。

    create table lzo( 
    id int, 
    name string) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '	' 
    STORED AS TEXTFILE;

     2).LZO是外挂的第三方库,所以要指定输入和输出的编解码器。

    create table lzo( 
    id int, 
    name string) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '	' 
    STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 

     3).对于Hive表的数据文件,用lzop在本地压缩好了,直接上传到hdfs上就可以了。

    [root@ncst test]# lzop -v lzo 
    compressing lzo into lzo.lzo
    
    hive
    > load data local inpath '/root/test/lzo.lzo' > overwrite into table lzo;
    hive
    > select * from lzo; OK Tie Coat Hat Scarf Time taken: 0.218 seconds, Fetched: 4 row(s)

     4).对于已经存在的表修改为Lzo 

      alter table后对已经load进表中的数据,需要重新load和创建索引,要不还是不能分块。

    ALTER TABLE things
        SET FILEFORMAT
            INPUTFORMAT "com.hadoop.mapred.DeprecatedLzoTextInputFormat"
            OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";

     5).在做数据清洗的时候,假如源日志是lzo压缩的,输出的时候也希望使用lzo压缩。则在数据清洗的脚本中对hadoop的jobconf做一个指定。这样就可以做到,输入是lzo,输出也可以lzo。或者输入是text,输出是lzo。

    -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat 
    -jobconf mapreduce.output.compress=true 
    -jobconf mapreduce.output.compression.codec=com.hadoop.compression.lzo.LzopCodec
  • 相关阅读:
    python之Lambda
    oracle数据处理之expdb/impdb
    oracle之dblink
    oracle数据处理之sql*loader(二)
    exsi主机之间使用scp拷贝文件超时问题
    exsi从磁盘中加载虚拟机
    exsi的虚拟机加载U盘
    python 中的property
    hp服务器安装exsi5.5
    关于vsphere的 许可证配置问题
  • 原文地址:https://www.cnblogs.com/skyl/p/4766332.html
Copyright © 2011-2022 走看看