zoukankan      html  css  js  c++  java
  • MR案例:多文件输出MultipleOutputs

    问题描述:现有 ip-to-hosts.txt 数据文件,文件中每行数据有两个字段:分别是ip地址和该ip地址对应的国家,以' '分隔。要求汇总不同国家的IP数,并以国家名为文件名将其输出。解读:MultipleOutputs类

    测试数据:ip-to-hosts.txt

    18.217.167.70 United States
    206.96.54.107 United States
    196.109.151.139 Mauritius
    174.52.58.113 United States
    142.111.216.8 Canada

    代码实现:

    package country;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class Ip2Hosts {
        public static void main(String[] args) throws Exception { 
            
            //指定输入输出路径
            args =new String[] {"hdfs://10.16.17.182:9000/test/in/ip-to-hosts.txt","hdfs://10.16.17.182:9000/test/out/0821/09"};
            System.exit(run(args));
        }
    
        public static int run(String[] args) throws Exception {
    
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(Ip2Hosts.class);
    
            job.setMapperClass(IPCountryMapper.class);
            job.setReducerClass(IPCountryReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            /**
             * 输出 08 和 09 需要调用此设置,07 就需要注释掉
             */
    MultipleOutputs.addNamedOutput(job,"abc",TextOutputFormat.class,Text.class,IntWritable.class); //通过此配置可以不再产生默认的空文件【part-*-00000】 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); return job.waitForCompletion(true) ? 1 : 0; } //map阶段 public static class IPCountryMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splited = value.toString().split(" "); context.write(new Text(splited[1]), new IntWritable(1)); } } //reduce阶段 public static class IPCountryReducer extends Reducer<Text, IntWritable, Text, IntWritable> { //1.定义多文件输出类MultipleOutputs private MultipleOutputs<Text, IntWritable> mos; @Override protected void setup(Context context ) throws IOException, InterruptedException { //2.MultipleOutputs初始化 mos = new MultipleOutputs<Text, IntWritable>(context); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int total = 0; for(IntWritable value: values) { total += value.get(); }         //3.调用MultipleOutputs中的write()方法
     
    //07-输出 mos.write(/*"abc",*/ key, new IntWritable(total),key.toString()); //08-输出 mos.write("abc", key, new IntWritable(total)/*,key.toString()*/); //09-输出 mos.write("abc", key, new IntWritable(total),key.toString()); } @Override protected void cleanup(Context context ) throws IOException, InterruptedException { //4.关闭流资源 mos.close(); } } }

    代码解读:

    1).输出-07所调用的方法和对应的输出结果:

    /**
     * @ 输出的key类型 
     * @ 输出的value类型
     * @ 输出的基路径,实际输出结果为:'基路径-r-00000'
     */
    MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)

    2).输出-08所调用的方法和对应的输出结果:

    /**
     * @ 自定义的输出.对于不指定'基路径',则结果为:'自定义的输出-r-00000'
     * @ 输出的key类型
     * @ 输出的value类型
     */
    MultipleOutputs.write(String namedOutput, K key, V value)

    3).输出-09所调用的方法和对应的输出结果:

    /**
     * @ 自定义的输出.
     * @ 输出的key类型
     * @ 输出的value类型
     * @ 输出的基路径,指定输出'基路径',则结果为:'基路径-r-00000'
     */
    MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath)

    用法总结:

    1. 在Mapper或Reducer类中创建 MultipleOutputs 成员变量 mos
    2. setup()方法中初始化 mos 变量,
    3. map()或reduce()方法中调用 mos.write() 方法输出数据,代替context.write()
    4. mos.write() 方法具有三个重载,对于 输出-08-09 还需Job配置中指定输出格式
    5. cleanup()方法中调用 mos.close() 方法关闭输出流
  • 相关阅读:
    平衡——职场小说《监控》推荐
    《java程序员全攻略:从小工到专家》连载一:外行人眼中的IT人
    《java程序员全攻略:从小工到专家》连载二:IT行情分布
    各路技术牛人都推荐的书
    程序员2009精华本 有哪些值得期待
    揭秘孙小小《PPT演示之道》
    In the beginning, the world was void and without form…
    大学计算机课程复习操作系统
    大学计算机课程复习汇编语言
    (转)Winform 创建桌面快捷方式并开机启动
  • 原文地址:https://www.cnblogs.com/skyl/p/4732300.html
Copyright © 2011-2022 走看看