这篇博客给大家展示 Hadoop MapReduce WordCount 的另一种版本写法,体会不一样的编程风格。
代码
1 package zhouls.bigdata.myMapReduce.wordcount3; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.util.StringUtils; 10 11 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 12 13 //该方法循环调用,从文件的split中读取每行调用一次,把该行所在的下标为key,该行的内容为value 14 protected void map(LongWritable key, Text value, 15 Context context) 16 throws IOException, InterruptedException { 17 String[] words = StringUtils.split(value.toString(), ' '); 18 for(String w :words){ 19 context.write(new Text(w), new IntWritable(1)); 20 } 21 } 22 }
1 package zhouls.bigdata.myMapReduce.wordcount3; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Reducer; 8 9 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 10 11 //每组调用一次,这一组数据特点:key相同,value可能有多个。 12 protected void reduce(Text arg0, Iterable<IntWritable> arg1, 13 Context arg2) 14 throws IOException, InterruptedException { 15 int sum =0; 16 for(IntWritable i: arg1){ 17 sum=sum+i.get(); 18 } 19 arg2.write(arg0, new IntWritable(sum)); 20 } 21 }
1 package zhouls.bigdata.myMapReduce.wordcount3; 2 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 public class RunJob { 14 15 public static void main(String[] args) { 16 Configuration config =new Configuration(); 17 18 try { 19 FileSystem fs =FileSystem.get(config); 20 21 Job job =Job.getInstance(config); 22 job.setJarByClass(RunJob.class); 23 24 job.setJobName("wc"); 25 26 job.setMapperClass(WordCountMapper.class); 27 job.setReducerClass(WordCountReducer.class); 28 29 job.setMapOutputKeyClass(Text.class); 30 job.setMapOutputValueClass(IntWritable.class); 31 32 FileInputFormat.addInputPath(job, new Path("./data/wc.txt")); 33 34 Path outpath =new Path("./out/WordCountout"); 35 if(fs.exists(outpath)){ 36 fs.delete(outpath, true); 37 } 38 FileOutputFormat.setOutputPath(job, outpath); 39 40 boolean f= job.waitForCompletion(true); 41 if(f){ 42 System.out.println("job任务执行成功"); 43 } 44 } catch (Exception e) { 45 e.printStackTrace(); 46 } 47 } 48 }