MapReduce: customizing the result file name with a custom OutputFormat

The job below is a plain word count that replaces the default OutputFormat with a custom OutputFormat and RecordWriter, so the result is written to the file output_dir + output_name (both taken from the job configuration) instead of the default part-r-00000.

package com.mengyao.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

/**
 * Word-count job with a custom OutputFormat: the result is written to a file
 * named by the "output_dir" and "output_name" configuration properties instead
 * of the default part-r-00000.
 */
public class JobOutputRenameApp extends Configured implements Tool {

    /** Writes each (word, count) pair as one tab-separated text line. */
    static class JobOutputRenameRecordWriter extends RecordWriter<Text, LongWritable> {

        FSDataOutputStream out;

        public JobOutputRenameRecordWriter() {
        }

        public JobOutputRenameRecordWriter(FSDataOutputStream out) {
            this.out = out;
        }

        @Override
        public void write(Text key, LongWritable value) throws IOException,
                InterruptedException {
            // Emit one plain-text line "<word>\t<count>\n"; writeUTF/writeLong would add
            // 2-byte length prefixes and raw binary bytes, which is not wanted in a text file.
            out.write((key.toString() + "\t" + value.get() + "\n").getBytes("UTF-8"));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            out.close();
        }

    }

    /** OutputFormat that creates a single output file at output_dir + output_name. */
    static class JobOutputRenameOutputFormat extends OutputFormat<Text, LongWritable> {
        @Override
        public RecordWriter<Text, LongWritable> getRecordWriter(
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            final String output_dir = context.getConfiguration().get("output_dir");
            final String output_name = context.getConfiguration().get("output_name");
            FSDataOutputStream out = null;
            try {
                // Open the target file on HDFS as user "root", overwriting it if it already exists.
                FileSystem fs = FileSystem.get(new URI(output_dir), context.getConfiguration(), "root");
                out = fs.create(new Path(output_dir + output_name), true);
            } catch (IllegalArgumentException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }

            return new JobOutputRenameRecordWriter(out);
        }

        @Override
        public void checkOutputSpecs(JobContext context) throws IOException,
                InterruptedException {
            // No output check: the target file is simply overwritten in getRecordWriter.
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            final String output_dir = context.getConfiguration().get("output_dir");
            return new FileOutputCommitter(new Path(output_dir), context);
        }

    }

    /** Splits each input line on tabs and emits (word, 1). */
    static class JobOutputRenameMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        private Text k = null;
        private LongWritable v = null;

        @Override
        protected void setup(
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            k = new Text();
            v = new LongWritable(1L);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split("\t");
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        }

    }

    /** Optional partitioner (not enabled below): key "hello" goes to reducer 0, everything else to reducer 1. */
    static class JobOutputRenamePartition extends Partitioner<Text, LongWritable> {

        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            if (!key.toString().equals("hello")) {
                return 1;
            }
            return 0;
        }

    }

    /** Sums the counts for each word. */
    static class JobOutputRenameReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> value,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable item : value) {
                count += item.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Hand the output directory and file name to the custom OutputFormat via the configuration.
        conf.set("output_dir", args[1]);
        conf.set("output_name", args[2]);
        Job job = Job.getInstance(conf, JobOutputRenameApp.class.getSimpleName());
        job.setJarByClass(JobOutputRenameApp.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(JobOutputRenameMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

//        job.setPartitionerClass(JobOutputRenamePartition.class);
//        job.setNumReduceTasks(2);

        job.setReducerClass(JobOutputRenameReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(JobOutputRenameOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int createJob(String[] params) {
        Configuration conf = new Configuration();
        int status = 1;
        try {
            status = ToolRunner.run(conf, new JobOutputRenameApp(), params);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        return status;
    }

    public static void main(String[] args) throws Exception {
        // Hard-coded test arguments: input dir, timestamped output dir (must end with "/"), output file name.
        args = new String[3];
        args[0] = "/testdata/words";
        args[1] = "/job/mapreduce/" + JobOutputRenameApp.class.getSimpleName() + "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()) + "/";
        args[2] = "wordcount.txt";

//        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        // This check only matters if the hard-coded block above is removed.
        if (args != null && args.length != 3) {
            System.out.println("Usage: " + JobOutputRenameApp.class.getSimpleName() + " <input dir> <output dir> <output name> ");
            System.exit(3);
        } else {
            int status = createJob(args);
            System.exit(status);
        }
    }
}
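
The main() above overwrites its command-line arguments with hard-coded test paths. As a minimal sketch (the class name below is hypothetical, not part of the original post), an alternative launcher can validate the three arguments and pass them straight to createJob:

package com.mengyao.hadoop.mapreduce;

/**
 * Hypothetical launcher for JobOutputRenameApp: takes the three job arguments
 * from the command line instead of hard-coding them, then delegates to createJob.
 */
public class JobOutputRenameCli {

    public static void main(String[] args) {
        if (args == null || args.length != 3) {
            // The output directory must end with "/" because the OutputFormat
            // concatenates it directly with the output file name.
            System.err.println("Usage: JobOutputRenameCli <input dir> <output dir ending with /> <output name>");
            System.exit(3);
        }
        System.exit(JobOutputRenameApp.createJob(args));
    }
}

It would be invoked like the original test main(), e.g. with /testdata/words as the input directory, a "/"-terminated output directory, and wordcount.txt as the output file name (paths here are placeholders).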
Original post: https://www.cnblogs.com/mengyao/p/4865600.html