MapReduce: Changing the Output File Name

    1. By default, output files are named in the part-r-00000 format. To customize the output file names, use the org.apache.hadoop.mapreduce.lib.output.MultipleOutputs class to write the output.

    2. The MultipleOutputs instance should be created in the Reducer's setup() method and is best closed in cleanup() (the reducer sketch further below shows this pattern).

    3. Even then, empty part-r-00000 files are still generated. To suppress them, call LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); in the driver.

    4. The MultipleOutputs write() method has several overloads.

    write(KEYOUT key, VALUEOUT value, String baseOutputPath)
    If baseOutputPath starts with a /, the output path is baseOutputPath plus the -r-00000 suffix. For example, with baseOutputPath = "/PUFA/" + key.toString(), the output file ends up at /PUFA/0000000001-r-00000.
    Without a leading /, the output goes under the directory set by FileOutputFormat.setOutputPath(job, outPutPath). For example, with baseOutputPath = key.toString() and an output path of /trx, the output file ends up at /trx/0000000001-r-00000.
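
    As a minimal sketch of this overload (using hypothetical Text key/value types and a placeholder class name rather than the JournalTrxDataSet entity from the full code below), the MultipleOutputs instance is created in setup(), written to in reduce() with a baseOutputPath derived from the key, and closed in cleanup():

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Minimal sketch only: the key/value types (Text/Text) and class name are placeholders.
    public class RenamingReducer extends Reducer<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<>(context);   // point 2: create in setup()
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // No leading "/": files land under FileOutputFormat.setOutputPath(),
                // e.g. /trx/<key>-r-00000
                mos.write(key, value, key.toString());

                // Leading "/": files land at that absolute HDFS path instead,
                // e.g. /PUFA/<key>-r-00000
                // mos.write(key, value, "/PUFA/" + key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();                            // point 2: close in cleanup()
        }
    }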
    
    
    For the write() overloads that take a namedOutput parameter, the named outputs must first be registered in the driver class:
    MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
    write(String namedOutput, K key, V value, String baseOutputPath)
    write(String namedOutput, K key, V value)
    These two behave much like the overload above; the namedOutput simply lets one reducer route its results to different directories. Without a baseOutputPath, the output goes to the directory set by FileOutputFormat.setOutputPath().
      if (Integer.parseInt(key.toString()) >= 500000){
           mos.write("PFBANK", journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString());
      }else if(Integer.parseInt(key.toString()) < 500000){
           mos.write("ZSBANK", journalTrxDataSet, NullWritable.get(), "/ZHAOSHANG/" + key.toString());
      }
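
    With baseOutputPath values that start with /, as here, the records for keys >= 500000 end up at the absolute HDFS path /PUFA/<key>-r-00000 and the rest at /ZHAOSHANG/<key>-r-00000, rather than under the job's output directory; only the _SUCCESS marker stays under FileOutputFormat.setOutputPath() (see the note below).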

    Note: sometimes the _SUCCESS file and the reducer output files end up in different directories. This happens when the baseOutputPath passed to MultipleOutputs.write() differs from the path set by FileOutputFormat.setOutputPath(); the _SUCCESS file is always written under the FileOutputFormat.setOutputPath() directory.

    If the job fails with puzzling errors, one thing to check is the formatClass argument: keep the class passed to LazyOutputFormat.setOutputFormatClass() consistent with the one passed to MultipleOutputs.addNamedOutput().
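
    For example, the driver lines below (taken from the full code that follows) keep TextOutputFormat as the format class in both places:

    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);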

    Finally, the complete code.

    Our requirement at the time was to analyze and summarize the transactions of the bank's terminals. The data volume was not large and the lead wanted the simplest possible solution. There were two statistical dimensions, by terminal and by branch, so a single MapReduce job handles both; the code below only covers the terminal dimension, the branch dimension had not been written yet. The MR program was a standalone module: once data collection finished, the jar was invoked directly via an sh or cmd command, with the file name passed in as a startup argument, and the output file names had to be customized.

    package com.xhy.xgg.mapreduce;
    
    import com.xhy.xgg.common.HDFSCommand;
    import com.xhy.xgg.common.enums.StatisticDemensionEnums;
    import com.xhy.xgg.entity.JournalTrxDataSet;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.SpringApplication;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.io.File;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Slf4j
    @Service
    public class MapReduceTest {
    
    
        @Value("${data-set.output-path}")
        private String dataCollectionOutputPath;
        @Value("${data-collection.output-path}")
        private String dataCollectionInputPath;
    
        @Value("${data-collection.hdfsURI}")
        private String hdfsURI;
    
        @Value("${mr-run-mode.value}")
        private String mrRunMode;
    
        public void executeMRJob(StatisticDemensionEnums statisticDemensionEnums)
                throws IOException, ClassNotFoundException, InterruptedException {
    
            log.info(">> executeMRJob start execute");
            Configuration conf = new Configuration();
            conf.set("dfs.blocksize", "67108864");
            conf.set("dfs.replication", "1");
            conf.set("mapreduce.framework.name", mrRunMode);
            conf.set("fs.defaultFS", hdfsURI);
            Job job = Job.getInstance(conf, "JournalDataProcessService");
            job.setJarByClass(MapReduceTest.class);
            job.getConfiguration().set("statisticDemensionEnums", String.valueOf(statisticDemensionEnums.getIndex()));
            job.setMapperClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataMapper.class);
            job.setMapOutputKeyClass(Text.class);
    
            job.setMapOutputValueClass(JournalTrxDataSet.class);
            job.setReducerClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataReducer.class);
            job.setOutputKeyClass(JournalTrxDataSet.class);
            job.setOutputValueClass(NullWritable.class);
            job.setPartitionerClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataPartitioner.class);
            // job.setSortComparatorClass(JournalTrxDataComparator.class);
            job.setNumReduceTasks(3);
            Path inputPath = new Path(dataCollectionInputPath + File.separator + "esbTrx" + File.separator
                    + hdfsFileName);
    
            log.info("-- executeMRJob inputPath = {}", inputPath.toString());
    
            FileInputFormat.addInputPath(job, inputPath);
    
            String outPutPathData = dataCollectionOutputPath + "_" + new SimpleDateFormat("yyyyMMdd").format(new Date());
            try {
                if (HDFSCommand.exists(conf, hdfsURI, outPutPathData)) {
                    HDFSCommand.delete(conf, hdfsURI, outPutPathData);
                }
            } catch (Exception e) {
                log.error("<< executeMRJob exception, message {}", e.getMessage());
            }
            Path outPutPath = new Path(outPutPathData);
            FileOutputFormat.setOutputPath(job, outPutPath);
            MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
            MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
    
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
        // Run the job once the Spring context is initialized (only the terminal dimension is implemented so far)
        @PostConstruct
        public void start() throws Exception {
            executeMRJob(StatisticDemensionEnums.TERMINAL);
        }
    
        private String hdfsFileName = "journal_test.txt";
    
        private static class JournalDataReducer extends Reducer<Text, JournalTrxDataSet, JournalTrxDataSet, NullWritable> {
    
            private String statisticDemensionEnums = null;
            JournalTrxDataSet journalTrxDataSet = new JournalTrxDataSet();
    
            private MultipleOutputs<JournalTrxDataSet, NullWritable> mos;
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                super.setup(context);
                mos = new MultipleOutputs<>(context);
                statisticDemensionEnums = context.getConfiguration().get("statisticDemensionEnums");
    
            }
    
            @Override
            protected void reduce(Text key, Iterable<JournalTrxDataSet> values,
                                  Reducer<Text, JournalTrxDataSet, JournalTrxDataSet, NullWritable>.Context context)
                    throws IOException, InterruptedException {
    
                String branchId = "";
                String branchName = "";
                String trxType = "";
                double trxAmt = 0.0;
                int trxCount = 0;
    
                for (JournalTrxDataSet rw : values) {
                    branchId = rw.getBranchId();
                    branchName = rw.getBranchName();
                    trxType = rw.getTrxType();
                    trxAmt += rw.getAmount();
                    trxCount += rw.getCount();
    
                }
                journalTrxDataSet.Set(key.toString(), branchId, branchName, trxType, trxAmt, trxCount);
                if (Integer.parseInt(key.toString()) >= 500000) {
                    mos.write("PFBANK", journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString());
                } else if (Integer.parseInt(key.toString()) < 500000) {
                    mos.write("ZSBANK", journalTrxDataSet, NullWritable.get(), "/ZHAOSHANG/" + key.toString());
                }
                //mos.write(journalTrxDataSet, NullWritable.get(), "/PUFA/"+  key.toString());
                //mos.write(journalTrxDataSet, NullWritable.get(), key.toString());
            }
    
            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                super.cleanup(context);
                mos.close();
            }
    
        }
    
        private static class JournalDataPartitioner extends Partitioner<Text, JournalTrxDataSet> {
            @Override
            public int getPartition(Text key, JournalTrxDataSet value, int arg2) {
    
                if ("706010101".equals(value.getBranchId())) {
                    return 0;
                } else if ("706010106".equals(value.getBranchId())) {
                    return 1;
                }
                return 2;
            }
    
        }
    
        private static class JournalDataMapper extends Mapper<Object, Text, Text, JournalTrxDataSet> {
    
            private String statisticDemensionEnums = null;
    
            JournalTrxDataSet journalTrxDataSet = new JournalTrxDataSet();
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                super.setup(context);
                // Dimension passed in from the driver: one counts by terminal, the other by branch
                statisticDemensionEnums = context.getConfiguration().get("statisticDemensionEnums");
    
            }
    
            @Override
            protected void map(Object key, Text value, Mapper<Object, Text, Text, JournalTrxDataSet>.Context context)
                    throws IOException, InterruptedException {
    
                String strContent = value.toString();
                String[] result = strContent.split("\\|");  // fields are pipe-delimited; "|" must be escaped in the regex
                String terminalId = result[0]; // terminal id
                String branchId = result[1]; // branch id
                String branchName = result[2]; // branch name
                double amount = Double.parseDouble(result[4]); // transaction amount
                String trxType = result[5];
                journalTrxDataSet.Set(terminalId, branchId, branchName, trxType, amount, 1);
                context.write(new Text(terminalId), journalTrxDataSet);
            }
    
        }
    
        private static class JournalTrxDataComparator extends WritableComparator {
    
    
            protected JournalTrxDataComparator() {
                super(JournalTrxDataSet.class, true);
            }
    
            @Override
            public int compare(WritableComparable w1, WritableComparable w2) {
    
                JournalTrxDataSet j1 = (JournalTrxDataSet) w1;
                JournalTrxDataSet j2 = (JournalTrxDataSet) w2;
                int resultCompare = 0;
                /*
                 * if (j1.getAmount() == j2.getAmount()) { resultCompare = 0; } else if
                 * (j1.getAmount() < j2.getAmount()) { resultCompare = -1; } else {
                 * resultCompare = 1; }
                 */
                return resultCompare;// return -1,0,1
            }
    
            public static void main(String[] args) {
    
                SpringApplication.run(com.xhy.xgg.mapreduce.TerminalJournalDataService.class, args);
            }
    
        }
    }
Original post: https://www.cnblogs.com/xhy-shine/p/10637527.html