  • Hadoop: using a map-only job to unpack small files from a SequenceFile

    The previous example packed small HDFS files into a single SequenceFile with a mapper; this example unpacks those small files again.

    MapReduce can split a SequenceFile on key boundaries.
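
    For reference, the previous example stored each small file's full path as the Text key and its raw bytes as the BytesWritable value. Below is a minimal sketch of producing such a file with the SequenceFile.Writer API (the paths are illustrative, not the previous post's exact code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SmallFilePacker {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path seqFile = new Path("/demo/pack.seq");    //illustrative output path
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(seqFile),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class))) {
                //key = original path, value = file contents, matching the mapper below
                for (FileStatus status : fs.listStatus(new Path("/demo/small"))) {    //illustrative input dir
                    if (!status.isFile()) continue;
                    byte[] data = new byte[(int) status.getLen()];    //small files fit in memory
                    try (FSDataInputStream in = fs.open(status.getPath())) {
                        in.readFully(data);
                    }
                    writer.append(new Text(status.getPath().toString()), new BytesWritable(data));
                }
            }
        }
    }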

    1. Mapper

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class MultiOutputMapper extends Mapper<Text,BytesWritable,NullWritable,Text> {
        private MultipleOutputs<NullWritable,Text> multipleOutputs;
        private long splitLength;
    
        /**
         * Called once at the beginning of the task.
         *
         * @param context
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
            InputSplit split = context.getInputSplit();
            splitLength = split.getLength();
        }
    
        /**
         * Called once for each key/value pair in the input split. Most applications
         * should override this, but the default is the identity function.
         *
         * @param key
         * @param value
         * @param context
         */
        @Override
        protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
            System.out.printf("split length:%s, value length:%s, bytes length:%s%n", splitLength, value.getLength(), value.getBytes().length);
            int length = value.getLength();
            byte[] bytes = new byte[length];
            System.arraycopy(value.getBytes(),0,bytes,0,length);
    
            Text contents = new Text(bytes);
            //recreate the file at the original path recorded in the SequenceFile key
    //        multipleOutputs.write(NullWritable.get(),new Text(bytes),key.toString());
    
            //write the file under the output directory instead
            Path path = new Path(key.toString());
            String outputFileName = String.format("%s/%s","2019",path.getName());
            multipleOutputs.write(NullWritable.get(),new Text(bytes),outputFileName);
    //
    //        multipleOutputs.write(NullWritable.get(),new Text(value.getBytes()),key.toString());//this line is wrong: getBytes() may include trailing buffer bytes
    //        Testing shows that a SequenceFile is split per key; value.getLength() is the actual data length, while value.getBytes().length is the size of the backing buffer - the two are not necessarily equal:
    //        split length:88505,value length:4364,bytes length:6546
        }
    
        /**
         * Called once at the end of the task.
         *
         * @param context
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
            super.cleanup(context);
        }
    }
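
    A detail worth noting in the mapper: the System.arraycopy is what trims the value down to its getLength() bytes. BytesWritable.copyBytes() (a standard Hadoop method) performs the same trim in one call, so the body of map() could plausibly be shortened to:

    //copyBytes() returns exactly getLength() bytes, not the larger backing buffer
    byte[] bytes = value.copyBytes();
    Path path = new Path(key.toString());
    multipleOutputs.write(NullWritable.get(), new Text(bytes), String.format("%s/%s", "2019", path.getName()));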

    2. Job

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SequenceFileToSmallFileConverter {
    
        public static void main(String[] args) throws Exception{
    
            long startTime = System.currentTimeMillis();
    
            Configuration conf = new Configuration();
    
    
            Path outPath = new Path(args[1]);
            FileSystem fileSystem = outPath.getFileSystem(conf);
            //delete the output path if it already exists
            if(fileSystem.exists(outPath))
            {
                fileSystem.delete(outPath,true);
            }
    
            Job job = Job.getInstance(conf,"SequenceFileToSmallFileConverter");
            job.setJarByClass(SequenceFileToSmallFileConverter.class);
    
            job.setMapperClass(MultiOutputMapper.class);
            job.setNumReduceTasks(0);
    
            job.setInputFormatClass(SequenceFileInputFormat.class);
            //TextOutputFormat appends a newline after every record; when each text is handled as a whole, the output ends up one newline longer than expected
    //        job.setOutputFormatClass(TextOutputFormat.class);
    
            //WholeTextOutputFormat differs from TextOutputFormat only in that it does not write that newline after each record
            job.setOutputFormatClass(WholeTextOutputFormat.class);
    
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);    //the mapper emits Text values, so the declared output value class should be Text, not BytesWritable
    
    
    
            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
    
            int exitCode = job.waitForCompletion(true) ? 0:1;
    
            long endTime = System.currentTimeMillis();
            long timeSpan = endTime - startTime;
            System.out.println("运行耗时:"+timeSpan+"毫秒。");
    
            System.exit(exitCode);
        }
    }
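
    One caveat: because every record goes through MultipleOutputs and nothing is written through the default collector, the job above still leaves empty part-m-* files in the output directory. If that matters, Hadoop's LazyOutputFormat can wrap the output format so the default file is only created on first write; a possible (untested here) replacement for the setOutputFormatClass call:

    //import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    //creates the default output file lazily, so no empty part-m-* files are left behind
    LazyOutputFormat.setOutputFormatClass(job, WholeTextOutputFormat.class);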
    The custom output format itself is a copy of Hadoop's TextOutputFormat with the per-record newline write removed:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ReflectionUtils;

    public class WholeTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
        public static String SEPARATOR = "mapreduce.output.textoutputformat.separator";
        /**
         * @deprecated Use {@link #SEPARATOR}
         */
        @Deprecated
        public static String SEPERATOR = SEPARATOR;
        protected static class LineRecordWriter<K, V>
                extends RecordWriter<K, V> {
            private static final byte[] NEWLINE =
                    "\n".getBytes(StandardCharsets.UTF_8);
    
            protected DataOutputStream out;
            private final byte[] keyValueSeparator;
    
            public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
                this.out = out;
                this.keyValueSeparator =
                        keyValueSeparator.getBytes(StandardCharsets.UTF_8);
            }
    
            public LineRecordWriter(DataOutputStream out) {
                this(out, "	");
            }
    
            /**
             * Write the object to the byte stream, handling Text as a special
             * case.
             * @param o the object to print
             * @throws IOException if the write throws, we pass it on
             */
            private void writeObject(Object o) throws IOException {
                if (o instanceof Text) {
                    Text to = (Text) o;
                    out.write(to.getBytes(), 0, to.getLength());
                } else {
                    out.write(o.toString().getBytes(StandardCharsets.UTF_8));
                }
            }
    
            public synchronized void write(K key, V value)
                    throws IOException {
    
                boolean nullKey = key == null || key instanceof NullWritable;
                boolean nullValue = value == null || value instanceof NullWritable;
                if (nullKey && nullValue) {
                    return;
                }
                if (!nullKey) {
                    writeObject(key);
                }
                if (!(nullKey || nullValue)) {
                    out.write(keyValueSeparator);
                }
                if (!nullValue) {
                    writeObject(value);
                }
    //            out.write(NEWLINE); intentionally disabled: each text is treated as a whole, so no newline is inserted between keys
            }
    
            public synchronized
            void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        }
    
        public RecordWriter<K, V>
        getRecordWriter(TaskAttemptContext job
        ) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = conf.get(SEPARATOR, "\t");
            CompressionCodec codec = null;
            String extension = "";
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                codec = ReflectionUtils.newInstance(codecClass, conf);
                extension = codec.getDefaultExtension();
            }
            Path file = getDefaultWorkFile(job, extension);
            FileSystem fs = file.getFileSystem(conf);
            FSDataOutputStream fileOut = fs.create(file, false);
            if (isCompressed) {
                return new WholeTextOutputFormat.LineRecordWriter<>(
                        new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            } else {
                return new WholeTextOutputFormat.LineRecordWriter<>(fileOut, keyValueSeparator);
            }
        }
    }

    3. Verification: a file packed into the SequenceFile and the file unpacked from it are byte-for-byte identical.

    [hadoop@bigdata-senior01 ~]$ hadoop fs -checksum /demo/1.txt-m-00000 /demo3/1.txt
    /demo/1.txt-m-00000    MD5-of-0MD5-of-512CRC32C    0000020000000000000000007b6bd9c9f517a6ea12ede79fd43700ca
    /demo3/1.txt    MD5-of-0MD5-of-512CRC32C    0000020000000000000000007b6bd9c9f517a6ea12ede79fd43700ca
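
    The same comparison can be done from Java via the FileSystem API; a small sketch using the paths above:

    //equivalent to `hadoop fs -checksum`: compares the HDFS-reported checksums
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FileChecksum packed = fs.getFileChecksum(new Path("/demo/1.txt-m-00000"));
    FileChecksum unpacked = fs.getFileChecksum(new Path("/demo3/1.txt"));
    System.out.println(packed.equals(unpacked) ? "identical" : "different");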
  • Original article: https://www.cnblogs.com/asker009/p/10401715.html