  • Using SequenceFileOutputFormat in MapReduce

    package com.mengyao.hadoop.mapreduce;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Converts plain text files into SequenceFile-format files.
     * @author mengyao
     *
     */
    public class SequenceFileOutputFormatApp extends Configured implements Tool {
        
        private static final String APP_NAME = SequenceFileOutputFormatApp.class.getSimpleName();
        
        static class SequenceFileOutputFormatAppMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
            
            // Reusable output key/value instances, initialized once in setup()
            private LongWritable outputKey;
            private Text outputValue;
            
            @Override
            protected void setup(
                    Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                this.outputKey = new LongWritable();
                this.outputValue = new Text();
            }
            
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                this.outputKey.set(key.get());
                this.outputValue.set(value.toString());
                context.write(outputKey, outputValue);
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            Job job = Job.getInstance(conf, APP_NAME);
            job.setJarByClass(SequenceFileOutputFormatApp.class);
            
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPaths(job, args[0]);
            job.setMapperClass(SequenceFileOutputFormatAppMapper.class);
    
            // Map-only job: with zero reduce tasks, mapper output goes straight to the OutputFormat
            job.setNumReduceTasks(0);
            
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            
            /**
             * Three compression modes are available for the SequenceFile output:
             *     1. No compression:
             *        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);
             *     2. Record-level compression (per record; only the value is compressed, not the key),
             *        which produces roughly the same file size as no compression:
             *        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.RECORD);
             *        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
             *     3. Block-level compression (compresses many records at once; the most efficient):
             *        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
             *        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
             * RECORD and BLOCK also require importing org.apache.hadoop.io.compress.DefaultCodec.
             */
            // Write the output uncompressed; switch to RECORD or BLOCK as described above
            SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);
            
            SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        public static int createJob(String[] args) {
            Configuration conf = new Configuration();
            // Raise the DataNode write timeout (ms) and bound input splits to 256 MB-512 MB
            conf.set("dfs.datanode.socket.write.timeout", "7200000");
            conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
            conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
            int status = 0;
            
            try {
                status = ToolRunner.run(conf, new SequenceFileOutputFormatApp(), args);
            } catch (Exception e) {
                e.printStackTrace();
                // Report failure instead of silently returning the success code 0
                status = 1;
            }
            
            return status;
        }
        
        public static void main(String[] args) throws Exception {
            // Fall back to a sample input file and a timestamped output directory when no paths are given
            if (args.length != 2) {
                args = new String[]{
                        "/mapreduces/bookOutline.txt",
                        "/mapreduces/"+APP_NAME+"/"+new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())};
            }
            int status = createJob(args);
            System.exit(status);
        }
    }
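
To check the result, the part files under args[1] can be read back with SequenceFile.Reader. Below is a minimal verification sketch, not part of the original post: the class name SequenceFileDumper is made up for illustration, and its single argument is assumed to be the path of one part file the job wrote (e.g. <output-dir>/part-m-00000).

    package com.mengyao.hadoop.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    /** Dumps every key/value pair of a SequenceFile written by the job above. */
    public class SequenceFileDumper {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical argument: the path of one part file produced by the job
            Path path = new Path(args[0]);
            SequenceFile.Reader reader = null;
            try {
                reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
                LongWritable key = new LongWritable();
                Text value = new Text();
                // next() deserializes the next record into key/value; returns false at end of file
                while (reader.next(key, value)) {
                    System.out.println(key.get() + "\t" + value.toString());
                }
            } finally {
                if (reader != null) {
                    reader.close();
                }
            }
        }
    }

Because a SequenceFile stores its key and value class names in the header, a downstream job can also consume these files directly through SequenceFileInputFormat and receive LongWritable/Text pairs without any parsing; hadoop fs -text <file> offers the same quick check from the command line.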
  • Original post: https://www.cnblogs.com/mengyao/p/4865598.html