zoukankan      html  css  js  c++  java
  • MapReduce的NLineInputFormat使用

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.conf.Configured;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.LongWritable;
      9 import org.apache.hadoop.io.NullWritable;
     10 import org.apache.hadoop.io.Text;
     11 import org.apache.hadoop.mapreduce.Job;
     12 import org.apache.hadoop.mapreduce.Mapper;
     13 import org.apache.hadoop.mapreduce.Reducer;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 import org.apache.hadoop.util.Tool;
     18 import org.apache.hadoop.util.ToolRunner;
     19 
     20 /**
     21  * NLineInputFormat前面的N表示每个Mapper收到输入的行数,N的默认输入行数是1。mapreduce.input.lineinputformat.linespermap属性实现N值的设定。如果希望使Mapper收到固定行数的输入可以使用该类实现。
     22  * 与TextInputFormat相同,key是文件中行的字节偏移量,value是行本身。
     23  * 通常情况下,对少量的输入行执行map任务是比较低效的(任务初始化的额外开销导致)。
     24  *
     25  * 使用NLineInputFormat设置Mapper任务每次输入处理4行,此处应用场景为获取图书大纲,找出所有的一级索引,读取输入HDFS目录下的文件/mapreduces/bookOutline.txt,内容如下:
     26  *     第1章    PostgresQL服务器简介
     27  *         1.1    为什么在服务器中进行程序设计
     28  *         1.2    关于本书的代码示例
     29  *         1.3    超越简单函数
     30  *         1.4    使用触发器管理相关数据
     31  *         1.5    审核更改
     32  *         1.6    数据清洗
     33  *         1.7    定制排序方法
     34  *         1.8    程序设计最佳实践
     35  *         1.8.1    KISS——尽量简单(keep it simple stupid)
     36  *         1.8.2    DRY——不要写重复的代码(don't repeat yourself)
     37  *         1.8.3    YAGNI——你并不需要它(you ain'tgonnaneedit)
     38  *         1.8.4    SOA——服务导向架构(service-oriented architecture)
     39  *         1.8.5    类型的扩展
     40  *         1.9    关于缓存
     41  *         1.10    总结——为什么在服务器中进行程序设计
     42  *         1.10.1    性能
     43  *         1.10.2    易于维护
     44  *         1.10.3    保证安全的简单方法
     45  *         1.11    小结
     46  *     第2章    服务器程序设计环境
     47  *         2.1    购置成本
     48  *         2.2    开发者的可用性
     49  *         2.3    许可证书
     50  *         2.4    可预测性
     51  *         2.5    社区
     52  *         2.6    过程化语言
     53  *         2.6.1    平台兼容性
     54  *         2.6.2    应用程序设计
     55  *         2.6.3    更多基础
     56  *         2.7    小结
     57  *     第3章    第一个PL/pgsQL函数
     58  *         3.1    为什么是PL/pgSQL
     59  *         3.2    PL/pgSQL函数的结构
     60  *         ...
     61  *
     62  * 输出到HDFS目录下的文件/mapreduces/nlineinputformat/part-r-00000,内容如下:
     63  *        第1章    PostgresQL服务器简介
     64  *        第2章    服务器程序设计环境
     65  *        第3章    第一个PL/pgsQL函数
     66  *
     67  * @author mengyao
     68  *
     69  */
     70 public class NLineInputFormatApp extends Configured implements Tool {
     71 
     72     static class NLineInputFormatMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
     73         
     74         private Text outputValue;
     75         
     76         @Override
     77         protected void setup(Context context)
     78                 throws IOException, InterruptedException {
     79             this.outputValue = new Text();
     80         }
     81         
     82         @Override
     83         protected void map(LongWritable key, Text value, Context context)
     84                 throws IOException, InterruptedException {
     85             final String line = value.toString();
     86             //如果行第一个字是“第”则认为是一级索引
     87             if (line.startsWith("第")) {
     88                 outputValue.set(line);
     89                 context.write(key, this.outputValue);
     90             }
     91         }
     92     }
     93     
     94     static class NLineInputFormatReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
     95         
     96         private Text outputKey;
     97         private NullWritable outputValue;
     98         
     99         @Override
    100         protected void setup(Context context)
    101                 throws IOException, InterruptedException {
    102             this.outputKey = new Text();
    103             this.outputValue = NullWritable.get();
    104         }
    105         
    106         @Override
    107         protected void reduce(LongWritable key, Iterable<Text> value, Context context)
    108                 throws IOException, InterruptedException {
    109             outputKey.set(value.iterator().next());
    110             context.write(this.outputKey, outputValue);
    111         }
    112     }
    113     
    114     @Override
    115     public int run(String[] args) throws Exception {
    116         Job job = Job.getInstance(getConf(), NLineInputFormatApp.class.getSimpleName());
    117         job.setJarByClass(NLineInputFormatApp.class);
    118         
    119         NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[0]));
    120         job.setInputFormatClass(NLineInputFormat.class);
    121         FileInputFormat.addInputPath(job, new Path(args[1]));
    122         FileOutputFormat.setOutputPath(job, new Path(args[2]));
    123         
    124         job.setMapperClass(NLineInputFormatMapper.class);
    125         job.setMapOutputKeyClass(LongWritable.class);
    126         job.setMapOutputValueClass(Text.class);
    127         
    128         job.setReducerClass(NLineInputFormatReducer.class);
    129         job.setOutputKeyClass(Text.class);
    130         job.setOutputValueClass(NullWritable.class);
    131         
    132         return job.waitForCompletion(true)?0:1;
    133     }
    134 
    135     public static int createJob(String[] args) {
    136         Configuration conf = new Configuration();
    137         conf.set("dfs.datanode.socket.write.timeout", "7200000");
    138         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
    139         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
    140         int status = 0;
    141         
    142         try {
    143             status = ToolRunner.run(conf, new NLineInputFormatApp(), args);
    144         } catch (Exception e) {
    145             e.printStackTrace();
    146         }
    147         
    148         return status;
    149     }
    150     
    151     public static void main(String[] args) {
    152         args = new String[]{"4", "/mapreduces/bookOutline.txt", "/mapreduces/nlineinputformat"};
    153         if (args.length!=3) {
    154             System.out.println("Usage: "+NLineInputFormatApp.class.getName()+" Input paramters <LINE_NUMBER> <INPUT_PATH> <OUTPUT_PATH>");
    155         } else {
    156             int status = createJob(args);
    157             System.exit(status);
    158         }
    159     }
    160 
    161 }
  • 相关阅读:
    SSH Secure File Transfer上传文件错误:encountered 1 errors during the transfer解决办法
    出现Unrecognized field "state" (class com.jt.manage.pojo.ItemCat)异常
    (error) DENIED Redis is running in protected mode because protected mode is enabled
    错误:在maven install是抛出 “1.5不支持diamond运算符,请使用source 7或更高版本以启用diamond运算符”
    Java实现POI读取Excel文件,兼容后缀名xls和xlsx
    数据库的主从复制常用Xshell命令
    Linux搭建主从数据库服务器(主从复制)
    项目数据库备份
    Entity Frameword 查询 sql func linq 对比
    jq 选择器基础及拓展
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865579.html
Copyright © 2011-2022 走看看