1 package com.mengyao.hadoop.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19 20 /** 21 * NLineInputFormat前面的N表示每个Mapper收到输入的行数,N的默认输入行数是1。mapreduce.input.lineinputformat.linespermap属性实现N值的设定。如果希望使Mapper收到固定行数的输入可以使用该类实现。 22 * 与TextInputFormat相同,key是文件中行的字节偏移量,value是行本身。 23 * 通常情况下,对少量的输入行执行map任务是比较低效的(任务初始化的额外开销导致)。 24 * 25 * 使用NLineInputFormat设置Mapper任务每次输入处理4行,此处应用场景为获取图书大纲,找出所有的一级索引,读取输入HDFS目录下的文件/mapreduces/bookOutline.txt,内容如下: 26 * 第1章 PostgresQL服务器简介 27 * 1.1 为什么在服务器中进行程序设计 28 * 1.2 关于本书的代码示例 29 * 1.3 超越简单函数 30 * 1.4 使用触发器管理相关数据 31 * 1.5 审核更改 32 * 1.6 数据清洗 33 * 1.7 定制排序方法 34 * 1.8 程序设计最佳实践 35 * 1.8.1 KISS——尽量简单(keep it simple stupid) 36 * 1.8.2 DRY——不要写重复的代码(don't repeat yourself) 37 * 1.8.3 YAGNI——你并不需要它(you ain'tgonnaneedit) 38 * 1.8.4 SOA——服务导向架构(service-oriented architecture) 39 * 1.8.5 类型的扩展 40 * 1.9 关于缓存 41 * 1.10 总结——为什么在服务器中进行程序设计 42 * 1.10.1 性能 43 * 1.10.2 易于维护 44 * 1.10.3 保证安全的简单方法 45 * 1.11 小结 46 * 第2章 服务器程序设计环境 47 * 2.1 购置成本 48 * 2.2 开发者的可用性 49 * 2.3 许可证书 50 * 2.4 可预测性 51 * 2.5 社区 52 * 2.6 过程化语言 53 * 2.6.1 平台兼容性 54 * 2.6.2 应用程序设计 55 * 2.6.3 更多基础 56 * 2.7 小结 57 * 第3章 第一个PL/pgsQL函数 58 * 3.1 为什么是PL/pgSQL 59 * 3.2 PL/pgSQL函数的结构 60 * ... 61 * 62 * 输出到HDFS目录下的文件/mapreduces/nlineinputformat/part-r-00000,内容如下: 63 * 第1章 PostgresQL服务器简介 64 * 第2章 服务器程序设计环境 65 * 第3章 第一个PL/pgsQL函数 66 * 67 * @author mengyao 68 * 69 */ 70 public class NLineInputFormatApp extends Configured implements Tool { 71 72 static class NLineInputFormatMapper extends Mapper<LongWritable, Text, LongWritable, Text> { 73 74 private Text outputValue; 75 76 @Override 77 protected void setup(Context context) 78 throws IOException, InterruptedException { 79 this.outputValue = new Text(); 80 } 81 82 @Override 83 protected void map(LongWritable key, Text value, Context context) 84 throws IOException, InterruptedException { 85 final String line = value.toString(); 86 //如果行第一个字是“第”则认为是一级索引 87 if (line.startsWith("第")) { 88 outputValue.set(line); 89 context.write(key, this.outputValue); 90 } 91 } 92 } 93 94 static class NLineInputFormatReducer extends Reducer<LongWritable, Text, Text, NullWritable> { 95 96 private Text outputKey; 97 private NullWritable outputValue; 98 99 @Override 100 protected void setup(Context context) 101 throws IOException, InterruptedException { 102 this.outputKey = new Text(); 103 this.outputValue = NullWritable.get(); 104 } 105 106 @Override 107 protected void reduce(LongWritable key, Iterable<Text> value, Context context) 108 throws IOException, InterruptedException { 109 outputKey.set(value.iterator().next()); 110 context.write(this.outputKey, outputValue); 111 } 112 } 113 114 @Override 115 public int run(String[] args) throws Exception { 116 Job job = Job.getInstance(getConf(), NLineInputFormatApp.class.getSimpleName()); 117 job.setJarByClass(NLineInputFormatApp.class); 118 119 NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[0])); 120 job.setInputFormatClass(NLineInputFormat.class); 121 FileInputFormat.addInputPath(job, new Path(args[1])); 122 FileOutputFormat.setOutputPath(job, new Path(args[2])); 123 124 job.setMapperClass(NLineInputFormatMapper.class); 125 job.setMapOutputKeyClass(LongWritable.class); 126 job.setMapOutputValueClass(Text.class); 127 128 job.setReducerClass(NLineInputFormatReducer.class); 129 job.setOutputKeyClass(Text.class); 130 job.setOutputValueClass(NullWritable.class); 131 132 return job.waitForCompletion(true)?0:1; 133 } 134 135 public static int createJob(String[] args) { 136 Configuration conf = new Configuration(); 137 conf.set("dfs.datanode.socket.write.timeout", "7200000"); 138 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456"); 139 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912"); 140 int status = 0; 141 142 try { 143 status = ToolRunner.run(conf, new NLineInputFormatApp(), args); 144 } catch (Exception e) { 145 e.printStackTrace(); 146 } 147 148 return status; 149 } 150 151 public static void main(String[] args) { 152 args = new String[]{"4", "/mapreduces/bookOutline.txt", "/mapreduces/nlineinputformat"}; 153 if (args.length!=3) { 154 System.out.println("Usage: "+NLineInputFormatApp.class.getName()+" Input paramters <LINE_NUMBER> <INPUT_PATH> <OUTPUT_PATH>"); 155 } else { 156 int status = createJob(args); 157 System.exit(status); 158 } 159 } 160 161 }