pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the "topn" Hadoop MapReduce job (package s26).
  All Hadoop artifacts must use the same version, so it is declared once in
  the hadoop.version property and referenced by every dependency.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.siit</groupId>
    <artifactId>topn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Single source of truth for the Hadoop client libraries' version. -->
        <hadoop.version>2.7.4</hadoop.version>
        <!-- Do not depend on the build machine's default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Pin the language/bytecode level instead of relying on the compiler
             plugin default, which older plugin versions set to a level that
             recent JDKs refuse to compile. Hadoop 2.7.x targets Java 7/8. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
MySort
1 package s26; 2 3 import org.apache.hadoop.io.IntWritable; 4 5 public class MySort extends IntWritable.Comparator { 6 7 @Override 8 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 9 return -super.compare(b1, s1, l1, b2, s2, l2); 10 } 11 }
MyMap
1 package s26; 2 3 4 import org.apache.hadoop.io.IntWritable; 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Mapper; 8 9 import java.io.IOException; 10 11 public class MyMap extends Mapper<LongWritable,Text,IntWritable,Text> { 12 13 IntWritable ha_numReal = new IntWritable(); 14 Text ha_name = new Text(); 15 @Override 16 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 17 super.map(key, value, context); 18 //1.hadoop --> java 19 String line = value.toString(); 20 //2.calc 21 String[] info = line.split(" "); 22 String name = info[0]; 23 String num = info[1]; 24 25 // 2.1 String --> int 26 int numReal = Integer.parseInt(num); 27 28 //3.java --> hadoop 29 ha_numReal.set(numReal); 30 ha_name.set(name); 31 32 context.write(ha_numReal,ha_name); 33 } 34 }
MyRed
1 package s26; 2 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Reducer; 6 7 import java.io.IOException; 8 9 public class MyRed extends Reducer<IntWritable,Text,IntWritable,Text> { 10 11 int count = 0; 12 13 @Override 14 protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 15 super.reduce(key, values, context); 16 for (Text t: 17 values) { 18 // 1. if >= 3 19 if(count>=3){ 20 break; 21 } 22 context.write(key,t); 23 count++; 24 } 25 } 26 }
MyJob
1 package s26; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 import java.io.IOException; 14 15 public class MyJob { 16 17 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 18 19 //1.conf 20 Configuration conf = new Configuration(); 21 conf.set("fs.defaultFS","hdfs://master:9000"); 22 23 //2.job 24 Job job = Job.getInstance(conf); 25 // jar-package 26 job.setSortComparatorClass(MySort.class); 27 job.setJarByClass(MyJob.class); 28 job.setMapperClass(MyMap.class); 29 job.setReducerClass(MyRed.class); 30 // 31 job.setOutputKeyClass(IntWritable.class); 32 job.setOutputValueClass(Text.class); 33 //3.io 34 Path pin = new Path("/sjw"); 35 Path pout = new Path("/out"); 36 //delete out 37 FileSystem fs = FileSystem.get(conf); 38 if(fs.exists(pout)){ 39 fs.delete(pout,true); 40 } 41 FileInputFormat.setInputPaths(job,pin); 42 FileOutputFormat.setOutputPath(job,pout); 43 //4.run 44 job.waitForCompletion(true); 45 } 46 }