zoukankan      html  css  js  c++  java
  • MapReduce的SequenceFileInputFormat使用

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.IOException;
      4 import java.util.Iterator;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.conf.Configured;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.IntWritable;
     10 import org.apache.hadoop.io.NullWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.mapreduce.Job;
     13 import org.apache.hadoop.mapreduce.Mapper;
     14 import org.apache.hadoop.mapreduce.Reducer;
     15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
     17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     18 import org.apache.hadoop.util.Tool;
     19 import org.apache.hadoop.util.ToolRunner;
     20 
     21 /**
     22  * 使用SequenceFileInputFormat处理HDFS上的SequenceFile类型文件,输出为普通文件
     23  *
     24  * @author mengyao
     25  *
     26  */
     27 public class SequenceFileInputFormatApp extends Configured implements Tool  {
     28 
     29     static class SequenceFileInputFormatMapper extends Mapper<IntWritable, Text, NullWritable, Text> {
     30         
     31         private NullWritable outputKey;
     32         
     33         @Override
     34         protected void setup(Context context)
     35                 throws IOException, InterruptedException {
     36             this.outputKey = NullWritable.get();
     37         }
     38         
     39         @Override
     40         protected void map(IntWritable key, Text value, Context context)
     41                 throws IOException, InterruptedException {
     42             context.write(outputKey, value);
     43         }
     44     }
     45     
     46     static class SequenceFileInputFormatReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
     47         
     48         private NullWritable outputKey;
     49         
     50         @Override
     51         protected void setup(Context context)
     52                 throws IOException, InterruptedException {
     53             this.outputKey = NullWritable.get();
     54         }
     55         
     56         @Override
     57         protected void reduce(NullWritable key, Iterable<Text> value, Context context)
     58                 throws IOException, InterruptedException {
     59             Iterator<Text> iterator = value.iterator();
     60             while (iterator.hasNext()) {
     61                 context.write(outputKey, iterator.next());
     62             }
     63         }
     64     }
     65 
     66     @Override
     67     public int run(String[] args) throws Exception {
     68         Job job = Job.getInstance(getConf(), SequenceFileInputFormatApp.class.getSimpleName());
     69         job.setJarByClass(SequenceFileInputFormatApp.class);
     70         
     71         job.setInputFormatClass(SequenceFileInputFormat.class);
     72         FileInputFormat.addInputPath(job, new Path(args[0]));
     73         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     74         
     75         job.setMapperClass(SequenceFileInputFormatMapper.class);
     76         job.setMapOutputKeyClass(NullWritable.class);
     77         job.setMapOutputValueClass(Text.class);
     78         
     79         job.setReducerClass(SequenceFileInputFormatReducer.class);
     80         job.setOutputKeyClass(NullWritable.class);
     81         job.setOutputValueClass(Text.class);
     82         
     83         return job.waitForCompletion(true)?0:1;
     84     }
     85     
     86     public static int createJob(String[] args) {
     87         Configuration conf = new Configuration();
     88         conf.set("dfs.datanode.socket.write.timeout", "7200000");
     89         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
     90         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
     91         conf.set("mapreduce.job.jvm.numtasks", "-1");        
     92         conf.set("mapreduce.map.speculative", "false");        
     93         conf.set("mapreduce.reduce.speculative", "false");    
     94         conf.set("mapreduce.map.maxattempts", "4");            
     95         conf.set("mapreduce.reduce.maxattempts", "4");        
     96         conf.set("mapreduce.map.skip.maxrecords", "0");
     97         int status = 0;
     98         
     99         try {
    100             status = ToolRunner.run(conf, new SequenceFileInputFormatApp(), args);
    101         } catch (Exception e) {
    102             e.printStackTrace();
    103         }
    104         
    105         return status;
    106     }
    107     
    108     public static void main(String[] args) {
    109         args = new String[]{"/mapreduces/seqfile/book1.txt", "/mapreduces/sequencefileinputformat"};
    110         if (args.length!=2) {
    111             System.out.println("Usage: "+SequenceFileInputFormatApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH>");
    112         } else {
    113             int status = createJob(args);
    114             System.exit(status);
    115         }
    116     }
    117 }
  • 相关阅读:
    vue改变了数据却没有自动刷新
    Unable to find vcvarsall.bat
    修改Linux用户配置之后先验证再退出
    平面最远点对
    [转]你可能不知道的五个强大HTML5 API
    sqlite3常用技巧
    使用rsync
    画图必备numpy函数
    np.percentile获取中位数、百分位数
    [转]numpy 100道练习题
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865582.html
Copyright © 2011-2022 走看看