1. 任务计数器
任务计数器由关联任务维护,并定期发送给tasktracker(YARN中为nodemanager),再由tasktracker发送给jobtracker(YARN中为application master)。因此,计数器能够被全局地聚集。任务计数器的值每次都是完整传输的,而不是只传输自上次传输以来的增量,从而避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失败,则相关计数器的值会减小。
2. 作业计数器
作业计数器由jobtracker(YARN中的application master)维护,因此无需在网络间传输数据,这一点与包括“用户定义的计数器”在内的其他计数器不同。这些计数器都是作业级别的统计量,其值不会随着任务运行而改变。例如,TOTAL_LAUNCHED_MAPS统计在作业执行过程中启动的map任务数,包括失败的map任务。
package com.zhen.mapreduce.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Counter example: counts the empty and non-empty lines of the input file with
 * user-defined counters, and also emits each distinct line with its number of
 * occurrences.
 *
 * @author FengZhen
 * @date 2018-08-29
 */
public class SimpleCounterTest extends Configured implements Tool {

    /** Input path used when none is supplied on the command line. */
    private static final String DEFAULT_INPUT =
            "hdfs://fz/user/hdfs/MapReduce/data/counter/containsEmpty/input";

    /** Output path used when none is supplied on the command line. */
    private static final String DEFAULT_OUTPUT =
            "hdfs://fz/user/hdfs/MapReduce/data/counter/containsEmpty/output";

    /** User-defined counters; the enum's class name becomes the counter group name. */
    enum Empty {
        EMPTY,
        NOT_EMPTY
    }

    /**
     * Increments {@link Empty#EMPTY} or {@link Empty#NOT_EMPTY} for every input
     * line and emits {@code (line, 1)}.
     */
    static class SimpleCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Shared constant value; avoids allocating a new writable per record. */
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Empty bucket = line.isEmpty() ? Empty.EMPTY : Empty.NOT_EMPTY;
            context.getCounter(bucket).increment(1);
            context.write(value, ONE);
        }
    }

    /** Sums the occurrences of each distinct line. */
    static class SimpleCounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }

            // NOTE: counters obtained from this context belong to the current
            // reduce task. Increments made by map tasks are aggregated by the
            // framework, so what is printed here is this task's own view, not
            // the job-wide totals reported at the end of the job.
            Counter empty = context.getCounter(Empty.EMPTY);
            Counter notEmpty = context.getCounter(Empty.NOT_EMPTY);
            System.out.println("empty:" + empty.getValue() + "----not_empty:" + notEmpty.getValue());

            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args remaining command-line arguments after generic-option parsing;
     *             {@code args[0]} is the input path and {@code args[1]} the output
     *             path. When fewer than two are given, the built-in default paths
     *             are used.
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner so that generic options
        // (-D, -conf, -fs, ...) take effect; a fresh Configuration would drop them.
        Configuration conf = getConf() != null ? getConf() : new Configuration();

        boolean pathsGiven = args != null && args.length >= 2;
        String input = pathsGiven ? args[0] : DEFAULT_INPUT;
        String output = pathsGiven ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(conf);
        job.setJobName("SimpleCounterTest");
        job.setJarByClass(SimpleCounterTest.class);

        job.setMapperClass(SimpleCounterMapper.class);
        job.setReducerClass(SimpleCounterReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Entry point. Arguments are passed to {@link ToolRunner} unchanged, so both
     * generic Hadoop options and explicit input/output paths are supported;
     * running without arguments uses the default paths.
     *
     * @param args command-line arguments
     * @throws Exception if the job fails to run
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SimpleCounterTest(), args);
        System.exit(exitCode);
    }
}
scp /Users/FengZhen/Desktop/Hadoop/file/SimpleCounter.jar root@ hadoop jar SimpleCounter.jar com.zhen.mapreduce.counter.SimpleCounterTest
[root@HDP4 mr]# hadoop jar SimpleCounter.jar com.zhen.mapreduce.counter.SimpleCounterTest 18/09/08 20:15:44 INFO client.RMProxy: Connecting to ResourceManager at HDP4/ 18/09/08 20:15:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 18/09/08 20:15:47 INFO input.FileInputFormat: Total input paths to process : 1 18/09/08 20:15:47 INFO mapreduce.JobSubmitter: number of splits:1 18/09/08 20:15:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1535207597429_0006 18/09/08 20:15:51 INFO impl.YarnClientImpl: Submitted application application_1535207597429_0006 18/09/08 20:15:52 INFO mapreduce.Job: The url to track the job: http://HDP4:8088/proxy/application_1535207597429_0006/ 18/09/08 20:15:52 INFO mapreduce.Job: Running job: job_1535207597429_0006 18/09/08 20:16:09 INFO mapreduce.Job: Job job_1535207597429_0006 running in uber mode : false 18/09/08 20:16:09 INFO mapreduce.Job: map 0% reduce 0% 18/09/08 20:16:22 INFO mapreduce.Job: map 100% reduce 0% 18/09/08 20:16:34 INFO mapreduce.Job: map 100% reduce 100% 18/09/08 20:16:34 INFO mapreduce.Job: Job job_1535207597429_0006 completed successfully 18/09/08 20:16:34 INFO mapreduce.Job: Counters: 51 File System Counters FILE: Number of bytes read=78 FILE: Number of bytes written=298025 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=174 HDFS: Number of bytes written=28 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=10609 Total time spent by all reduces in occupied slots (ms)=8008 Total time spent by all map tasks (ms)=10609 Total time spent by all reduce tasks (ms)=8008 Total vcore-milliseconds taken by all 
map tasks=10609 Total vcore-milliseconds taken by all reduce tasks=8008 Total megabyte-milliseconds taken by all map tasks=10863616 Total megabyte-milliseconds taken by all reduce tasks=8200192 Map-Reduce Framework Map input records=9 Map output records=9 Map output bytes=70 Map output materialized bytes=74 Input split bytes=141 Combine input records=0 Combine output records=0 Reduce input groups=4 Reduce shuffle bytes=74 Reduce input records=9 Reduce output records=4 Spilled Records=18 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=261 CPU time spent (ms)=5410 Physical memory (bytes) snapshot=499347456 Virtual memory (bytes) snapshot=5458706432 Total committed heap usage (bytes)=361893888 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 com.zhen.mapreduce.counter.SimpleCounterTest$Empty EMPTY=4 NOT_EMPTY=5 File Input Format Counters Bytes Read=33 File Output Format Counters Bytes Written=28