上例中将HDFS里小文件通过mapper压缩到一个文件中,本例将这些小文件解压出来。
mapreduce可以按SequenceFile的key进行分片。
1、mapper
public class MultiOutputMapper extends Mapper<Text,BytesWritable,NullWritable,Text> { private MultipleOutputs<NullWritable,Text> multipleOutputs; private long splitLength; /** * Called once at the beginning of the task. * * @param context */ @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); InputSplit split = context.getInputSplit(); splitLength = split.getLength(); } /** * Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function. * * @param key * @param value * @param context */ @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { System.out.printf("split length:%s,value length:%s,bytes length:%s",splitLength,value.getLength(),value.getBytes().length); int length = value.getLength(); byte[] bytes = new byte[length]; System.arraycopy(value.getBytes(),0,bytes,0,length); Text contents = new Text(bytes); //根据SequenceFile里key的原路径路径生成文件 // multipleOutputs.write(NullWritable.get(),new Text(bytes),key.toString()); //在output里输出文件 Path path = new Path(key.toString()); String outputFileName = String.format("%s/%s","2019",path.getName()); multipleOutputs.write(NullWritable.get(),new Text(bytes),outputFileName); // // multipleOutputs.write(NullWritable.get(),new Text(value.getBytes()),key.toString());//这句是错的。 // 通过测试,对于SequenceFile,是按key进入分片,value的length是实际长度,value.getbytes的长度是value的buff长度,两个不一定相等 // split length:88505,value length:4364,bytes length:6546 } /** * Called once at the end of the task. * * @param context */ @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); super.cleanup(context); } }
2、job
public class SequenceFileToSmallFileConverter { public static void main(String[] args) throws Exception{ long startTime = System.currentTimeMillis(); Configuration conf = new Configuration(); Path outPath = new Path(args[1]); FileSystem fileSystem = outPath.getFileSystem(conf); //删除输出路径 if(fileSystem.exists(outPath)) { fileSystem.delete(outPath,true); } Job job = Job.getInstance(conf,"SequenceFileToSmallFileConverter"); job.setJarByClass(SequenceFileToSmallFileConverter.class); job.setMapperClass(MultiOutputMapper.class); job.setNumReduceTasks(0); job.setInputFormatClass(SequenceFileInputFormat.class); //TextOutputFormat会在每行文本后面加入换行符号,如果是这个文本作为一个整体来处理,最后就会比预期多一个换行符号 // job.setOutputFormatClass(TextOutputFormat.class); //WholeTextOutputFormat与TextOutputFormat的区别就是没有在每行写入换行符 job.setOutputFormatClass(WholeTextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(BytesWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); int exitCode = job.waitForCompletion(true) ? 0:1; long endTime = System.currentTimeMillis(); long timeSpan = endTime - startTime; System.out.println("运行耗时:"+timeSpan+"毫秒。"); System.exit(exitCode); } }
public class WholeTextOutputFormat<K, V> extends FileOutputFormat<K, V> { public static String SEPARATOR = "mapreduce.output.textoutputformat.separator"; /** * @deprecated Use {@link #SEPARATOR} */ @Deprecated public static String SEPERATOR = SEPARATOR; protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final byte[] NEWLINE = " ".getBytes(StandardCharsets.UTF_8); protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, " "); } /** * Write the object to the byte stream, handling Text as a special * case. * @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } // out.write(NEWLINE);将文本当做整体,各key之间不主动加入换行符号 } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } } public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get(SEPARATOR, " "); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fileOut = fs.create(file, false); if (isCompressed) { return new WholeTextOutputFormat.LineRecordWriter<>( new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } else { return new WholeTextOutputFormat.LineRecordWriter<>(fileOut, keyValueSeparator); } } }
3、验证,压入SequenceFile和解压后的文件完全相同。
[hadoop@bigdata-senior01 ~]$ hadoop fs -checksum /demo/1.txt-m-00000 /demo3/1.txt /demo/1.txt-m-00000 MD5-of-0MD5-of-512CRC32C 0000020000000000000000007b6bd9c9f517a6ea12ede79fd43700ca /demo3/1.txt MD5-of-0MD5-of-512CRC32C 0000020000000000000000007b6bd9c9f517a6ea12ede79fd43700ca