zoukankan      html  css  js  c++  java
  • MapReduce的KeyValueTextInputFormat使用

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.conf.Configured;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.NullWritable;
      9 import org.apache.hadoop.io.Text;
     10 import org.apache.hadoop.mapreduce.InputFormat;
     11 import org.apache.hadoop.mapreduce.Job;
     12 import org.apache.hadoop.mapreduce.Mapper;
     13 import org.apache.hadoop.mapreduce.Reducer;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 import org.apache.hadoop.util.Tool;
     18 import org.apache.hadoop.util.ToolRunner;
     19 
     20 /**
     21  * KeyValueTextInputFormat默认将行的第一个制表符分隔,前面的是key,后面的是value。并且Mapper输入的k1、v1必须是 Text, Text。
     22  * mapreduce.input.keyvaluelinerecordreader.key.value.separator属性可以改变分隔符,默认为“\t”。官方API描述如下:
     23  *         An {@link InputFormat} for plain text files. Files are broken into lines.
     24  *         Either line feed or carriage-return are used to signal end of line.
     25  *         Each line is divided into key and value parts by a separator byte. If no
     26  *         such a byte exists, the key will be the entire line and value will be empty.
     27  *         The separator byte can be specified in config file under the attribute name
     28  *         mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
     29  *         is the tab character ('\t').
     30  *         public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>
     31  *         
     32  * 此处应用场景为获取图书大纲,找出所有的一级索引,读取输入HDFS目录下的文件/mapreduces/bookOutline.txt,内容如下:
     33  *     第1章    PostgresQL服务器简介
     34  *         1.1    为什么在服务器中进行程序设计
     35  *         1.2    关于本书的代码示例
     36  *         1.3    超越简单函数
     37  *         1.4    使用触发器管理相关数据
     38  *         1.5    审核更改
     39  *         1.6    数据清洗
     40  *         1.7    定制排序方法
     41  *         1.8    程序设计最佳实践
     42  *         1.8.1    KISS——尽量简单(keep it simple stupid)
     43  *         1.8.2    DRY——不要写重复的代码(don't repeat yourself)
     44  *         1.8.3    YAGNI——你并不需要它(you ain'tgonnaneedit)
     45  *         1.8.4    SOA——服务导向架构(service-oriented architecture)
     46  *         1.8.5    类型的扩展
     47  *         1.9    关于缓存
     48  *         1.10    总结——为什么在服务器中进行程序设计
     49  *         1.10.1    性能
     50  *         1.10.2    易于维护
     51  *         1.10.3    保证安全的简单方法
     52  *         1.11    小结
     53  *     第2章    服务器程序设计环境
     54  *         2.1    购置成本
     55  *         2.2    开发者的可用性
     56  *         2.3    许可证书
     57  *         2.4    可预测性
     58  *         2.5    社区
     59  *         2.6    过程化语言
     60  *         2.6.1    平台兼容性
     61  *         2.6.2    应用程序设计
     62  *         2.6.3    更多基础
     63  *         2.7    小结
     64  *     第3章    第一个PL/pgsQL函数
     65  *         3.1    为什么是PL/pgSQL
     66  *         3.2    PL/pgSQL函数的结构
     67  *         ...
     68  *
     69  * 输出到HDFS目录下的文件/mapreduces/keyvaluetextinputformat/part-r-00000,内容如下:
     70  *        第1章
     71  *        第2章
     72  *        第3章
     73  *
     74  * @author mengyao
     75  *
     76  */
     77 public class KeyValueTextInputFormatApp extends Configured implements Tool {
     78 
     79     static class NLineInputFormatMapper extends Mapper<Text, Text, Text, NullWritable> {
     80         
     81         private NullWritable outputValue;
     82         
     83         @Override
     84         protected void setup(Context context)
     85                 throws IOException, InterruptedException {
     86             this.outputValue = NullWritable.get();
     87         }
     88         
     89         @Override
     90         protected void map(Text key, Text value, Context context)
     91                 throws IOException, InterruptedException {
     92             //如果key值不为空则认为是一级索引
     93             if (key.toString()!=null && !key.toString().isEmpty()) {
     94                 context.write(key, this.outputValue);
     95             }
     96         }
     97     }
     98     
     99     static class NLineInputFormatReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    100         
    101         private NullWritable outputValue;
    102         
    103         @Override
    104         protected void setup(Context context)
    105                 throws IOException, InterruptedException {
    106             this.outputValue = NullWritable.get();
    107         }
    108         
    109         @Override
    110         protected void reduce(Text key, Iterable<NullWritable> value, Context context)
    111                 throws IOException, InterruptedException {
    112             context.write(key, outputValue);
    113         }
    114     }
    115     
    116     @Override
    117     public int run(String[] args) throws Exception {
    118         Job job = Job.getInstance(getConf(), KeyValueTextInputFormatApp.class.getSimpleName());
    119         job.setJarByClass(KeyValueTextInputFormatApp.class);
    120         
    121         job.setInputFormatClass(KeyValueTextInputFormat.class);
    122         FileInputFormat.addInputPath(job, new Path(args[0]));
    123         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    124         
    125         job.setMapperClass(NLineInputFormatMapper.class);
    126         job.setMapOutputKeyClass(Text.class);
    127         job.setMapOutputValueClass(NullWritable.class);
    128         
    129         job.setReducerClass(NLineInputFormatReducer.class);
    130         job.setOutputKeyClass(Text.class);
    131         job.setOutputValueClass(NullWritable.class);
    132         
    133         return job.waitForCompletion(true)?0:1;
    134     }
    135 
    136     public static int createJob(String[] args) {
    137         Configuration conf = new Configuration();
    138         conf.set("dfs.datanode.socket.write.timeout", "7200000");
    139         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
    140         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
    141         conf.set("mapreduce.job.jvm.numtasks", "-1");        
    142         conf.set("mapreduce.map.speculative", "false");        
    143         conf.set("mapreduce.reduce.speculative", "false");    
    144         conf.set("mapreduce.map.maxattempts", "4");            
    145         conf.set("mapreduce.reduce.maxattempts", "4");        
    146         conf.set("mapreduce.map.skip.maxrecords", "0");
    147         int status = 0;
    148         
    149         try {
    150             status = ToolRunner.run(conf, new KeyValueTextInputFormatApp(), args);
    151         } catch (Exception e) {
    152             e.printStackTrace();
    153         }
    154         
    155         return status;
    156     }
    157     
    158     public static void main(String[] args) {
    159         args = new String[]{"/mapreduces/bookOutline.txt", "/mapreduces/keyvaluetextinputformat"};
    160         if (args.length!=2) {
    161             System.out.println("Usage: "+KeyValueTextInputFormatApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH>");
    162         } else {
    163             int status = createJob(args);
    164             System.exit(status);
    165         }
    166     }
    167 
    168 }
  • 相关阅读:
    完全卸载SQL Server 2008r2
    win7:你需要来自Administrators的权限才能对此文件进行修改的一个文件
    web.config文件中配置数据库连接的两种方式
    IIS6/7 配置操作
    IIS6/7 配置问题
    svn一整套使用,从下载到整个服务器搭建完成的详细说明
    HTTP协议详解
    深入理解String的关键点和方法
    将博客搬至CSDN
    对Java Web项目中路径的理解
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865571.html
Copyright © 2011-2022 走看看