MyMap
1 package s26; 2 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.LongWritable; 5 import org.apache.hadoop.io.Text; 6 import org.apache.hadoop.mapreduce.Mapper; 7 8 import java.io.IOException; 9 10 11 public class MyMap extends Mapper<LongWritable,Text,Text,IntWritable> { 12 13 @Override 14 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 15 //super.map(key, value, context); 16 Text result = new Text(); 17 IntWritable one = new IntWritable(1); 18 19 String line = value.toString(); 20 String[] words = line.split(" "); 21 22 for (String w:words) { 23 result.set(w); 24 context.write(result,one); 25 } 26 27 } 28 }
MyRed
1 package s26; 2 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Reducer; 6 7 import java.io.IOException; 8 9 public class MyRed extends Reducer<Text,IntWritable,Text,IntWritable> { 10 11 @Override 12 protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 13 //super.reduce(key, values, context); 14 int sum = 0; 15 for(IntWritable i:values){ 16 sum += i.get(); 17 } 18 context.write(key,new IntWritable(sum)); 19 20 } 21 }
MyJob
1 package s26; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 12 import java.io.IOException; 13 14 public class MyJob { 15 16 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 17 18 //1.conf 19 Configuration conf = new Configuration(); 20 conf.set("fs.defaultFS","hdfs://master:9000"); 21 22 //2.job 23 Job job = Job.getInstance(conf); 24 // jar-package 25 job.setJarByClass(MyJob.class); 26 // 27 job.setMapperClass(MyMap.class); 28 // 29 job.setReducerClass(MyRed.class); 30 job.setOutputKeyClass(Text.class); 31 job.setOutputValueClass(Integer.class); 32 33 job.setMapOutputKeyClass(Text.class); 34 job.setMapOutputValueClass(IntWritable.class); 35 job.setOutputKeyClass(Text.class); 36 job.setOutputValueClass(IntWritable.class); 37 //3.io 38 Path pin = new Path("/sjw"); 39 Path pout = new Path("/out"); 40 // 41 FileSystem fs = FileSystem.get(conf); 42 if(fs.exists(pout)){ 43 fs.delete(pout,true); 44 } 45 FileInputFormat.setInputPaths(job,pin); 46 FileOutputFormat.setOutputPath(job,pout); 47 48 //4.run 49 job.waitForCompletion(true); 50 51 52 } 53 }