  • MR Example: Writing and Reading SequenceFiles

    A SequenceFile is a flat file format designed by Hadoop to store key-value pairs in binary form. Each key-value pair in a SequenceFile is treated as a record, and based on the record-level compression strategy, SequenceFiles support three compression types:

    NONE: records are not compressed; (combination 1)

    RECORD: only the value of each record is compressed, not the key; (combination 2)

    BLOCK: all records in a block, keys included, are compressed together. (combination 3)
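
    For reference, a SequenceFile can also be written outside of MapReduce with SequenceFile.createWriter, which makes the three compression types easy to try out. The sketch below is a minimal, hypothetical example (the class name SeqFileWriteDemo and the sample records are made up; it assumes the Hadoop 2.x Writer options API):

    package test0820;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.io.compress.DefaultCodec;
    
    public class SeqFileWriteDemo {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(new Path(args[0])),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(VLongWritable.class),
                    // combination 3; use NONE or RECORD for combinations 1 and 2
                    SequenceFile.Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
            try {
                for (long i = 0; i < 100; i++) {
                    writer.append(new Text("key-" + i), new VLongWritable(i));
                }
            } finally {
                IOUtils.closeStream(writer);
            }
        }
    }

    Swapping the compression option between the three CompressionType values reproduces combinations 1-3 from the list above.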

    package test0820;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    public class Test0829 {
    
        public static void main(String[] args) throws Exception {        
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(Test0829.class);
    
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(VLongWritable.class);        
    
            // set the output format to SequenceFileOutputFormat
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            /**
             * Configure the SequenceFile compression. There are several
             * possible combinations; pick one of the modes below and leave
             * the others commented out.
             */

            // Combination 1: no compression
            SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);

            // Combination 2: RECORD compression, with a codec (DefaultCodec, GzipCodec, ...)
            // SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.RECORD);
            // SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

            // Combination 3: BLOCK compression, with a codec (DefaultCodec, GzipCodec, ...)
            // SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
            // SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

            FileInputFormat.addInputPaths(job, args[0]);
            SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }

        // map: input lines look like "prefix:word1,word2,..."; emit (word, 1) pairs
        public static class WCMapper extends Mapper<LongWritable, Text, Text, VLongWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] split = value.toString().split(":", 2);
                if (split.length != 1) {
                    String[] splited = split[1].split(",");
                    for (String s : splited) {
                        context.write(new Text(s), new VLongWritable(1L));
                    }
                }
            }
        }

        // reduce: sum the counts for each word
        public static class WCReducer extends Reducer<Text, VLongWritable, Text, VLongWritable> {
            @Override
            protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context)
                    throws IOException, InterruptedException {
                long sum = 0;
                for (VLongWritable vl : v2s) {
                    sum += vl.get();
                }
                context.write(key, new VLongWritable(sum));
            }
        }
    }

    Reading SequenceFiles as MR Input

    When the input files are SequenceFiles, use the SequenceFileInputFormat class. Because a SequenceFile stores its keys and values in binary form (note that the binary encoding of Hadoop Writable types differs from raw binary: it carries extra bookkeeping information), you must know in advance which Hadoop types the keys and values correspond to before reading the file.
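
    If those types are unknown, they can be recovered from the file header, since the writer records the exact Writable classes it used. Below is a minimal sketch (the class name SeqFilePeek is made up; it assumes the Hadoop 2.x Reader options API):

    package test0820;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    
    public class SeqFilePeek {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(new Path(args[0])));
            try {
                // the file header stores the key/value class names and the compression type
                System.out.println("key class   = " + reader.getKeyClassName());
                System.out.println("value class = " + reader.getValueClassName());
                System.out.println("compression = " + reader.getCompressionType());
            } finally {
                IOUtils.closeStream(reader);
            }
        }
    }

    The same information can be eyeballed with "hadoop fs -text", which understands SequenceFiles and prints their records as text.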

    The SequenceFile produced by the code above can be read with SequenceFileInputFormat; its key is of type Text and its value is of type VLongWritable.

    package test0820;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class SFInput02 {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(SFInput02.class);
    
            job.setMapperClass(SFMapper.class);
            job.setReducerClass(SFReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(VLongWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(VLongWritable.class);
    
            job.setInputFormatClass(SequenceFileInputFormat.class);
    
            SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
        public static class SFMapper extends Mapper<Text, VLongWritable,Text, VLongWritable> {
            public void map(Text key, VLongWritable value, Context context)
                    throws IOException, InterruptedException {
                context.write(key, value);
            }
    
        }
        //reduce
        public static class SFReducer extends Reducer<Text, VLongWritable,Text, VLongWritable>{
            @Override
            protected void reduce(Text key, Iterable<VLongWritable> v2s,Context context)
                    throws IOException, InterruptedException {
                for(VLongWritable vl : v2s){
                    context.write(key, vl);
                }
            }
        }
    }

    If you do not know the key and value types inside a SequenceFile, you can use SequenceFileAsTextInputFormat instead. It converts both the key and the value of each record into Text objects before passing them to the map function.

    package test0820;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class SFinput {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(SFinput.class);
    
            job.setMapperClass(SFMapper.class);
            job.setReducerClass(SFReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
    
            SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
        public static class SFMapper extends Mapper<Text, Text,Text, Text> {
            public void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                context.write(key, value);
            }
    
        }
        //reduce
        public static class SFReducer extends Reducer<Text, Text,Text,Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> v2s,Context context)
                    throws IOException, InterruptedException {
                for(Text text : v2s){
                    context.write(key, text);
                }
            }
        }
    }

    Finally, there is also a SequenceFileAsBinaryInputFormat class, which hands the key and value to the map function as raw binary wrapped in BytesWritable objects; interpreting those bytes is up to the author of the map function.
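
    A minimal mapper sketch for that case (the class name BinaryDecodeMapper is made up, and it assumes the file was written with Text keys and VLongWritable values, as in the job above): each Writable's own readFields method is re-run over the raw bytes to decode them.

    package test0820;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class BinaryDecodeMapper
            extends Mapper<BytesWritable, BytesWritable, Text, VLongWritable> {
    
        private final DataInputBuffer buffer = new DataInputBuffer();
        private final Text key = new Text();
        private final VLongWritable value = new VLongWritable();
    
        @Override
        protected void map(BytesWritable rawKey, BytesWritable rawValue, Context context)
                throws IOException, InterruptedException {
            // rawKey/rawValue hold the serialized form written by the writer, so
            // deserializing with the matching Writable types restores the originals
            buffer.reset(rawKey.getBytes(), rawKey.getLength());
            key.readFields(buffer);
            buffer.reset(rawValue.getBytes(), rawValue.getLength());
            value.readFields(buffer);
            context.write(key, value);
        }
    }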
