zoukankan      html  css  js  c++  java
  • MapReduce的CombineFileInputFormat使用

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.IOException;
      4 import java.util.Iterator;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.conf.Configured;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.LongWritable;
     10 import org.apache.hadoop.io.Text;
     11 import org.apache.hadoop.mapreduce.Counter;
     12 import org.apache.hadoop.mapreduce.Job;
     13 import org.apache.hadoop.mapreduce.Mapper;
     14 import org.apache.hadoop.mapreduce.Reducer;
     15 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
     16 import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
     17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     19 import org.apache.hadoop.util.Tool;
     20 import org.apache.hadoop.util.ToolRunner;
     21 
     22 /**
     23  * 使用CombinerTextInputFormat可以把多个小文件打包到一个输入分片中使每个Mapper任务处理更多的数据。
     24  * 这样就避免了像FileInputFormat那样为每个文件(无论文件大小)都使用一个分片导致遇到小文件时会有很多的Mapper任务(每个Mapper任务处理的数据很小)。
     25  *
     26  * @author mengyao
     27  *
     28  */
     29 public class CombinerTextInputFormatApp extends Configured implements Tool {
     30 
     31     static class CombinerTextInputFormatMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
     32         
     33         @Override
     34         protected void setup(Context context)
     35                 throws IOException, InterruptedException {
     36         }
     37         
     38         @Override
     39         protected void map(LongWritable key, Text value, Context context)
     40                 throws IOException, InterruptedException {
     41             CombineFileSplit cfs = (CombineFileSplit)context.getInputSplit();
     42             String fileA = cfs.getPath(0).getName();
     43             String fileB = cfs.getPath(1).getName();
     44             Counter counterA = context.getCounter(fileA, fileA);
     45             Counter counterB = context.getCounter(fileB, fileB);
     46             if (fileA.equals(counterA.getName())) {
     47                 counterA.increment(1L);
     48             }
     49             if (fileB.equals(counterB.getName())) {
     50                 counterB.increment(1L);
     51             }
     52             context.write(key,value);
     53         }
     54     }
     55     
     56     static class CombinerTextInputFormatReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
     57         
     58         @Override
     59         protected void setup(Context context)
     60                 throws IOException, InterruptedException {
     61         }
     62         
     63         @Override
     64         protected void reduce(LongWritable key, Iterable<Text> value, Context context)
     65                 throws IOException, InterruptedException {
     66             Iterator<Text> iterator = value.iterator();
     67             while (iterator.hasNext()) {
     68                 context.write(key, iterator.next());
     69             }
     70         }
     71     }
     72 
     73     @Override
     74     public int run(String[] args) throws Exception {
     75         Job job = Job.getInstance(getConf(), CombinerTextInputFormatApp.class.getSimpleName());
     76         job.setJarByClass(CombinerTextInputFormatApp.class);
     77         
     78         job.setInputFormatClass(CombineTextInputFormat.class);
     79         FileInputFormat.addInputPath(job, new Path(args[0]));
     80         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     81         
     82         job.setMapperClass(CombinerTextInputFormatMapper.class);
     83         job.setMapOutputKeyClass(LongWritable.class);
     84         job.setMapOutputValueClass(Text.class);
     85         
     86         job.setReducerClass(CombinerTextInputFormatReducer.class);
     87         job.setOutputKeyClass(LongWritable.class);
     88         job.setOutputValueClass(Text.class);
     89         
     90         return job.waitForCompletion(true)?0:1;
     91     }
     92     
     93     public static int createJob(String[] args) {
     94         Configuration conf = new Configuration();
     95         conf.set("dfs.datanode.socket.write.timeout", "7200000");
     96         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
     97         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
     98         conf.set("mapreduce.job.jvm.numtasks", "-1");        
     99         conf.set("mapreduce.map.speculative", "false");        
    100         conf.set("mapreduce.reduce.speculative", "false");    
    101         conf.set("mapreduce.map.maxattempts", "4");            
    102         conf.set("mapreduce.reduce.maxattempts", "4");        
    103         conf.set("mapreduce.map.skip.maxrecords", "0");
    104         int status = 0;
    105         
    106         try {
    107             status = ToolRunner.run(conf, new CombinerTextInputFormatApp(), args);
    108         } catch (Exception e) {
    109             e.printStackTrace();
    110         }
    111         
    112         return status;
    113     }
    114     
    115     public static void main(String[] args) {
    116         args = new String[]{"/mapreduces/*.txt", "/mapreduces/combinertextinputFormat"};
    117         if (args.length!=2) {
    118             System.out.println("Usage: "+CombinerTextInputFormatApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH>");
    119         } else {
    120             int status = createJob(args);
    121             System.exit(status);
    122         }
    123     }
    124     
    125 }
  • 相关阅读:
    c中的数组与字符串
    c中的函数
    C中的流程控制
    c中的基本运算
    scanf函数
    c中的数据类型、常量、变量
    c中的关键字、标识符、注释
    ios必须知道的事情
    安卓开发之获取SD卡空间数据
    安卓日志猫的使用
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865567.html
Copyright © 2011-2022 走看看