  • MapReduce: using MultipleOutputs to write custom output to multiple directories

    Example input data:

    Source1-0001
    Source2-0002
    Source1-0003
    Source2-0004
    Source1-0005
    Source2-0006
    Source3-0007
    Source3-0008
    Description:
    • Records starting with Source1 belong to set A.
    • Records starting with Source2 belong to set B.
    • Records starting with Source3 belong to both set A and set B.

    Output requirements:

    • Keep all of set A's data (records starting with Source1 or Source3)
    • Keep all of set B's data (records starting with Source2 or Source3)

    Implementation:

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.mahout.common.AbstractJob;
    
    import com.yhd.common.util.HadoopUtil;
    
    /**
     * AbstractJob is Mahout's job template; using it is optional here.
     * The real core of this example is the MultipleOutputs part.
     * 
     * @author ouyangyewei
     *
     */
    public class TestMultipleOutputsJob extends AbstractJob {
        @Override
        public int run(String[] args) throws Exception {
            addInputOption();
            addOutputOption();
            
            Map<String, List<String>> parseArgs = parseArguments(args);
            if(parseArgs==null){
                return -1;
            }
            
            HadoopUtil.delete(getConf(), getOutputPath());
            
            Configuration conf = new Configuration();
            // Note: these are the old MRv1-style property names
            conf.setInt("mapred.reduce.tasks", 4);
            conf.set("mapred.job.queue.name", "pms");
            conf.set("mapred.child.java.opts", "-Xmx3072m");
            conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05");
    
            Job job = new Job(new Configuration(conf));
            job.setJobName("TestMultipleOutputsJob");
            job.setJarByClass(TestMultipleOutputsJob.class);
            job.setMapperClass(MultipleMapper.class);
            // Map-only job: setNumReduceTasks(0) overrides mapred.reduce.tasks above
            job.setNumReduceTasks(0);
            FileInputFormat.setInputPaths(job, this.getInputPath());
            FileOutputFormat.setOutputPath(job, this.getOutputPath());
            
            /** Output files will be named Source1-m-**** */
            MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
            /** Output files will be named Source2-m-**** */
            MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);
            
            boolean succeeded = job.waitForCompletion(true);
            if (!succeeded) {
                return -1;
            }
            return 0;
        }
        
        /**
         * Map-only mapper that routes each record to the matching named output(s).
         * 
         * @author ouyangyewei
         *
         */
        public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> {
            private MultipleOutputs<Text, Text> mos = null;
    
            @Override
            protected void setup(Context context
                                 ) throws IOException, InterruptedException {
                mos = new MultipleOutputs<Text, Text>(context);
            }
    
            @Override
            public void map(LongWritable key, Text value, Context context
                            ) throws IOException, InterruptedException {
                String line = value.toString();
                String[] tokenizer = line.split("-");
    
                if (tokenizer[0].equals("Source1")) {
                    /** Set A data */
                    mos.write("Source1", new Text(tokenizer[0]), new Text(tokenizer[1]));
                } else if (tokenizer[0].equals("Source2")) {
                    /** Set B data */
                    mos.write("Source2", new Text(tokenizer[0]), new Text(tokenizer[1]));
                }
                
                /** Data belonging to both set A and set B */
                if (tokenizer[0].equals("Source3")) {
                    mos.write("Source1", new Text(tokenizer[0]), new Text(tokenizer[1]));
                    mos.write("Source2", new Text(tokenizer[0]), new Text(tokenizer[1]));
                }
            }
            }
    
            @Override
            protected void cleanup(Context context
                                   ) throws IOException, InterruptedException {
                mos.close();
            }
        }
        
        /**
         * @param args
         */
        public static void main(String[] args) {
            // Pin JAXP to the JDK's built-in Xerces implementations
            System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
                "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
            System.setProperty("javax.xml.parsers.SAXParserFactory", 
                "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
            
            TestMultipleOutputsJob instance = new TestMultipleOutputsJob();
            try {
                instance.run(args);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
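
    The class comment above notes that Mahout's AbstractJob template is optional. For reference, here is a minimal sketch of the same driver written against Hadoop's own Tool/ToolRunner, with Mahout's --input/--output parsing replaced by plain positional arguments (the class name MultipleOutputsDriver and the positional-argument convention are my own, not from the original code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class MultipleOutputsDriver extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            // Positional arguments instead of Mahout's --input/--output options
            Path input = new Path(args[0]);
            Path output = new Path(args[1]);
    
            Job job = new Job(getConf(), "TestMultipleOutputsJob");
            job.setJarByClass(MultipleOutputsDriver.class);
            // Reuses the mapper defined above (assumed to be on the classpath)
            job.setMapperClass(TestMultipleOutputsJob.MultipleMapper.class);
            job.setNumReduceTasks(0);
            FileInputFormat.setInputPaths(job, input);
            FileOutputFormat.setOutputPath(job, output);
    
            // Same named outputs as in the original driver
            MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new MultipleOutputsDriver(), args));
        }
    }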

    Submit the jar with the hadoop jar command:
    hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob 
    --input /user/pms/workspace/ouyangyewei/testMultipleOutputs 
    --output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output

    After the job runs, the output is:
    [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
    $hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
    Found 4 items
    -rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
    -rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
    -rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS
    -rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000
    
    [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
    $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
    Source1	0001
    Source1	0003
    Source1	0005
    Source3	0007
    Source3	0008
    
    [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
    $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
    Source2	0002
    Source2	0004
    Source2	0006
    Source3	0007
    Source3	0008
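
    Note the zero-byte part-m-00000 in the first listing: every record is written through MultipleOutputs, so the job's default output receives nothing, yet its file is still created. If that is a problem, Hadoop's LazyOutputFormat creates the default output file only when something is actually written to it. A minimal sketch of the extra wiring in run() (everything else stays the same):

    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    // In run(), after FileOutputFormat.setOutputPath(...):
    // wraps TextOutputFormat so the default part-m-* file is only
    // created on the first context.write(); the named outputs from
    // MultipleOutputs are unaffected.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);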


    Addendum (2014-12-18):

    The drawback of this approach is that it produces a large number of files prefixed with Source1 or Source2 directly under the output directory. A much better approach is to specify a baseOutputPath so that, for example, all Source1 files are managed together in a single sub-directory.

    Rewriting the map method above to group the files into sub-directories:

    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String line = value.toString();
        String[] tokenizer = line.split("-");

        if (tokenizer[0].equals("Source1")) {
            /** Set A data */
            mos.write("Source1", 
                      new Text(tokenizer[0]), 
                      new Text(tokenizer[1]),
                      "Source1/part");
        } else if (tokenizer[0].equals("Source2")) {
            /** Set B data */
            mos.write("Source2", 
                      new Text(tokenizer[0]), 
                      new Text(tokenizer[1]),
                      "Source2/part");
        }

        /** Data belonging to both set A and set B */
        if (tokenizer[0].equals("Source3")) {
            mos.write("Source1", 
                      new Text(tokenizer[0]), 
                      new Text(tokenizer[1]),
                      "Source1/part");

            mos.write("Source2", 
                      new Text(tokenizer[0]), 
                      new Text(tokenizer[1]),
                      "Source2/part");
        }
    }
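
    Since a Source3 record goes to both named outputs and each sub-directory shares its named output's name, the three branches can be collapsed into a small routing loop. A sketch of an equivalent map body (the routing-table style is my own illustration, not from the original post):

    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String[] tokenizer = value.toString().split("-");
        String source = tokenizer[0];

        // Ignore records with an unexpected prefix, as the original does
        if (!source.equals("Source1") && !source.equals("Source2")
                && !source.equals("Source3")) {
            return;
        }

        // Source3 belongs to both sets; everything else to exactly one
        String[] destinations = source.equals("Source3")
                ? new String[] {"Source1", "Source2"}
                : new String[] {source};

        for (String dest : destinations) {
            // Named output and its sub-directory share the same name
            mos.write(dest, new Text(source), new Text(tokenizer[1]), dest + "/part");
        }
    }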
    After the job runs, the output is:
    $hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output
    Found 4 items
    -rw-r--r--   3 pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/_SUCCESS
    -rw-r--r--   3 pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/part-r-00000
    drwxr-xr-x   - pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
    drwxr-xr-x   - pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2
    
    [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
    $hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
    Found 1 items
    -rw-r--r--   3 pms pms   65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1/part-r-00000
    
    [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
    $hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2
    Found 1 items
    -rw-r--r--   3 pms pms   65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2/part-r-00000
    

    See also: http://dirlt.com/mapred.html
