.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>cn.siit</groupId> 8 <artifactId>averagescore</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <dependencies> 12 <dependency> 13 <groupId>org.apache.hadoop</groupId> 14 <artifactId>hadoop-common</artifactId> 15 <version>2.7.4</version> 16 </dependency> 17 18 <dependency> 19 <groupId>org.apache.hadoop</groupId> 20 <artifactId>hadoop-hdfs</artifactId> 21 <version>2.7.4</version> 22 </dependency> 23 24 <dependency> 25 <groupId>org.apache.hadoop</groupId> 26 <artifactId>hadoop-mapreduce-client-core</artifactId> 27 <version>2.7.4</version> 28 </dependency> 29 30 <dependency> 31 <groupId>org.apache.hadoop</groupId> 32 <artifactId>hadoop-mapreduce-client-jobclient</artifactId> 33 <version>2.7.4</version> 34 </dependency> 35 36 </dependencies> 37 </project>
MyMap
1 package s26; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.FloatWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 public class MyMap extends Mapper<LongWritable, Text, Text, FloatWritable> { 10 Text courseID = new Text(); 11 FloatWritable courseScore = new FloatWritable(); 12 13 @Override 14 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 15 16 String line = value.toString(); 17 String[] record = line.split(" "); 18 String course = record[0]; 19 float score = Float.parseFloat(record[1]); 20 21 courseID.set(course); 22 courseScore.set(score); 23 context.write(courseID,courseScore); 24 } 25 }
MyRed
1 package s26; 2 3 import java.io.IOException; 4 import org.apache.hadoop.io.FloatWritable; 5 import org.apache.hadoop.io.Text; 6 import org.apache.hadoop.mapreduce.Reducer; 7 public class MyRed extends Reducer<Text, FloatWritable, Text, FloatWritable> { 8 float sum = 0; 9 int count = 0; 10 float avg = 0; 11 FloatWritable result = new FloatWritable(); 12 @Override 13 protected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException { 14 15 for(FloatWritable f:values){ 16 sum =sum +f.get(); 17 count++; 18 } 19 avg = sum/count; 20 result.set(avg); 21 context.write(key,result); 22 } 23 }
MyJob
1 package s26; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 7 import org.apache.hadoop.io.FloatWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 import java.io.IOException; 14 15 public class MyJob { 16 17 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 18 19 //1.conf 20 Configuration conf = new Configuration(); 21 conf.set("fs.defaultFS","hdfs://master:9000"); 22 23 //2.job 24 Job job = Job.getInstance(conf); 25 // jar-package 26 job.setJarByClass(MyJob.class); 27 job.setMapperClass(MyMap.class); 28 job.setReducerClass(MyRed.class); 29 // 30 job.setOutputKeyClass(Text.class); 31 job.setOutputValueClass(FloatWritable.class); 32 //3.io 33 Path pin = new Path("/sjw"); 34 Path pout = new Path("/out"); 35 // 36 FileSystem fs = FileSystem.get(conf); 37 if(fs.exists(pout)){ 38 fs.delete(pout,true); 39 } 40 FileInputFormat.setInputPaths(job,pin); 41 FileOutputFormat.setOutputPath(job,pout); 42 //4.run 43 job.waitForCompletion(true); 44 } 45 }
hadoop jar /root/IdeaProjects/averagescore/out/artifacts/averagescore_jar/averagescore.jar