Hadoop Basics - Map-Side Chain Programming: A MapReduce Top-N Example
Author: 尹正杰 (Yin Zhengjie)
Copyright notice: This is an original work. Reproduction without permission is prohibited and will be pursued as a legal matter.
1. Project Requirements
Analyze the data in "temp.txt" and, for each year (columns 15-19), report the ten highest temperatures (columns 87-92). Because cnblogs cannot host large text files, I have posted the contents of that file in a separate post (click through if you need it). If that page does not open, you can also download a copy from Baidu Netdisk: https://pan.baidu.com/s/12aZFcO2XoegUGMAbS--n6Q, password: 7n91.
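The column offsets suggest an NCDC-style fixed-width weather record. As a quick orientation, here is a minimal, self-contained sketch (the synthetic record line and the class name FieldExtractDemo are made up for illustration, and String.repeat requires Java 11) of how the year and temperature fields are pulled out of one line:

// FieldExtractDemo.java -- illustration only, not part of the MapReduce job
public class FieldExtractDemo {
    public static void main(String[] args) {
        // Build a synthetic 100-character record: the year "1950" occupies columns 15-19
        // and a signed temperature "+0123" occupies columns 87-92 (tenths of a degree in
        // the NCDC format); every other column is just padding.
        StringBuilder sb = new StringBuilder("0".repeat(100));
        sb.replace(15, 19, "1950");
        sb.replace(87, 92, "+0123");
        String line = sb.toString();

        String year = line.substring(15, 19);                 // "1950"
        int temp = Integer.parseInt(line.substring(87, 92));  // 123

        System.out.println(year + " -> " + temp);             // prints: 1950 -> 123
    }
}

ChainMapper1 below does exactly this extraction on every input line, emitting (year, temperature) pairs for the rest of the chain.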
2. Code Implementation
CompKey.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CompKey implements WritableComparable<CompKey> {

    private String year;
    private int temp;

    /**
     * Sorts CompKey by year, and by temperature in descending order within the same year
     */
    public int compareTo(CompKey o) {
        if (this.getYear().equals(o.getYear())) {
            return o.getTemp() - this.getTemp();
        }
        return this.getYear().compareTo(o.getYear());
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeInt(temp);
    }

    public void readFields(DataInput in) throws IOException {
        year = in.readUTF();
        temp = in.readInt();
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    @Override
    public String toString() {
        return year + ' ' + temp;
    }
}
MyGroupComparator.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupComparator extends WritableComparator {

    public MyGroupComparator() {
        super(CompKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompKey ck1 = (CompKey) a;
        CompKey ck2 = (CompKey) b;
        return ck1.getYear().compareTo(ck2.getYear());
    }
}
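To make the division of labor between the two comparators concrete: CompKey.compareTo drives the shuffle sort (year ascending, temperature descending within a year), while MyGroupComparator only looks at the year, so one reduce() call receives all records of a year already ordered from hottest to coldest. The following throwaway demo (the class CompKeyOrderDemo is hypothetical and not part of the job) shows that ordering:

// CompKeyOrderDemo.java -- illustration only, not part of the MapReduce job
package cn.org.yinzhengjie.mrchain;

import java.util.Arrays;

public class CompKeyOrderDemo {

    private static CompKey ck(String year, int temp) {
        CompKey k = new CompKey();
        k.setYear(year);
        k.setTemp(temp);
        return k;
    }

    public static void main(String[] args) {
        CompKey[] keys = { ck("1950", 12), ck("1949", 111), ck("1950", 78), ck("1949", 22) };

        // CompKey.compareTo is what the shuffle sort uses: year ascending,
        // temperature descending within the same year.
        Arrays.sort(keys);
        System.out.println(Arrays.toString(keys));
        // -> [1949 111, 1949 22, 1950 78, 1950 12]

        // MyGroupComparator compares only the year, so "1949 111" and "1949 22" land in
        // the same reduce() call; taking the first ten values of a group therefore yields
        // that year's ten highest temperatures.
    }
}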
ChainMapper1.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ChainMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        // Extract the year (columns 15-19)
        String year = line.substring(15, 19);

        // Extract the temperature (columns 87-92)
        int temp = Integer.parseInt(line.substring(87, 92));

        context.write(new Text(year), new IntWritable(temp));
    }
}
ChainMapper2.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ChainMapper2 extends Mapper<Text, IntWritable, CompKey, NullWritable> {

    @Override
    protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {

        int i = value.get();

        // 9999 marks a missing temperature reading; skip it
        if (i != 9999) {
            CompKey ck = new CompKey();
            ck.setYear(key.toString());
            ck.setTemp(i);
            context.write(ck, NullWritable.get());
        }
    }
}
ChainReducer1.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class ChainReducer1 extends Reducer<CompKey, NullWritable, Text, IntWritable> {

    // Because of the grouping comparator, all keys with the same year fall into one group,
    // so every record seen in a single reduce() call belongs to the same year.
    @Override
    protected void reduce(CompKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String year = key.getYear();
        Iterator<NullWritable> it = values.iterator();
        int i = 0;
        while (it.hasNext() && i < 10) {
            // Advance the iterator first: Hadoop refreshes the fields of 'key' as the values
            // are consumed, so 'key' now holds the current (i-th hottest) record of this year.
            it.next();
            System.out.println(key.toString());
            context.write(new Text(year), new IntWritable(key.getTemp()));
            i++;
        }
    }
}
ChainReducer2.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Despite its name, this class is a Mapper: it is chained after ChainReducer1
// via ChainReducer.addMapper() in the driver.
public class ChainReducer2 extends Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
        int temp = value.get();
        // Keep only odd temperatures (compare against 0 so negative odd values also pass)
        if (temp % 2 != 0) {
            context.write(key, new IntWritable(temp));
        }
    }
}
ChainApp.java

/*
@author : yinzhengjie
Blog : http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL : y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainApp {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(conf);

        FileSystem fs = FileSystem.get(conf);

        job.setJobName("Chain");
        job.setJarByClass(ChainApp.class);

        // In the map chain, one mapper can be followed by any number of additional mappers
        ChainMapper.addMapper(job, ChainMapper1.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, conf);

        ChainMapper.addMapper(job, ChainMapper2.class, Text.class, IntWritable.class,
                CompKey.class, NullWritable.class, conf);

        // In the reduce chain there is exactly one reducer; it may only be followed by mappers, never by another reducer
        ChainReducer.setReducer(job, ChainReducer1.class, CompKey.class, NullWritable.class,
                Text.class, IntWritable.class, conf);

        ChainReducer.addMapper(job, ChainReducer2.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, conf);

        job.setGroupingComparatorClass(MyGroupComparator.class);

        // Delete the output directory if it already exists
        Path outPath = new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\out");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // Input path
        FileInputFormat.addInputPath(job, new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\temp"));

        // Output path
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }
}