  • Monitoring a Hadoop Production Cluster: Counters

      You can instrument a Hadoop job with counters to profile its overall operation. Define a separate counter in the program for each event of interest and increment it whenever that event occurs. For counters of the same name coming from all tasks of a job, Hadoop automatically sums their values to reflect the job as a whole. These counter values are displayed in the JobTracker's Web UI together with Hadoop's internal counters.

      A typical use of counters is to track different kinds of input records, in particular "bad" records. For example, suppose we are given a data set in the following format (only a portion is shown):

    "PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
    3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
    3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
    3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
    3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
    3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,
    3070806,1963,1096,,"US","PA",,1,,2,6,63,,0,,,,,,,,,
    3070807,1963,1096,,"US","OH",,1,,623,3,39,,3,,0.4444,,,,,,,
    3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,
    3070809,1963,1096,,"US","AZ",,1,,4,6,65,,0,,,,,,,,,
    3070810,1963,1096,,"US","IL",,1,,4,6,65,,3,,0.4444,,,,,,,
    3070811,1963,1096,,"US","CA",,1,,4,6,65,,8,,0,,,,,,,
    3070812,1963,1096,,"US","LA",,1,,4,6,65,,3,,0.4444,,,,,,,
    3070813,1963,1096,,"US","NY",,1,,5,6,65,,2,,0,,,,,,,
    3070814,1963,1096,,"US","MN",,2,,267,5,59,,2,,0.5,,,,,,,
    3070815,1963,1096,,"US","CO",,1,,7,5,59,,1,,0,,,,,,,
    3070816,1963,1096,,"US","OK",,1,,114,5,55,,4,,0,,,,,,,
    3070817,1963,1096,,"US","RI",,2,,114,5,55,,5,,0.64,,,,,,,
    3070818,1963,1096,,"US","IN",,1,,441,6,69,,4,,0.625,,,,,,,
    3070819,1963,1096,,"US","TN",,4,,12,6,63,,0,,,,,,,,,
    3070820,1963,1096,,"GB","",,2,,12,6,63,,0,,,,,,,,,
    3070821,1963,1096,,"US","IL",,2,,15,6,69,,1,,0,,,,,,,
    3070822,1963,1096,,"US","NY",,2,,401,1,12,,4,,0.375,,,,,,,
    3070823,1963,1096,,"US","MI",,1,,401,1,12,,8,,0.6563,,,,,,,
    3070824,1963,1096,,"US","IL",,1,,401,1,12,,5,,0.48,,,,,,,
    3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,
    3070826,1963,1096,,"US","IA",,1,,401,1,12,,1,,0,,,,,,,
    3070827,1963,1096,,"US","CA",,4,,401,1,12,,2,,0.5,,,,,,,
    3070828,1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,
    3070829,1963,1096,,"FR","",,3,,16,5,59,,5,,0.48,,,,,,,
    3070830,1963,1096,,"US","NH",,2,,16,5,59,,0,,,,,,,,,
    3070831,1963,1096,,"US","CT",,2,,16,5,59,,0,,,,,,,,,
    3070832,1963,1096,,"US","LA",,2,,452,6,61,,1,,0,,,,,,,
    3070833,1963,1096,,"US","LA",,1,,452,6,61,,5,,0,,,,,,,
    3070834,1963,1096,,"US","FL",,1,,452,6,61,,3,,0.4444,,,,,,,
    3070835,1963,1096,,"US","IL",,2,,264,5,51,,5,,0.64,,,,,,,
    3070836,1963,1096,,"US","OK",,2,,264,5,51,,24,,0.7569,,,,,,,
    3070837,1963,1096,,"CH","",,3,,264,5,51,,7,,0.6122,,,,,,,
    3070838,1963,1096,,"CH","",,5,,425,5,51,,5,,0.48,,,,,,,
    3070839,1963,1096,,"US","TN",,2,,425,5,51,,8,,0.4063,,,,,,,
    3070840,1963,1096,,"GB","",,3,,425,5,51,,6,,0.7778,,,,,,,
    3070841,1963,1096,,"US","OH",,2,,264,5,51,,6,,0.8333,,,,,,,
    3070842,1963,1096,,"US","TX",,1,,425,5,51,,1,,0,,,,,,,
    3070843,1963,1096,,"US","NY",,2,,425,5,51,,1,,0,,,,,,,
    3070844,1963,1096,,"US","OH",,2,,425,5,51,,2,,0,,,,,,,
    3070845,1963,1096,,"US","IL",,1,,52,6,69,,3,,0,,,,,,,
    3070846,1963,1096,,"US","NY",,2,,425,5,51,,9,,0.7407,,,,,,,

      We want to compute the average number of patent claims per country, but many records are missing the claim count. Our program will simply skip those records, and it is useful to know how many were skipped. Beyond satisfying our curiosity, this instrumentation helps us understand what the program is doing and gives us a basic check on its correctness.

      Counters are used through the Reporter.incrCounter() method; a Reporter object is passed to both map() and reduce(). You call incrCounter() with a counter name and an increment amount, and each distinct event gets its own named counter. The first time incrCounter() is called with a new counter name, that counter is created; subsequent calls accumulate into it.

      Reporter.incrCounter() comes in two signatures:

    public void incrCounter(String group, String counter, long amount)
    public void incrCounter(Enum key, long amount)
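
      As a minimal sketch of how the two forms are called inside map() or reduce() (ClaimsCounters refers to the enum defined in the full listing below; the string group and counter names are made-up illustrative values, not part of the program that follows):

    // enum form: the group is the enum's class name and the counter name is
    // the enum constant; the counter is created automatically on first use
    reporter.incrCounter(ClaimsCounters.MISSING, 1);

    // string form: group and counter names are passed explicitly
    // ("Records" / "Missing" are hypothetical names for illustration)
    reporter.incrCounter("Records", "Missing", 1);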

      The following is the program for computing the average number of patent claims per country, instrumented with counters. Note that the combiner emits partial "sum,count" pairs rather than partial averages, since averages cannot simply be combined:

    package hadoop.in.action;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    
    public class AverageByAttribute {
    
        public static class MapClass extends MapReduceBase implements
                Mapper<LongWritable, Text, Text, Text> {
    
            // custom counters for the two kinds of records the mapper skips
            static enum ClaimsCounters {
                MISSING, QUOTED
            }
    
            private Text k = new Text();
            private Text v = new Text();
    
            @Override
            public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {
    
                String[] fields = value.toString().split(",", -1);
                String country = fields[4];   // COUNTRY column
                String numClaims = fields[8]; // CLAIMS column
                if (numClaims.length() == 0) {
                    // no claim count: count the record as missing and skip it
                    reporter.incrCounter(ClaimsCounters.MISSING, 1);
                } else if (numClaims.startsWith("\"")) {
                    // quoted value (e.g. the CSV header row): count and skip
                    reporter.incrCounter(ClaimsCounters.QUOTED, 1);
                } else {
                    k.set(country);
                    v.set(numClaims + ",1"); // value is "claimCount,recordCount"
                    output.collect(k, v);
                }
    
            }
    
        }
    
        public static class CombineClass extends MapReduceBase implements
                Reducer<Text, Text, Text, Text> {
    
            private Text v = new Text();
    
            @Override
            public void reduce(Text key, Iterator<Text> values,
                    OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {
    
                int count = 0;
                double sum = 0;
                while (values.hasNext()) {
                    String[] fields = values.next().toString().split(",");
                    sum += Double.parseDouble(fields[0]);
                    count += Integer.parseInt(fields[1]);
                }
                // emit one partial "sum,count" pair per key, after all values
                // have been folded in; collecting inside the loop would emit
                // overlapping partial sums and double-count in the reducer
                v.set(sum + "," + count);
                output.collect(key, v);
            }
    
        }
    
        public static class ReduceClass extends MapReduceBase implements
                Reducer<Text, Text, Text, DoubleWritable> {
    
            private DoubleWritable v = new DoubleWritable();
    
            @Override
            public void reduce(Text key, Iterator<Text> values,
                    OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                    throws IOException {
    
                // accumulate the partial sums and counts from the combiner
                int count = 0;
                double sum = 0;
                while (values.hasNext()) {
                    String[] fields = values.next().toString().split(",");
                    sum += Double.parseDouble(fields[0]);
                    count += Integer.parseInt(fields[1]);
                }
                v.set(sum / count); // sum is a double, so this divides in floating point
                output.collect(key, v);
            }
    
        }
    
        public static void run() throws IOException {
    
            Configuration configuration = new Configuration();
            JobConf jobConf = new JobConf(configuration, AverageByAttribute.class);
    
            String input = "hdfs://localhost:9000/user/hadoop/input/apat63_99.txt";
            String output = "hdfs://localhost:9000/user/hadoop/output";
    
            // delete the output directory if it already exists; otherwise
            // the job fails with an "output directory already exists" error
            FileSystem fs = new Path(output).getFileSystem(configuration);
            fs.delete(new Path(output), true);
    
            FileInputFormat.setInputPaths(jobConf, new Path(input));
            FileOutputFormat.setOutputPath(jobConf, new Path(output));
    
            jobConf.setInputFormat(TextInputFormat.class);
            jobConf.setOutputFormat(TextOutputFormat.class);
    
            jobConf.setMapOutputKeyClass(Text.class);
            jobConf.setMapOutputValueClass(Text.class);
            jobConf.setOutputKeyClass(Text.class);
            jobConf.setOutputValueClass(DoubleWritable.class);
    
            jobConf.setMapperClass(MapClass.class);
            // the combiner pre-aggregates "sum,count" pairs on the map side,
            // so it must emit the same Text value format as the mapper
            jobConf.setCombinerClass(CombineClass.class);
            jobConf.setReducerClass(ReduceClass.class);
    
            // runJob() submits the job and blocks until it completes,
            // throwing an IOException if the job fails, so no polling
            // loop is needed afterwards
            RunningJob runningJob = JobClient.runJob(jobConf);
    
        }
    
        public static void main(String[] args) throws IOException {
    
            run();
    
        }
    
    }

      After the program runs, both the counters we defined and Hadoop's internal counters are displayed in the JobTracker's Web UI.
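
      The counter totals can also be read programmatically once runJob() returns. A minimal sketch, assuming it is appended to the end of run() above; it additionally requires import org.apache.hadoop.mapred.Counters:

    // read the aggregated, job-wide counter values from the finished job
    Counters counters = runningJob.getCounters();
    long missing = counters.getCounter(MapClass.ClaimsCounters.MISSING);
    long quoted = counters.getCounter(MapClass.ClaimsCounters.QUOTED);
    System.out.println("missing=" + missing + " quoted=" + quoted);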

  • Original post: https://www.cnblogs.com/Murcielago/p/4649130.html