1、默认情况下生成的文件名是part-r-00000格式,想要自定义生成输出文件名可以使用org.apache.hadoop.mapreduce.lib.output.MultipleOutputs类用来写出
2、MultipleOutputs类需要在Reduce的setup()方法初始化,最好在cleanup()中关闭
3、这个时候还会生成part-r-00000这种文件,但里面是空的;要避免生成这些空文件,需要设置 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
4、MultipleOutputs类的write()方法有几个重载的函数
write(KEYOUT key, VALUEOUT value, String baseOutputPath)
如果baseOutputPath以/开头(即绝对路径),那么输出路径就是baseOutputPath + -r-00000,比如baseOutputPath="/PUFA/" + key.toString(),输出文件路径就是/PUFA/0000000001-r-00000
如果不以/开头,输出文件就在FileOutputFormat.setOutputPath(job, outPutPath)指定的目录下面,比如baseOutputPath=key.toString(),outPutPath=/trx,输出文件的路径就是/trx/0000000001-r-00000
write方法如果是带namedOutput参数的,需要在运行主类上面指定namedOutput,
MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
write(String namedOutput, K key, V value, String baseOutputPath)
write(String namedOutput, K key, V value)
这两种情况和上面的方法差不多,就是通过namedOutput把一组reduce处理的结果输出到不同的文件夹中;如果没有baseOutputPath,会输出到FileOutputFormat.setOutputPath()指定的目录
if (Integer.parseInt(key.toString()) >= 500000){ mos.write("PFBANK", journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString()); }else if(Integer.parseInt(key.toString()) < 500000){ mos.write("ZSBANK", journalTrxDataSet, NullWritable.get(), "/ZHAOSHANG/" + key.toString()); }
注意:有的时候会出现_SUCCESS文件和reduce输出的文件不在同一个目录,这是因为FileOutputFormat.setOutputPath()设定的路径和MultipleOutputs类的write()方法设置的baseOutputPath不一样所致,_SUCCESS文件始终在FileOutputFormat.setOutputPath()设定的路径上
如果有的时候报一些莫名其妙的错,可能和LazyOutputFormat.setOutputFormatClass()以及MultipleOutputs.addNamedOutput()的formatClass参数有关
最后附上完整代码
当时我们的需求是,需要分析统计银行各个终端的交易情况,当时我们数据量也不太多,领导说尽可能简单点做,当时统计维度有两种,一种是按照机器,一种是按照分行,所以直接使用了同一个mapreduce来完成,当然,下面代码只是统计机器的,分行的维度还没写上。当时mr程序是独立的一个模块,在数据采集完成后,会直接使用sh或cmd命令调用这个jar包,文件名当成启动参数传递过来。输出文件名需要自定义。
package com.xhy.xgg.mapreduce; import com.xhy.xgg.common.HDFSCommand; import com.xhy.xgg.common.enums.StatisticDemensionEnums; import com.xhy.xgg.entity.JournalTrxDataSet; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; @Slf4j @Service public class MapReduceTest { @Value("${data-set.output-path}") private String dataCollectionOutputPath; @Value("${data-collection.output-path}") private String dataCollectionIutputPath; @Value("${data-collection.hdfsURI}") private String hdfsURI; @Value("${mr-run-mode.value}") private String mrRunMode; @PostConstruct public void executeMRJob(StatisticDemensionEnums statisticDemensionEnums) throws IOException, ClassNotFoundException, InterruptedException { log.info(">> executeMRJob start execute"); Configuration conf = new Configuration(); conf.set("dfs.blocksize", "67108864"); conf.set("dfs.replication", "1"); conf.set("mapreduce.framework.name", mrRunMode); conf.set("fs.defaultFS", hdfsURI); Job job; job = 
Job.getInstance(conf, "JournalDataProcessService"); job.setJarByClass(com.xhy.xgg.mapreduce.TerminalJournalDataService.class); job.getConfiguration().set("statisticDemensionEnums", String.valueOf(statisticDemensionEnums.getIndex())); job.setMapperClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(JournalTrxDataSet.class); job.setReducerClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataReducer.class); job.setOutputKeyClass(JournalTrxDataSet.class); job.setOutputValueClass(NullWritable.class); job.setPartitionerClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataPartitioner.class); // job.setSortComparatorClass(JournalTrxDataComparator.class); job.setNumReduceTasks(3); Path inputPath = new Path(dataCollectionIutputPath + File.separator + "esbTrx" + File.separator + hdfsFileName); log.info("-- executeMRJob inputPath = {}", inputPath.toString()); FileInputFormat.addInputPath(job, inputPath); String outPutPathData = ""; outPutPathData = dataCollectionOutputPath + "_" + new SimpleDateFormat("yyyyMMdd").format(new Date()); try { if (HDFSCommand.exists(conf, hdfsURI, outPutPathData)) { HDFSCommand.delete(conf, hdfsURI, outPutPathData); } } catch (Exception e) { // TODO Auto-generated catch block log.error("<< executeMRJob exception, message {}", e.getMessage()); } Path outPutPath = new Path(outPutPathData); FileOutputFormat.setOutputPath(job, outPutPath); MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class); MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class); LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); System.exit(job.waitForCompletion(true) ? 
0 : 1); } public void start() throws Exception { executeMRJob(StatisticDemensionEnums.TERMINAL); } private String hdfsFileName = "journal_test.txt"; private static class JournalDataReducer extends Reducer<Text, JournalTrxDataSet, JournalTrxDataSet, NullWritable> { private String statisticDemensionEnums = null; JournalTrxDataSet journalTrxDataSet = new JournalTrxDataSet(); private MultipleOutputs mos; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); mos = new MultipleOutputs<>(context); statisticDemensionEnums = context.getConfiguration().get("statisticDemensionEnums"); } @Override protected void reduce(Text key, Iterable<JournalTrxDataSet> values, Reducer<Text, JournalTrxDataSet, JournalTrxDataSet, NullWritable>.Context context) throws IOException, InterruptedException { String branchId = ""; String branchName = ""; String trxType = ""; double trxAmt = 0.0; int trxCount = 0; for (JournalTrxDataSet rw : values) { branchId = rw.getBranchId(); branchName = rw.getBranchName(); trxType = rw.getTrxType(); trxAmt += rw.getAmount(); trxCount += rw.getCount(); } journalTrxDataSet.Set(key.toString(), branchId, branchName, trxType, trxAmt, trxCount); if (Integer.parseInt(key.toString()) >= 500000) { mos.write("PFBANK", journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString()); } else if (Integer.parseInt(key.toString()) < 500000) { mos.write("ZSBANK", journalTrxDataSet, NullWritable.get(), "/ZHAOSHANG/" + key.toString()); } //mos.write(journalTrxDataSet, NullWritable.get(), "/PUFA/"+ key.toString()); //mos.write(journalTrxDataSet, NullWritable.get(), key.toString()); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); mos.close(); } } private static class JournalDataPartitioner extends Partitioner<Text, JournalTrxDataSet> { @Override public int getPartition(Text key, JournalTrxDataSet value, int arg2) { if 
("706010101".equals(value.getBranchId())) { return 0; } else if ("706010106".equals(value.getBranchId())) { return 1; } return 2; } } private static class JournalDataMapper extends Mapper<Object, Text, Text, JournalTrxDataSet> { private String statisticDemensionEnums = null; JournalTrxDataSet journalTrxDataSet = new JournalTrxDataSet(); @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); // 上面传递过来需要map reduce的纬度,一个按照终端来统计,一个按照分行来统计 statisticDemensionEnums = context.getConfiguration().get("statisticDemensionEnums"); } @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, JournalTrxDataSet>.Context context) throws IOException, InterruptedException { String strContent = value.toString(); String result[] = strContent.split("\|"); String terminalId = result[0]; // terminal id String branchId = result[1]; // branch id String branchName = result[2]; // branch id double amount = Double.parseDouble(result[4]); // transaction amount String trxType = result[5]; journalTrxDataSet.Set(terminalId, branchId, branchName, trxType, amount, 1); context.write(new Text(terminalId), journalTrxDataSet); } } private static class JournalTrxDataComparator extends WritableComparator { protected JournalTrxDataComparator() { super(JournalTrxDataSet.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { JournalTrxDataSet j1 = (JournalTrxDataSet) w1; JournalTrxDataSet j2 = (JournalTrxDataSet) w2; int resultCompare = 0; /* * if (j1.getAmount() == j2.getAmount()) { resultCompare = 0; } else if * (j1.getAmount() < j2.getAmount()) { resultCompare = -1; } else { * resultCompare = 1; } */ return resultCompare;// return -1,0,1 } public static void main(String[] args) { SpringApplication.run(com.xhy.xgg.mapreduce.TerminalJournalDataService.class, args); } } }