zoukankan      html  css  js  c++  java
  • 通过MultipleOutputs写到多个文件

      MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突

    1、项目需求

      假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。

    2、数据集

      wolys@21cn.com
      zss1984@126.com
      294522652@qq.com
      simulateboy@163.com
      zhoushigang_123@163.com
      sirenxing424@126.com
      lixinyu23@qq.com
      chenlei1201@gmail.com
      370433835@qq.com
      cxx0409@126.com
      viv093@sina.com
      q62148830@163.com
      65993266@qq.com
      summeredison@sohu.com
      zhangbao-autumn@163.com
      diduo_007@yahoo.com.cn
      fxh852@163.com

    3、实现

      1 package com.buaa;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.conf.Configured;
      7 import org.apache.hadoop.fs.FileSystem;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.IntWritable;
     10 import org.apache.hadoop.io.LongWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.mapreduce.Job;
     13 import org.apache.hadoop.mapreduce.Mapper;
     14 import org.apache.hadoop.mapreduce.Reducer;
     15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
     18 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
     19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     20 import org.apache.hadoop.util.Tool;
     21 import org.apache.hadoop.util.ToolRunner;
     22 
     23 /** 
     24 * @ProjectName MultipleOutputsDemo
     25 * @PackageName com.buaa
     26 * @ClassName EmailMultipleOutputsDemo
     27 * @Description 统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下
     28 * @Author 刘吉超
     29 * @Date 2016-05-02 15:25:18
     30 */
     31 public class EmailMultipleOutputsDemo extends Configured implements Tool {
     32     
     33     public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
     34         private final static IntWritable one = new IntWritable(1);
     35 
     36         @Override
     37         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     38             context.write(value, one);
     39         }
     40     }
     41     
     42     public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     43         private MultipleOutputs<Text, IntWritable> multipleOutputs;
     44         
     45         @Override
     46         protected void setup(Context context) throws IOException ,InterruptedException{
     47             multipleOutputs = new MultipleOutputs< Text, IntWritable>(context);
     48         }
     49         
     50         protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException {
     51             // 开始位置
     52             int begin = Key.toString().indexOf("@");
     53             // 结束位置
     54             int end = Key.toString().indexOf(".");
     55             
     56             if(begin >= end){
     57                 return;
     58             }
     59             
     60             // 获取邮箱类别,比如 qq
     61             String name = Key.toString().substring(begin+1, end);
     62             
     63             int sum = 0;
     64             for (IntWritable value : Values) {
     65                 sum += value.get();
     66             }
     67             
     68             /*
     69              * multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。
     70              * 如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);
     71              * 如果包含文件分隔符"/",例如baseOutputPath="029070-99999/1901/part",那么输出文件则为029070-99999/1901/part-r-nnnnn
     72              */
     73             multipleOutputs.write(Key, new IntWritable(sum), name);
     74         }
     75         
     76         @Override
     77         protected void cleanup(Context context) throws IOException ,InterruptedException{
     78             multipleOutputs.close();
     79         }
     80     }
     81     
     82     @SuppressWarnings("deprecation")
     83     @Override
     84     public int run(String[] args) throws Exception {
     85         // 读取配置文件
     86         Configuration conf = new Configuration();
     87         
     88         // 判断目录是否存在,如果存在,则删除
     89         Path mypath = new Path(args[1]);
     90         FileSystem hdfs = mypath.getFileSystem(conf);
     91         if (hdfs.isDirectory(mypath)) {
     92             hdfs.delete(mypath, true);
     93         }
     94         
     95         // 新建一个任务
     96         Job job = new Job(conf, "MultipleDemo");  
     97         // 主类
     98         job.setJarByClass(EmailMultipleOutputsDemo.class);
     99         
    100         // 输入路径
    101         FileInputFormat.addInputPath(job, new Path(args[0]));
    102         // 输出路径
    103         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    104         
    105         // Mapper
    106         job.setMapperClass(EmailMapper.class);
    107         // Reducer
    108         job.setReducerClass(EmailReducer.class);
    109         
    110         // key输出类型
    111         job.setOutputKeyClass(Text.class);
    112         // value输出类型
    113         job.setOutputValueClass(IntWritable.class);
    114         
    115         // 去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
    116         LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  
    117         
    118         return job.waitForCompletion(true)?0:1;
    119     }
    120     
    121     public static void main(String[] args0) throws Exception {
    122         // 数据输入路径和输出路径  
    123 //        String[] args0 = {  
    124 //                "hdfs://ljc:9000/buaa/email/email.txt",  
    125 //                "hdfs://ljc:9000/buaa/email/out/"  
    126 //        }; 
    127         int ec = ToolRunner.run(new Configuration(), new EmailMultipleOutputsDemo(), args0);
    128         System.exit(ec);
    129     }
    130 }

    4、运行效果

    运行效果(EmailMultipleOutputsDemo)

    5、注意事项

      1、在reducer中调用时,要调用MultipleOutputs以下接口

    1 public void write(KEYOUT key,VALUEOUT value, String baseOutputPath) throws IOException,InterruptedException 

      如果调用

    1 public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException 

      则需要在job中,预先声明named output(如下),不然会报错:named output xxx not defined:

    1 MultipleOutputs.addNamedOutput(job, "moshouzhengba", TextOutputFormat.class, Text.class, Text.class);
    2 MultipleOutputs.addNamedOutput(job, "maoxiandao", TextOutputFormat.class, Text.class, Text.class);
    3 MultipleOutputs.addNamedOutput(job, "yingxionglianmen", TextOutputFormat.class, Text.class, Text.class);

    2. 默认情况下,输出目录会生成part-r-00000或者part-m-00000的空文件,需要如下设置后,才不会生成

    // job.setOutputFormatClass(TextOutputFormat.class);
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

      就是去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
    3. multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符“/”,那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);如果包含文件分隔符“/”,例如baseOutputPath=“029070-99999/1901/part”,那么输出文件则为

      ajNCh3H

    如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
    如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
    如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

    实现代码及数据:下载

  • 相关阅读:
    react 全局监听报错,监听白屏
    【Python】(八)Python中的set集合(每个人都是唯一的个体)
    【Python】(六)Python数据类型-列表和元组,九浅一深,用得到
    mybatis-spring多数据源配置
    自己动手实现springboot运行时新增/更新外部接口
    自己动手实现springboot运行时执行java源码(运行时编译、加载、注册bean、调用)
    自己动手实现java断点/单步调试(一)
    自己动手实现java断点/单步调试(二)
    Swift 模式下面LLDB 输出对象
    Understanding Swift’s value type thread safety
  • 原文地址:https://www.cnblogs.com/codeOfLife/p/5452902.html
Copyright © 2011-2022 走看看