自定义计数器的使用(记录敏感单词)
1 package counter; 2 3 import java.net.URI; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Counter; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer; 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 17 18 public class WordCountApp { 19 static final String INPUT_PATH = "hdfs://chaoren:9000/hello"; 20 static final String OUT_PATH = "hdfs://chaoren:9000/out"; 21 22 public static void main(String[] args) throws Exception { 23 Configuration conf = new Configuration(); 24 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 25 Path outPath = new Path(OUT_PATH); 26 if (fileSystem.exists(outPath)) { 27 fileSystem.delete(outPath, true); 28 } 29 30 Job job = new Job(conf, WordCountApp.class.getSimpleName()); 31 32 // 1.1指定读取的文件位于哪里 33 FileInputFormat.setInputPaths(job, INPUT_PATH); 34 // 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对 35 //job.setInputFormatClass(TextInputFormat.class); 36 37 // 1.2指定自定义的map类 38 job.setMapperClass(MyMapper.class); 39 // map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 40 //job.setOutputKeyClass(Text.class); 41 //job.setOutputValueClass(LongWritable.class); 42 43 // 1.3分区 44 //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class); 45 // 有一个reduce任务运行 46 //job.setNumReduceTasks(1); 47 48 // 1.4排序、分组 49 50 // 1.5归约 51 52 // 2.2指定自定义reduce类 53 job.setReducerClass(MyReducer.class); 54 // 指定reduce的输出类型 55 job.setOutputKeyClass(Text.class); 56 job.setOutputValueClass(LongWritable.class); 57 58 // 2.3指定写出到哪里 59 FileOutputFormat.setOutputPath(job, outPath); 60 // 指定输出文件的格式化类 61 //job.setOutputFormatClass(TextOutputFormat.class); 62 63 // 把job提交给jobtracker运行 64 job.waitForCompletion(true); 65 } 66 67 /** 68 * 69 * KEYIN 即K1 表示行的偏移量 70 * VALUEIN 即V1 表示行文本内容 71 * KEYOUT 即K2 表示行中出现的单词 72 * VALUEOUT 即V2 表示行中出现的单词的次数,固定值1 73 * 74 */ 75 static class MyMapper extends 76 Mapper<LongWritable, Text, Text, LongWritable> { 77 protected void map(LongWritable k1, Text v1, Context context) 78 throws java.io.IOException, InterruptedException { 79 /** 80 * 自定义计数器的使用 81 */ 82 Counter counter = context.getCounter("Sensitive Words", "hello");//自定义计数器名称Sensitive Words 83 String line = v1.toString(); 84 if(line.contains("hello")){ 85 counter.increment(1L);//记录敏感词汇hello的出现次数 86 } 87 String[] splited = line.split(" "); 88 for (String word : splited) { 89 context.write(new Text(word), new LongWritable(1)); 90 } 91 }; 92 } 93 94 /** 95 * KEYIN 即K2 表示行中出现的单词 96 * VALUEIN 即V2 表示出现的单词的次数 97 * KEYOUT 即K3 表示行中出现的不同单词 98 * VALUEOUT 即V3 表示行中出现的不同单词的总次数 99 */ 100 static class MyReducer extends 101 Reducer<Text, LongWritable, Text, LongWritable> { 102 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, 103 Context ctx) throws java.io.IOException, 104 InterruptedException { 105 long times = 0L; 106 for (LongWritable count : v2s) { 107 times += count.get(); 108 } 109 ctx.write(k2, new LongWritable(times)); 110 }; 111 } 112 }
在eclipse中运行后,可以在控制台查看到结果: