zoukankan      html  css  js  c++  java
  • 【原创】MapReduce计数器

    MapReduce框架内置了一些计数器的支持,当然,我们也可以设置自己的计数器用来满足一些特殊的要求。

    其实计数器可以用来完成很多事,关键要看你如何用,例如你想知道map输入数据的指定记录特定的信息有多少可以设置计数,还有,在MR执行过程中,一些特定事件的发生次数也可以记录。使用计数器的莫大好处在于整个计数的过程只需要再map阶段就可以完成,而且也可以不做任何输出,可以快速的得到自己想要的一些计数结果。但并不是计数器可以设置为无限多,因为计数器过多会影响JT的效率,甚至可能被自定义的分析程序拖垮。

    • 计数器原理

    计数器的信息是存储再JobTracker中的内存中的,TaskTracker执行任务时会对设定的信息进行计数,按照既定的条件对计数器进行累加,并聚合汇报给JT。JT在工作完成的时候做整体聚合。

    • 程序实例

    首先需要定义个枚举类:

    package zebra.shlgao.counters;
    
    public enum MyCounter {
        CounterA,CounterB
    }

    然后在MR程序中分别计数不同Counter的数量:

    package zebra.shlgao.counters;
    
    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class TestCounter {
        public static class CounterMapper extends Mapper<Object, Text, Text, Text>{        
            @Override
            protected void map(Object key, Text value,Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                String txt = value.toString();
                if (txt.contains("java")){
                    context.getCounter(MyCounter.CounterA).increment(1);
                }else{
                    context.getCounter(MyCounter.CounterB).increment(2);
                }
    //            context.write(new Text(key), value);
            }
            
        }
        public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException{
            Configuration conf = new Configuration();
            Job job = new Job(conf, "testCounter");
            job.setJarByClass(TestCounter.class);
            job.setMapperClass(CounterMapper.class);
    //        job.setNumReduceTasks(0);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:19000/testdir/file22m"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:19000/testdir/file22mResult"));
            int finish = job.waitForCompletion(true) ? 0 : 1 ;
            FileSystem fs  =  FileSystem.get(URI.create("hdfs://localhost:19000/testdir/file22mResult"),conf);
            fs.delete(new Path("hdfs://localhost:19000/testdir/file22mResult"),true);//删除空的输出路径
            System.exit(finish);
        }
    }

    由于这里是快速计数,所以可以不必做任何输出,但是在配置Job的时候必须定义输出路径,所以可以在最后将空的输出路径删除。

  • 相关阅读:
    【已解决】Kettle新建数据库连接报错(Mysql,MS Sql Server)
    SQL面试题-练习2
    WIN7bat批处理遍历文件夹,输出当前文件夹下所有文件。
    【已解决】MYSQL安装过程报错,怎么解决?MySQL error 0: Authentication to host 'localhost' for user 'root' using method 'caching_sha2_password' failed with message: Reading from the stream has failed.
    常用外国在线英语词典-单词查询
    Oracle 11g 服务端的安装步骤
    Oracle 查询(SELECT)语句(一)
    Oracle 增删改(INSERT、DELETE、UPDATE)语句
    记录一个 C# 导出 Excel 的坑
    C# 中的浅拷贝与深拷贝
  • 原文地址:https://www.cnblogs.com/gslyyq/p/4127280.html
Copyright © 2011-2022 走看看