zoukankan      html  css  js  c++  java
  • 9.1hadoop 内置计数器、自定义枚举计数器、Streaming计数器

    1.1  计数器

    计数器的作用是用来统计数量的,用于记录特定事件的次数,分为内置计数器、自定义java枚举计数器、自定义Stream计数器三大类。用于质量分析,或应用级统计。分析计数器的值比分析一堆日志更高效。

    计数器名称

    计数器介绍

    内置计数器

    Hadoop自带的计数器,有特定的计数器名称。例如统计输入、输出的记录数量,输入输出的字节数。

    自定义java枚举计数器

    用户自定义的枚举型计数器,用于统计用户的特殊要求的计数器,例如统计记录中无效记录的数量。

    自定义Streaming计数器

    通过向标准输出流发送特定格式的信息,来增加指定计数器的数值。Streaming是hadoop工具,用于执行非java的map和reduce作业。

    1.1.1         内置计数器

    组别

    名称/类别

    参考

    MapReduce任务计数器

    org.apache.hadoop.mapreduce.TaskCounter

    Map和reduce的任务统计

    文件系统计数器

    org.apache.hadoop.mapreduce.FiIeSystemCounter

    文件系统读取写入统计

    FiIeInputFormat计数器

    org.apache.hadoop.mapreduce.lib.input.FilelnputFormatCounter

    Map任务通过FilelnputForma读取数据的数量

    FiIeOutputFormat计数器

    org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter

    map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数

    作业计数器

    org.apache.hadoop.mapreduce.JobCounter

    从作业的维度统计的数值

    (1)   任务计数器

    MAP_INPUT_RECORDS计数器统计map任务输入记录的总数,一个作业的所有map任务进行聚集,定期发送给application master,全量发送更新。可以统计任务输入、输出的记录数量,输入、输出的字节数,分片(split)的原始字节数(SPLIT_RAW_BYTES)等

    表内置的mapreduce任务计数器

    计数器名称

    说明

    map输人的记录数(MAP_INPUT_RECORDS)

    作业中所有map已处理的输人记录数。每次RecordReader读到一条记录并将其传给map的map()函数时,该计数器的值递增

    分片(split)的原始字节数(SPLIT_RAW_BYTES)

    由map读取的输人-分片对象的字节数。这些对象描述分片元数据(文件的位移和长度),而不是分片的数据自身,因此总规模是小的

    map输出的记录数(MAP_OUTPUT_RECORDS)

    作业中所有map产生的map输出记录数。每次某一个map
    的OutputCollector调用collect()方法时,该计数器的值增加

    map输出的字节数(MAP_OUTPUT_BYTES)

    作业中所有map产生的耒经压缩的输出数据的字节数·每次某一个map的OutputCollector调用collect()方法时,该计数器的值增加

    map输出的物化字节数(MAP_OUTPUT_MATERIALIZED_BYTES)

    map输出后确实写到磁盘上的字节数;若map输出压缩功能被启用,则会在计数器值上反映出来

    combine输人的记录数(COMBINE_INPUT_RECORDS)

    作业中所有combiner(如果有)已处理的输人记录数。combiner的迭代器每次读一个值,该计数器的值增加。注意:本计数器代表combiner已经处理的值的个数,并非不同的键组数(后者并无实所意文,因为对于combiner而言,并不要求每个键对应一个组,详情参见2.4.2节和7.3节

    combine输出的记录数(COMBINE_OUTPUT_RECORDS)

    作业中所有combiner(如果有)已产生的输出记录数。每当一个combiner的OutputCollector调用collect()方法时,该计数器的值增加

    reduce输人的组(REDUCE_INPUT_GROUPS)

    作业中所有reducer已经处理的不同的码分组的个数。每当某一个reducer的reduce()被调用时,该计数器的值增加

    reduce输人的记录数(REDUCE_INPUT_RECORDS)

    作业中所有reducer已经处理的输人记录的个数。每当某个reducer的迭代器读一个值时,该计数器的值增加。如果所有reducer已经处理数完所有输人,則该计数器的值与计数器"map输出的记录"的值相同

    reduce输出的记录数(REDUCE_OUTPUT_RECORDS)

    作业中所有map已经产生的reduce输出记录数。每当某个reducer的OutputCollector调用collect()方法时,该计数器的值增加

    reduce经过shuffle的字节数(REDUCE_SHUFFLE_BYTES)

    由shume复制到reducer的map输出的字节数

    溢出的记录数(SPILLED_RECORDS)

    作业中所有map和reduce任务溢出到磁的记录数

    CPU毫秒(CPU_MILLISECONDS)

    一个任务的总CPU时间,以毫秒为单位,可由/proc/cpuinfo获取

    物理内存字节数(PHYSICAL_MEMORY_BYTES)

    一个任务所用的物理内存,以字节数为单位,可 由/proc/meminfo获取

    虚拟内存字节数(VIRTUAL_MEMORY_BYTES)

    一个任务所用虚拟内存的字节数,由/proc/meminfo而'面获取

    有效的堆字节数(COMMITTED_HEAP_BYTES)

    在JVM中的总有效内存最(以字节为单位),可由Runtime.
    getRuntime().totalMemory()获取

    GC运行时间毫秒数(GC_TIME_MILLIS)

    在任务执行过程中,垃圾收集器(garbage collection)花费的时间(以毫秒为单位),可由GarbageCollector MXBean.
    getCollectionTime()获取

    由shuffle传输的map输出数(SHUFFLED_MAPS)

    由shume传输到reducer的map输出文件数,详情参见7.3节

    失敗的shuffle数(FAILED_SHUFFLE)

    shuffle过程中,发生map输出拷贝错误的次数

    被合并的map输出数(MERGED_MAP_OUTPUTS)

    shuffle过程中,在reduce端合并的map输出文件数

    内置的文件系统任务计数器

    计数器名称

    说明

    文件系统的读字节数(BYTES_READ)

    由map任务和reduce任务在各个文件系统中读取的字节数,各个文件系统分别对应一个计数器,文件系统可以是ocal、
    HDFS、S3等

    文件系统的写字节数(BYTES_WRITTEN)

    由map任务和reduce任务在各个文件系统中写的字节数

    文件系统读操作的数量(READ_OPS)

    由map任务和reduce任务在各个文件系统中进行的读操作的数量(例如,open操作,filestatus操作)

    文件系统大规模读操作的数最(LARGE_READ_OPS)

    由map和reduce任务在各个文件系统中进行的大规模读操作(例如,对于一个大容量目录进行list操作)的数

    文件系统写操作的数最(WRITE_OPS)

    由map任务和reduce任务在各个文件系统中进行的写操作的数量(例如,create操作,append操作)

    内置的FilelnputFormat计数器

    计数器名称

    说明

    读取的字节数(BYTES_READ)

    由map任务通过FilelnputFormat读取的字节数

    内置的FileOutputFormat任务计数器

    计数器名称

    说明

    写的字节数(BYTES_WRITTEN)

    由map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数

    (2)   作业计数器

    作业计数器有application master维护,作业级别统计,值不会随着任务的执行而变化,例如TOTAL_LAUNCHED_MAPS统计作业执行的任务数。

    计数器名称

    说明

    启用的map任务数(TOTAL_LAUNCHED_MAPS)

    启动的map任务数,包括以“推测执行”方式启动的任务,详情参见7.4.2节

    启用的reduce任务数(TOTAL_LAUNCHED_REDUCES)

    启动的reduce任务数,包括以“推测执行”方式启动的任务

    启用的uber任务数(TOTAL_LAIÆHED_UBERTASKS)

    启用的uber任务数,详情参见7.1节

    uber任务中的map数(NUM_UBER_SUBMAPS)

    在uber任务中的map数

    Uber任务中的reduce数(NUM_UBER_SUBREDUCES)

    在任务中的reduce数

    失败的map任务数(NUM_FAILED_MAPS)

    失败的map任务数,用户可以参见7.2.1节对任务失败的讨论,了解失败原因

    失败的reduce任务数(NUM_FAILED_REDUCES)

    失败的reduce任务数

    失败的uber任务数(NIN_FAILED_UBERTASKS)

    失败的uber任务数

    被中止的map任务数(NUM_KILLED_MAPS)

    被中止的map任务数,可以参见7.2.1节对任务失败的讨论,了解中止原因

    被中止的reduce任务数(NW_KILLED_REDUCES)

    被中止的reduce任务数

    数据本地化的map任务数(DATA_LOCAL_MAPS)

    与输人数据在同一节点上的map任务数

    机架本地化的map任务数(RACK_LOCAL_MAPS)

    与输人数据在同一机架范围内但不在同一节点上的map任务数

    其他本地化的map任务数(OTHER_LOCAL_MAPS)

    与输人数据不在同一机架范围内的map任务数。山于机架之间的带宽资源相对较少,Hadoop会尽量让map任务靠近输人数据执行,因此该计数器值一般比较小。详情参见图2-2

    map任务的总运行时间(MILLIS_MAPS)

    map任务的总运行时间,单位毫秒。包括以推测执行方式启动的任务。可参见相关的度量内核和内存使用的计数器(VCORES_MILLIS_MAPS和MB_MILLIS_MAPS)

    reduce任务的总运行时间(MILLIS_REDUCES)

    reduce任务的总运行时间,单位毫秒。包括以推滌执行方式启动的任务。可参见相关的度量内核和内存使用的计数器(VCORES _MILLIS_REDUCES和MB_MILLIS_REDUCES)

    1.1.2         自定义java计数器

    计数器由java枚举类型来定义,以便进行分组,枚举名称即为组名,字段即为计数器,计数器为全局的,mapreduce框架跨所有map和reduce聚集这些计数器。

    import java.io.IOException;

    import java.util.Iterator;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.io.*;

    import org.apache.hadoop.mapred.JobClient;

    import org.apache.hadoop.mapred.JobConf;

    import org.apache.hadoop.mapred.MapReduceBase;

    import org.apache.hadoop.mapred.Mapper;

    import org.apache.hadoop.mapred.OutputCollector;

    import org.apache.hadoop.mapred.Reducer;

    import org.apache.hadoop.mapred.Reporter;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    //统计最高气温的作业。也统计气温值缺少的记录,不规范的记录

    public class MaxTemperatureWithCounters extends Configured implements Tool {

     //自定义计数器

           enum Temperature {

                  MiSSING, //统计气温缺失的记录

    MALFORMED//统计不规则的记录

           }

     //定义maper

           static class MaxTemeratureMapperWithCounters extends MapReduceBase implements

                         Mapper<LongWritable, Text, Text, IntWritable> {

                  private NcdcRecordParser parser = new NcdcRecordParser();

                  @Override

                  public void map(LongWritable key, Text value,

                                OutputCollector<Text, IntWritable> output, Reporter reporter)

                                throws IOException {

                         parser.parse(value);

                         if (parser.isValidTemperature()) {

                                int airTemperature = parser.getAirTemperature();

                                output.collect(new Text(parser.getYear()), new IntWritable(

                                              airTemperature));

                         } else if (parser.isMa1formedTemperature()) {

           //增加计数器的值                  //context.getCounter(Temperature.MALFORMED).increment(1);

                                // Reporter是MapReduce提供给应用程序的工具。可使用Reporter中的方法报告完成进度(progress)、设定状态消息(setStatus)以及更新计数器(incrCounter)

    reporter.incrCounter(Temperature.MALFORMED, 1);

                         } else if (parser.IsMissingTemperature()) {

    //context.getCounter(Temperature.MISSING).increment(1);

                                reporter.incrCounter(Temperature. MISSING, 1);

                         }

                        //动态计数器     context.getCounter(“TemperatureQuality”,parse.getQuality()).increment(1);

                  }

           }

     //定义reduce

           static class MaxTemperatureReduceWithCounters extends MapReduceBase implements

                         Reducer<Text, IntWritable, Text, IntWritable> {

                  public void reduce(Text key, Iterator<IntWritable> values,

                                OutputCollector<Text, IntWritable> output, Reporter reporter)

                                throws IOException {

                         int maxValue = Integer.MIN_VALUE;

                         while (values.hasNext()) {

                                maxValue = Math.max(maxValue, values.next().get());

                         }

                         output.collect(key, new IntWritable(maxValue));

                  }

           }

           @Override

           public int run(String[] args) throws Exception {

                  args = new String[] { "/test/input/t", "/test/output/t" }; // 给定输入输出路径

                  JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);

                  if (conf == null) {

                         return -1;

                  }

                  conf.setOutputKeyClass(Text.class);

                  conf.setOutputValueClass(IntWritable.class);

                  conf.setMapperClass(MaxTemeratureMapperWithCounters.class);

                 conf.setCombinerClass(MaxTemperatureReduceWithCounters.class);

                  conf.setReducerClass(MaxTemperatureReduceWithCounters.class);

                  JobClient.runJob(conf);

                  return 0;

           }

           public static void main(String[] args) throws Exception {

                  int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);

                  System.exit(exitCode);

           }

    }

    执行任务,-counters参数,会输出所有计数器的值

    hadoop jar Hadoop-examples.jar MaxTemperatureWithCounter input/ncdc/all output -counters

    (1)动态计数器   

    动态计数器不像枚举型计数器需要提前定义组名和计数器类型,而是通过字符串名称动态的创建计数器。枚举类型计数器传入枚举类型也要转成String,所以两种方法时等价的,枚举型简单安全。

    context.getCounter(String groupName,String counterName);

    context.getCounter(“TemperatureQuality”,parse.getQuality()).increment(1);

    (2)获取计数器

    或者用javaAPI获取计数器值。根据配置信息创建cluster对象,根据jobid获取job,获取job的计数器,根据类路径获取计数器的值,counters.findCounter(MaxtemperatureCounters.Temperature.MISSING).getValue();方法。

    public class MissingTemperatureFields extends Configured implement Tool{

    @override

    public int run(String[] args)throws Exception{

                  if(args.length()!=1)

    {

           return -1;

    }

    String jobID=args[0];

    //根据配置信息创建cluster对象

    Cluster cluster =new Cluster(getConf());

    //根据jobid获取job

     Job job=cluster.getJob(JobID.forName(jobID));

    if(job==null)

    {

    System.err.printf(“NO job with ID %s”,jobID);

    return -1;

    }

    if(!job.isComplete())

    {

    System.err.printf(“job ID %s is not complete”,jobID);

    return -1;

    }

    获取job的计数器,

    Counters counters=job.getCounters();

    //根据路径获取计数器的值

    long missing=counters.findCounter(MaxtemperatureCounters.Temperature.MISSING).getValue();

    long total=counters.finCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

    //计算任务的失败率

    System.out.printf(“Records missing present %.2f/%% ”,100.0*missing/total);

    }

    public static void main(String[] args)throw Exception{

           int exitCode=ToolRunner.run(new MissingTemperatureFields(),args);

           System.exit(exitCode);

    }

    }

    执行hadoop任务,传入jobid参数

    %hadoop jar hadoop-example.jar MissingTempratureFields job_11223131_0007

    1.1.3         用户自定义的Streaming计数器

    Hadoop streaming是hadoop的一个工具,用于运行非java的maper或reducer作业,例如maper和reducer是C++编写的可执行程序或者脚本文件。使用Streaming的mapreduce程序可以像保准错误流发送特殊格式的信息,增加计数器的值格式如下:reporter:counter:group.counter,amount

    Python实例如下

    sys.stderr.write(“reporter:counter:Temperature,Missing,1 ”);

    状态信息发送格式如下

    sys.stderr.write(“reporter:status:message”);

    1.1.4         获取计数器的方法汇总

    1)web界面查看计数器值

    2)命令行mapred job –counter查看计数器的值;

    3)动态计数器用context获取,context.getCounter (“TemperatureQuality”,parse.getQuality()).increment(1);

    4)用context的getCounter方法或者用reportor的incrCounter方法context.getCounter(Temperature.MISSING).increment(1);reporter.incrCounter(Temperature. MISSING, 1);

    5)Streaming程序,即非java的mapreduce程序,通过向标准输出发送固定格式的数据来增加计数器的值。sys.stderr.write(“reporter:counter:Temperature,Missing,1 ”);

    6)或者用javaAPI获取计数器值。根据配置信息创建cluster对象,根据jobid获取job,获取job的计数器,根据类路径获取计数器的值,counters.findCounter(MaxtemperatureCounters.Temperature.MISSING).getValue();

    自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

    https://www.cnblogs.com/bclshuai/p/11380657.html

  • 相关阅读:
    使用Autofac动态注入启动Api服务
    RabbitMQ学习笔记(六) RPC
    RabbitMQ学习笔记(五) Topic
    RabbitMQ学习笔记(四) Routing
    RabbitMQ学习笔记(三) 发布与订阅
    Quartz.NET学习笔记(二) Job和JobDetails
    RabbitMQ学习笔记(二) 工作队列
    Spark SQL JSON数据处理
    Hive JSON数据处理的一点探索
    由“Beeline连接HiveServer2后如何使用指定的队列(Yarn)运行Hive SQL语句”引发的一系列思考
  • 原文地址:https://www.cnblogs.com/bclshuai/p/12297802.html
Copyright © 2011-2022 走看看