  • Writing and reading RCFile files

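    I used RCFile at work to store and read data, so here are my notes.
    RCFile, developed at Facebook, combines the advantages of row storage and column storage: it compresses better and reads individual columns faster, and it plays an important role in large-scale data processing on MapReduce.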
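
    To make the "row storage plus column storage" idea concrete, here is a minimal sketch in plain Java. It is not RCFile's actual on-disk format: rows are first cut horizontally into row groups, and inside each group the values are laid out column by column. The names `RowGroupSketch` and `toRowGroups`, and the group size, are illustrative assumptions, and every row in a group is assumed to have the same number of columns.

```java
import java.util.ArrayList;
import java.util.List;

public class RowGroupSketch {

    /**
     * Splits rows into groups of at most groupSize rows and, inside each group,
     * regroups the values by column: result.get(g).get(c) is column c of group g.
     * Assumes every row in a group has as many columns as the group's first row.
     */
    public static List<List<List<String>>> toRowGroups(List<String[]> rows, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<List<List<String>>> groups = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += groupSize) {
            int end = Math.min(start + groupSize, rows.size());
            int columnCount = rows.get(start).length;
            List<List<String>> columns = new ArrayList<>();
            for (int c = 0; c < columnCount; c++) {
                // All values of column c for this group end up next to each other.
                List<String> column = new ArrayList<>();
                for (int r = start; r < end; r++) {
                    column.add(rows.get(r)[c]);
                }
                columns.add(column);
            }
            groups.add(columns);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"u1", "1384473600", "GET"});
        rows.add(new String[] {"u2", "1384473660", "POST"});
        rows.add(new String[] {"u3", "1384473720", "GET"});
        // Two rows per group: the third row ends up alone in a second group.
        System.out.println(toRowGroups(rows, 2));
    }
}
```

    Reading a single column then only touches that column's list in each group, and similar values sitting side by side are what tends to help compression.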
    Reading:

    Job setup:
            Job job = new Job();
            // YourDriver is a placeholder for your own driver class
            job.setJarByClass(YourDriver.class);
            // Read the input as RCFile
            job.setInputFormatClass(RCFileInputFormat.class);
            // Write plain text output
            job.setOutputFormatClass(TextOutputFormat.class);
            // Input path
            RCFileInputFormat.addInputPath(job, new Path(srcpath));
            // MultipleInputs.addInputPath(job, new Path(srcpath), RCFileInputFormat.class);
            // Output path
            TextOutputFormat.setOutputPath(job, new Path(respath));
            // Output key type
            job.setOutputKeyClass(Text.class);
            // Output value type
            job.setOutputValueClass(NullWritable.class);
            // Mapper class
            job.setMapperClass(ReadTestMapper.class);
            // No reducer is set: the mapper has already converted each row to Text,
            // so a reduce step would only pass those Text records through.

            code = (job.waitForCompletion(true)) ? 0 : 1;
      
      
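    // Mapper class

    import java.io.IOException;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ReadTestMapper extends Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException, InterruptedException {
            Text txt = new Text();
            // RCFile hands the mapper one row at a time; value holds that row's
            // columns, so iterate over them and emit one tab-separated line.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < value.size(); i++) {
                BytesRefWritable v = value.get(i);
                txt.set(v.getData(), v.getStart(), v.getLength());
                if (i > 0) {
                    sb.append('\t');
                }
                sb.append(txt.toString());
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }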
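
    The per-column decoding in the mapper can be tried outside Hadoop. The sketch below is a stand-in built on assumptions: the hypothetical `Slice` class carries only the three pieces of information the mapper reads from each column (data, start, length), and `joinColumns` (also a made-up name) produces the same tab-joined line with `String.join`. It decodes the bytes as UTF-8, which is the encoding `Text` uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ColumnJoinSketch {

    /** Hypothetical stand-in for a column reference: a window into a shared byte buffer. */
    public static final class Slice {
        final byte[] data;
        final int start;
        final int length;

        public Slice(byte[] data, int start, int length) {
            this.data = data;
            this.start = start;
            this.length = length;
        }
    }

    /** Decodes each slice as UTF-8 and joins the columns with a tab. */
    public static String joinColumns(List<Slice> columns) {
        List<String> decoded = new ArrayList<>();
        for (Slice s : columns) {
            decoded.add(new String(s.data, s.start, s.length, StandardCharsets.UTF_8));
        }
        return String.join("\t", decoded);
    }

    public static void main(String[] args) {
        // Three columns stored back to back in one buffer.
        byte[] buffer = "u1GET200".getBytes(StandardCharsets.UTF_8);
        List<Slice> row = new ArrayList<>();
        row.add(new Slice(buffer, 0, 2)); // "u1"
        row.add(new Slice(buffer, 2, 3)); // "GET"
        row.add(new Slice(buffer, 5, 3)); // "200"
        System.out.println(joinColumns(row));
    }
}
```

    Honouring the start and length of each slice matters because several columns can share one backing array, so decoding the whole array would mix columns together.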
    

      Writing output compressed as RCFile:

    Job setup:
            Job job = new Job();
            Configuration conf = job.getConfiguration();
            // Number of columns in each row
            RCFileOutputFormat.setColumnNumber(conf, 4);
            // YourDriver is a placeholder for your own driver class
            job.setJarByClass(YourDriver.class);

            FileInputFormat.setInputPaths(job, new Path(srcpath));
            RCFileOutputFormat.setOutputPath(job, new Path(respath));

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(RCFileOutputFormat.class);

            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(BytesRefArrayWritable.class);

            job.setMapperClass(OutPutTestMapper.class);

            // Pass the target date to the mapper (line is presumably the parsed command line)
            conf.set("date", line.getOptionValue(DATE));
            // Compression settings
            conf.setBoolean("mapred.output.compress", true);
            conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");

            code = (job.waitForCompletion(true)) ? 0 : 1;
      
    Mapper class:
        public class OutPutTestMapper extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String day = context.getConfiguration().get("date");
                if (!line.isEmpty()) {
                    String[] lines = line.split(" ", -1);
                    if (lines.length > 3) {
                        // timeStampDate is the author's helper (not shown here);
                        // the first 10 characters of its result are taken as the date
                        String times = timeStampDate(lines[1]);
                        String d = times.substring(0, 10);
                        // Keep only records from the requested day
                        if (d.equals(day)) {
                            byte[][] record = {lines[0].getBytes("UTF-8"), lines[1].getBytes("UTF-8"), lines[2].getBytes("UTF-8"), lines[3].getBytes("UTF-8")};

                            BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);

                            // Wrap each field as one column of the output row
                            for (int i = 0; i < record.length; i++) {
                                BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
                                bytes.set(i, cu);
                            }
                            context.write(key, bytes);
                        }
                    }
                }
            }
        }
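
    The mapper above relies on a `timeStampDate` helper that the post does not show. Purely as an assumption, the sketch below treats the second field as a Unix timestamp in seconds and formats it as `yyyy-MM-dd HH:mm:ss` in UTC, so that `substring(0, 10)` yields the day. The real helper may use a different unit, pattern, or time zone. `toRecord` is likewise a hypothetical name; it mirrors the filter-and-encode step without any Hadoop types.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateFilterSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /** Assumed behaviour: epoch seconds in, "yyyy-MM-dd HH:mm:ss" (UTC) out. */
    public static String timeStampDate(String epochSeconds) {
        // Throws NumberFormatException if the field is not a whole number.
        return FORMAT.format(Instant.ofEpochSecond(Long.parseLong(epochSeconds)));
    }

    /**
     * Returns the first four space-separated fields as UTF-8 byte arrays,
     * or null when the line has fewer than four fields or its timestamp
     * falls on a day other than the requested one.
     */
    public static byte[][] toRecord(String line, String day) {
        String[] fields = line.split(" ", -1);
        if (fields.length <= 3) {
            return null;
        }
        String date = timeStampDate(fields[1]).substring(0, 10);
        if (!date.equals(day)) {
            return null;
        }
        byte[][] record = new byte[4][];
        for (int i = 0; i < record.length; i++) {
            record[i] = fields[i].getBytes(StandardCharsets.UTF_8);
        }
        return record;
    }

    public static void main(String[] args) {
        byte[][] kept = toRecord("u1 1384473600 GET /index", "2013-11-15");
        System.out.println(kept == null ? "skipped" : kept.length + " columns kept");
    }
}
```

    In the real job each of the four byte arrays would then be wrapped in a `BytesRefWritable` and placed into the `BytesRefArrayWritable`, as the mapper does.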
    

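      That is the Java implementation, but there is a simpler way: Hive already supports RCFile-format files, so you can load the data into Hive and use SQL import and export statements to decompress or compress it.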

    Work areas: big data, data warehousing, Hadoop, Hive, HBase, Python, ad-hoc queries, Scala, data tooling development
    Email: zhangkai081@gmail.com
  • Original post: https://www.cnblogs.com/smallbaby/p/3428159.html