zoukankan      html  css  js  c++  java
  • hadoop1.2.1 MultipleOutputs将结果输出到多个文件或文件夹

    hadoop1.2.1 MultipleOutputs将结果输出到多个文件或文件夹

    博客分类:http://tydldd.iteye.com/blog/2053867
     

    hadoop1.2.1中使用MultipleOutputs将结果输出到多个文件或文件夹

    使用步骤主要有三步:

    1、在reduce或map类中创建MultipleOutputs对象,将结果输出

    Java代码  收藏代码
    1. class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{  
    2.   
    3.     //将结果输出到多个文件或多个文件夹  
    4.     private MultipleOutputs<Text,IntWritable> mos;  
    5.     //创建对象  
    6.     protected void setup(Context context) throws IOException,InterruptedException {  
    7.         mos = new MultipleOutputs<Text, IntWritable>(context);  
    8.      }  
    9.           
    10.         //关闭对象  
    11.     protected void cleanup(Context context) throws IOException,InterruptedException {  
    12.         mos.close();  
    13.     }  
    14. }  

     2、在map或reduce方法中使用MultipleOutputs对象输出数据,代替congtext.write()

    Java代码  收藏代码
    1. protected void reduce(Text key, Iterable<IntWritable> values, Context context)  
    2.             throws IOException, InterruptedException {  
    3.         IntWritable V = new IntWritable();  
    4.         int sum = 0;  
    5.         for(IntWritable value : values){  
    6.             sum = sum + value.get();  
    7.         }  
    8.         System.out.println("word:" + key.toString() + "     sum = " + sum);  
    9.         V.set(sum);  
    10.   
    11.         //使用MultipleOutputs对象输出数据  
    12.         if(key.toString().equals("hello")){  
    13.             mos.write("hello", key, V);  
    14.         }else if(key.toString().equals("world")){  
    15.             mos.write("world", key, V);  
    16.         }else if(key.toString().equals("hadoop")){  
    17.             //输出到hadoop/hadoopfile-r-00000文件  
    18.             mos.write("hadoopfile", key, V, "hadoop/");  
    19.         }  
    20.           
    21.     }  

     3、在创建job时,定义附加的输出文件,这里的文件名称与第二步设置的文件名相同

    Java代码  收藏代码
    1. //定义附加的输出文件  
    2.             MultipleOutputs.addNamedOutput(job,"hello",TextOutputFormat.class,Text.class,IntWritable.class);  
    3.             MultipleOutputs.addNamedOutput(job,"world",TextOutputFormat.class,Text.class,IntWritable.class);  
    4.             MultipleOutputs.addNamedOutput(job,"hadoopfile",TextOutputFormat.class,Text.class,IntWritable.class);  

    完整代码:

    Java代码  收藏代码
    1. package com.ru.hadoop.wordcount;  
    2.   
    3. import java.io.IOException;  
    4. import java.net.URI;  
    5. import java.net.URISyntaxException;  
    6. import java.util.regex.Pattern;  
    7.   
    8. import org.apache.hadoop.conf.Configuration;  
    9. import org.apache.hadoop.conf.Configured;  
    10. import org.apache.hadoop.fs.FileSystem;  
    11. import org.apache.hadoop.fs.Path;  
    12. import org.apache.hadoop.io.IntWritable;  
    13. import org.apache.hadoop.io.LongWritable;  
    14. import org.apache.hadoop.io.NullWritable;  
    15. import org.apache.hadoop.io.Text;  
    16. import org.apache.hadoop.mapred.JobConf;  
    17. import org.apache.hadoop.mapred.RecordWriter;  
    18. import org.apache.hadoop.mapred.lib.MultipleOutputFormat;  
    19. import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;  
    20. import org.apache.hadoop.mapreduce.Job;  
    21. import org.apache.hadoop.mapreduce.Mapper;  
    22. import org.apache.hadoop.mapreduce.Reducer;  
    23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    25. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  
    26. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
    27. import org.apache.hadoop.util.Progressable;  
    28.   
    29. public class WordCount2 extends Configured{  
    30.   
    31.     public static void main(String[] args) {  
    32.         String in = "/home/nange/work/test/word/";  
    33.         String out = "hdfs://localhost:9000/hdfs/test/wordcount/out/";  
    34.           
    35.         Job job;  
    36.         try {  
    37.             //删除hdfs目录  
    38.             WordCount2 wc2 = new WordCount2();  
    39.             wc2.removeDir(out);  
    40.               
    41.             job = new Job(new Configuration(), "wordcount Job");  
    42.             job.setOutputKeyClass(Text.class);  
    43.             job.setOutputValueClass(IntWritable.class);  
    44.             job.setMapperClass(mapperString.class);  
    45. //          job.setCombinerClass(reduceStatistics.class);  
    46.             job.setReducerClass(reduceStatistics.class);  
    47.               
    48.             //定义附加的输出文件  
    49.             MultipleOutputs.addNamedOutput(job,"hello",TextOutputFormat.class,Text.class,IntWritable.class);  
    50.             MultipleOutputs.addNamedOutput(job,"world",TextOutputFormat.class,Text.class,IntWritable.class);  
    51.             MultipleOutputs.addNamedOutput(job,"hadoopfile",TextOutputFormat.class,Text.class,IntWritable.class);  
    52.               
    53.             FileInputFormat.addInputPath(job, new Path(in));  
    54.             FileOutputFormat.setOutputPath(job, new Path(out));  
    55.             job.waitForCompletion(true);  
    56.         } catch (IOException e) {  
    57.             e.printStackTrace();  
    58.         } catch (URISyntaxException e) {  
    59.             e.printStackTrace();  
    60.         } catch (ClassNotFoundException e) {  
    61.             e.printStackTrace();  
    62.         } catch (InterruptedException e) {  
    63.             e.printStackTrace();  
    64.         }  
    65.     }  
    66.       
    67.     public void removeDir(String filePath) throws IOException, URISyntaxException{  
    68.         String url = "hdfs://localhost:9000";  
    69.         FileSystem fs  = FileSystem.get(new URI(url), new Configuration());  
    70.         fs.delete(new Path(filePath));  
    71.     }  
    72. }  
    73.   
    74.   
    75. /** 
    76.  * 重写maptask使用的map方法  
    77.  * @author nange 
    78.  * 
    79.  */  
    80. class mapperString extends Mapper<LongWritable, Text, Text, IntWritable>{  
    81.     //设置正则表达式的编译表达形式  
    82.     public static Pattern PATTERN = Pattern.compile(" ");  
    83.     Text K = new Text();  
    84.     IntWritable V = new IntWritable(1);  
    85.     @Override  
    86.     protected void map(LongWritable key, Text value, Context context)  
    87.             throws IOException, InterruptedException {  
    88.           
    89.         String[] words = PATTERN.split(value.toString());  
    90.         System.out.println("********" + value.toString());  
    91.         for(String word : words){  
    92.             K.set(word);  
    93.             context.write(K, V);  
    94.         }  
    95.     }  
    96. }  
    97.   
    98. /** 
    99.  * 对单词做统计 
    100.  * @author nange 
    101.  * 
    102.  */  
    103. class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{  
    104.   
    105.     //将结果输出到多个文件或多个文件夹  
    106.     private MultipleOutputs<Text,IntWritable> mos;  
    107.     //创建MultipleOutputs对象  
    108.     protected void setup(Context context) throws IOException,InterruptedException {  
    109.         mos = new MultipleOutputs<Text, IntWritable>(context);  
    110.      }  
    111.       
    112.     @Override  
    113.     protected void reduce(Text key, Iterable<IntWritable> values, Context context)  
    114.             throws IOException, InterruptedException {  
    115.         IntWritable V = new IntWritable();  
    116.         int sum = 0;  
    117.         for(IntWritable value : values){  
    118.             sum = sum + value.get();  
    119.         }  
    120.         System.out.println("word:" + key.toString() + "     sum = " + sum);  
    121.         V.set(sum);  
    122.   
    123.         //使用MultipleOutputs对象输出数据  
    124.         if(key.toString().equals("hello")){  
    125.             mos.write("hello", key, V);  
    126.         }else if(key.toString().equals("world")){  
    127.             mos.write("world", key, V);  
    128.         }else if(key.toString().equals("hadoop")){  
    129.             //输出到hadoop/hadoopfile-r-00000文件  
    130.             mos.write("hadoopfile", key, V, "hadoop/");  
    131.         }  
    132.           
    133.     }  
    134.       
    135.     //关闭MultipleOutputs对象  
    136.     protected void cleanup(Context context) throws IOException,InterruptedException {  
    137.         mos.close();  
    138.     }  
    139. }  
  • 相关阅读:
    内存管理
    git学习思维导图
    shell编程 条件判断式----利用 case ..... esac 判断
    shell编程 条件判断式----利用 if .... then ----多重
    shell编程 Shell script 的默认变量($0, $1...)
    VxWorks实验六 基于优先级的抢占式调度及实验的源程序和实验步骤
    Linux下FTP用户的使用配置
    Linux 下 FTP虚拟用户的使用配置
    Linux 常用命令
    实验一 用户界面与Shell命令
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4931561.html
Copyright © 2011-2022 走看看