zoukankan      html  css  js  c++  java
  • MapReduce 规划 六系列 MultipleOutputs采用

    在前面的示例,输出文件名是默认:

    _logs         part-r-00001  part-r-00003  part-r-00005  part-r-00007  part-r-00009  part-r-00011  part-r-00013  _SUCCESS
    part-r-00000  part-r-00002  part-r-00004  part-r-00006  part-r-00008  part-r-00010  part-r-00012  part-r-00014
    

    part-r-0000N

    另一个_SUCCESS文件标志job执行成功。

    另一个文件夹_logs。


    可是实际情况中,我们有时候须要依据情况定制我的输出文件名称。

    比方我要依据did的值分组,产生不同的输出文件。全部did出现次数在[0, 2)的都输出到a文件里。在[2, 4)的输出大b文件。其它输出到c文件。

    这里涉及到的输出类是MultipleOutputs类。

    以下是介绍怎样实现。

    首先有一个小优化,为了避免每次执行时输入一长串命令,利用maven exec plugin,參考pom.xml配置例如以下:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.freebird</groupId>
      <artifactId>mr1_example2</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>mr1_example2</name>
      <url>http://maven.apache.org</url>
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-core</artifactId>
          <version>1.2.1</version>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.3.2</version>
            <executions>
              <execution>
                <goals>
                  <goal>exec</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <executable>hadoop</executable>
              <arguments>
                <argument>jar</argument>
                <argument>target/mr1_example2-1.0-SNAPSHOT.jar</argument>
                <argument>org.freebird.LogJob</argument>
                <argument>/user/chenshu/share/logs</argument>
                <argument>/user/chenshu/share/output12</argument>
              </arguments>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>
    

    这样每次mvn clean package之后,执行mvn exec:exec命令就可以。


    然后在LogJob.java文件加入几行代码:

    package org.freebird;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.freebird.reducer.LogReducer;
    import org.freebird.mapper.LogMapper;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class LogJob {                                                                                                                                                                                                
                                                                                                                                                                                                                         
        public static void main(String[] args) throws Exception {                                                                                                                                                        
            System.out.println("args[0]:" + args[0]);                                                                                                                                                                    
            System.out.println("args[1]:" + args[1]);                                                                                                                                                                    
                                                                                                                                                                                                                         
            Configuration conf = new Configuration();                                                                                                                                                                    
            Job job = new Job(conf, "sum_did_from_log_file");                                                                                                                                                            
            job.setJarByClass(LogJob.class);                                                                                                                                                                             
                                                                                                                                                                                                                         
            job.setMapperClass(org.freebird.mapper.LogMapper.class);                                                                                                                                                     
            job.setReducerClass(org.freebird.reducer.LogReducer.class);                                                                                                                                                  
                                                                                                                                                                                                                         
            job.setOutputKeyClass(Text.class);                                                                                                                                                                           
            job.setOutputValueClass(IntWritable.class);                                                                                                                                                                  
                                                                                                                                                                                                                         
            MultipleOutputs.addNamedOutput(job, "a", TextOutputFormat.class, Text.class, IntWritable.class);                                                                                                             
            MultipleOutputs.addNamedOutput(job, "b", TextOutputFormat.class, Text.class, Text.class);                                                                                                                    
            MultipleOutputs.addNamedOutput(job, "c", TextOutputFormat.class, Text.class, Text.class);                                                                                                                    
                                                                                                                                                                                                                         
            FileInputFormat.addInputPath(job, new Path(args[0]));                                                                                                                                                        
            FileOutputFormat.setOutputPath(job, new Path(args[1]));                                                                                                                                                      
                                                                                                                                                                                                                         
            System.exit(job.waitForCompletion(true) ? 0 : 1);                                                                                                                                                            
        }                                                                                                                                                                                                                
    }
    

    MultipleOutputs.addNamedOutput 函数被调用了三次,设置了文件名称为a,b和c。最后两个參数各自是output key和output value类型。应该和job.setOutputKeyClass以及job.setOutputValueClass保持一致。


    最后改动reducer类的代码:

    public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        private IntWritable result = new IntWritable();
    
        private MultipleOutputs outputs;
    
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            System.out.println("enter LogReducer:::setup method");
            outputs = new MultipleOutputs(context);
        }
    
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("enter LogReducer:::cleanup method");
            outputs.close();
        }
    
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            System.out.println("enter LogReducer::reduce method");
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            System.out.println("key: " + key.toString() + " sum: " + sum);                                                                                             
            if ((sum < 2) && (sum >= 0)) {
                outputs.write("a", key, sum);
            } else if (sum < 4) {
                outputs.write("b", key, sum);
            } else {
                outputs.write("c", key, sum);
            }
        }
    }
    

    依据同样key(did)sum的结果大小,写入到不同的文件里。执行后观察一下结果:

    [chenshu@hadoopMaster output12]$ ls
    a-r-00000  a-r-00004  a-r-00008  a-r-00012  b-r-00001  b-r-00005  b-r-00009  b-r-00013  c-r-00002  c-r-00006  c-r-00010  c-r-00014     part-r-00002  part-r-00006  part-r-00010  part-r-00014
    a-r-00001  a-r-00005  a-r-00009  a-r-00013  b-r-00002  b-r-00006  b-r-00010  b-r-00014  c-r-00003  c-r-00007  c-r-00011  _logs         part-r-00003  part-r-00007  part-r-00011  _SUCCESS
    a-r-00002  a-r-00006  a-r-00010  a-r-00014  b-r-00003  b-r-00007  b-r-00011  c-r-00000  c-r-00004  c-r-00008  c-r-00012  part-r-00000  part-r-00004  part-r-00008  part-r-00012
    a-r-00003  a-r-00007  a-r-00011  b-r-00000  b-r-00004  b-r-00008  b-r-00012  c-r-00001  c-r-00005  c-r-00009  c-r-00013  part-r-00001  part-r-00005  part-r-00009  part-r-00013
    

    打开随意的a,b和c开头的文件,查看值果然是如此

    5371700bc7b2231db03afeb0        6
    5371700cc7b2231db03afec0        7
    5371701cc7b2231db03aff8d        6
    5371709dc7b2231db03b0136        6
    537170a0c7b2231db03b01ac        6
    537170a6c7b2231db03b01fc        6
    537170a8c7b2231db03b0217        6
    537170b3c7b2231db03b0268        6
    53719aa9c7b2231db03b0721        6
    53719ad0c7b2231db03b0731        4
    

    使用MultipleOutputs依据sum值对设备ID进行分组成功了。

    MapReduce仍然会默认生使part....档,不要紧,它们是空文件。


    版权声明:本文博主原创文章,博客,未经同意不得转载。

  • 相关阅读:
    [转载][教程]vs2005入门 之 文件上传控件(FileUpLoad)[视频]
    .net 中使用Javacript弹出提示窗口方法总结
    创建分层行集的父表和子表之间的关系
    [转载][教程]vs2005控件演示之 Literal
    [转载][教程]母版页里面查找Repeater内控件(自动编号),并构造URL
    asp.net中的加密方法
    [转载][教程]vs2005/.NET2.0 控件演示之 文件上传 《FileUpload》 (二)
    [转][翻译]极好的ASP.NET2.0入门教程
    [原创]DATALIST 自定议翻页[支持模糊查询]
    六种异常处理的陋习(Java异常处理机制)——转载 Binary
  • 原文地址:https://www.cnblogs.com/yxwkf/p/4819545.html
Copyright © 2011-2022 走看看