  • MapReduce: using Counters, Combiner, Partitioner, Sort, and Group

    1: Using Counters

      Hadoop counters give developers a global view of how a job is running and of its key metrics, so that errors can be diagnosed and handled promptly.

      Built-in counters cover the MapReduce framework, the file system, and job scheduling.

      Counters can also be viewed in the JobTracker web UI at http://master:50030/jobdetails.jsp, or read back in the driver (see the sketch after the code below).

    /**
     * Counters are metrics produced while a job runs. They let you observe the whole computation
     * and see which runtime indicators actually matter.
     * This example uses a counter to count how many input lines contain a sensitive word.
     */
    public class WordCountApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd";
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, WordCountApp.class.getSimpleName());
                job.setJarByClass(WordCountApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
                job.waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // get a counter; the arguments are the group name and the counter name
                Counter counter = context.getCounter("Sensitive Words", "hello");
                String line = value.toString();
                if(line.contains("hello")){// assume "hello" is the sensitive word
                    counter.increment(1L);
                }
                String[] splited = line.split("\t");// fields are tab-separated
                for (String word : splited) {
                    context.write(new Text(word), new LongWritable(1));
                }
            }
        }
    
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                long count = 0L;
                for (LongWritable times : values) {
                    count += times.get();
                }
                context.write(key, new LongWritable(count));
            }
        }
        
    }
    Using a Counter
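
      Counter values are not only visible in the web UI; they can also be read back in the driver once the job finishes. A minimal sketch, assuming the job object above and that waitForCompletion(true) has already returned (placed inside the existing try block, so checked exceptions are handled there); the group and counter names must match the ones used in the mapper:

    // Read the counter back in the driver after job.waitForCompletion(true) returns.
    // The group name and counter name must match the ones passed to context.getCounter(...).
    long sensitiveHits = job.getCounters()
            .findCounter("Sensitive Words", "hello")
            .getValue();
    System.out.println("Lines containing the sensitive word: " + sensitiveHits);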

    2: Using a Combiner

      Each map task may produce a large amount of output. The combiner merges that output on the map side first, reducing the amount of data transferred to the reducers.

      At its core, the combiner performs a local merge by key; it works like a local reduce.

      Without a combiner, all aggregation is left to the reducers and efficiency suffers. With a combiner, maps that finish early aggregate their output locally, which speeds things up.

      Note: the combiner's output becomes the reducer's input, and the combiner must never change the final result. So, in my view, a combiner should only be used when the reducer's input key/value types are identical to its output key/value types and local pre-aggregation does not affect the final result, e.g. sums and maximums (see the sketch after the code below for why an average breaks this rule).

    /**
     * The combiner sits between map and reduce and pre-processes the data.
     * Without it, records go straight from map to reduce; with a combiner on the map side
     * (after the map phase), records pass through the combiner before reaching reduce.
     * Once a combiner is added, records are merged on the map side after they are grouped.
     * 
     *  Why use a combiner?
     *  Purpose: to shrink the map output, which means less data is transferred during shuffle and the network overhead drops.
     *  What are the limitations? When should it be used, and when not?
     *  Some computations cannot use a combiner, e.g. averaging: whenever the result depends on the
     *  total amount of data, a combiner must not be used. Aggregations that can be applied repeatedly
     *  to partial results without changing the answer (sums, maximums) are fine; an average can only
     *  be computed from the full sample, not from a part of it.
     *  The combiner usually shares the reducer's code, but it cannot replace the reducer: the reducer
     *  still merges the outputs of multiple maps, while the combiner only processes the output of a single map.
     */
    public class WordCountApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/files";
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, WordCountApp.class.getSimpleName());
                job.setJarByClass(WordCountApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);
                job.setMapperClass(MyMapper.class);
                
                job.setCombinerClass(MyReducer.class);// set the combiner (reusing the reducer code)
                
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                
                //if the combiner's output is already identical to what the reducer would produce, the reducer could be left out
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
                job.waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                for (String word : splited) {
                    context.write(new Text(word), new LongWritable(1));
                }
            }
        }
    
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                long count = 0L;
                for (LongWritable times : values) {
                    count += times.get();
                }
                context.write(key, new LongWritable(count));
            }
        }
        
    }
    Using a Combiner
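
      As promised above, here is why an average must not reuse the reducer as its combiner. A standalone sketch with hypothetical values (not part of the job above): averaging per map and then averaging the partial averages gives a different answer than one global average, whereas partial sums or maximums combine safely.

    public class AverageCombinerPitfall {
        public static void main(String[] args) {
            long[] map1 = {1, 2, 3};   // values emitted by the first map task
            long[] map2 = {10};        // values emitted by the second map task

            // Correct: one global average over all values -> (1 + 2 + 3 + 10) / 4 = 4.0
            double correct = (sum(map1) + sum(map2)) / (double) (map1.length + map2.length);

            // Wrong: average each map's output first, then average the averages -> (2.0 + 10.0) / 2 = 6.0
            double withCombiner = (sum(map1) / (double) map1.length
                                 + sum(map2) / (double) map2.length) / 2.0;

            System.out.println("correct = " + correct + ", with an average combiner = " + withCombiner);
        }

        private static long sum(long[] values) {
            long total = 0L;
            for (long v : values) {
                total += v;
            }
            return total;
        }
    }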

    3: Using a custom Partitioner

      1. Partitioner is the base class for all partitioners; a custom partitioner must extend it.

      2. HashPartitioner is the default partitioner in MapReduce. It computes which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks to decide which reducer each record goes to.

      3. (This example is run as a jar.)
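
      For reference, the default assignment described in point 2 behaves roughly like the sketch below (a simplified stand-in for HashPartitioner, not the actual Hadoop source):

    import org.apache.hadoop.mapreduce.Partitioner;

    // Sketch of the default hash-based assignment: mask off the sign bit of the key's
    // hash code and take the remainder modulo the number of reduce tasks.
    public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }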

    /**
     * A partitioner splits the map output into partitions.
     * The map output is divided into as many partitions as there are reducers that will process it.
     * Here the records are phone numbers and non-phone numbers, and the requirement is that two
     * reducers handle them separately: one for phone numbers, one for everything else.
     * Reducers pull their data from the maps during shuffle; for shuffle to know which records belong
     * to which reducer, the map side must partition the data.
     * A partition is essentially an index that tells shuffle where each record goes.
     * Default partitioner class: HashPartitioner.
     * The number of partitions returned by the Partitioner must match the number of reducers.
     */
    
    public class KpiApp {
        public static final String INPUT_PATH = "hdfs://hadoop1:9000/kpi";
        public static final String OUT_PATH = "hdfs://hadoop1:9000/kpi_out";
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH),conf);
            if(fileSystem.isDirectory(new Path(OUT_PATH))){
                fileSystem.delete(new Path(OUT_PATH));
            }
            Job job = new Job(conf, KpiApp.class.getSimpleName());
            job.setJarByClass(KpiApp.class);
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            
            job.setMapperClass(MyMapper.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(2);
            
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(KpiWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
            job.waitForCompletion(true);
            
        }
    
        public static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
            @Override
            protected void map(LongWritable key, Text value,Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();// value is one input line
                String[] splited = line.split("\t");// fields are tab-separated
                String mobileNumber = splited[1];// the phone number
                Text k2 = new Text(mobileNumber);
                KpiWritable v2 = new KpiWritable(Long.parseLong(splited[6]), Long.parseLong(splited[7]), Long.parseLong(splited[8]), Long.parseLong(splited[9]));
                context.write(k2, v2);
            }
        }
        
        public static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{
            @Override
            protected void reduce(Text k2, Iterable<KpiWritable> v2s,Context context)throws IOException, InterruptedException {
                long upPackNum = 0L ;// number of upstream packets
                long downPackNum = 0L ;// number of downstream packets
                long upPayLoad = 0L ;// total upstream traffic
                long downPayLoad = 0L ;// total downstream traffic
                for (KpiWritable kpiWritable : v2s) {
                    upPackNum += kpiWritable.upPackNum ;
                    downPackNum += kpiWritable.downPackNum ;
                    upPayLoad += kpiWritable.upPayLoad ;
                    downPayLoad += kpiWritable.downPayLoad ;
                }
                KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
                context.write(k2, v3);
            }
        }
        //with a single partition, getPartition must always return 0
        //the number of reduce tasks must be greater than or equal to the number of partitions
        public static class MyPartitioner extends Partitioner<Text, KpiWritable>{
    
            @Override
            public int getPartition(Text key, KpiWritable value, int numPartitions) {
                int length = key.toString().length();
                return length==11?0:1;
                //a more general implementation would take a modulo by numPartitions instead of a hard-coded comparison:
    //            return (int)Math.abs((Math.signum(length-11))%numPartitions) ;
            }
            
        }
        
    }
    
    class KpiWritable implements Writable{
        long upPackNum ;// number of upstream packets
        long downPackNum ;// number of downstream packets
        long upPayLoad ;// total upstream traffic
        long downPayLoad ;// total downstream traffic
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upPackNum);
            out.writeLong(downPackNum);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
        }
        //Note: fields must be read back in the same order they were written, because the serialized
        //data is a stream, and a stream is one-dimensional: it is read from one end to the other.
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upPackNum = in.readLong();
            this.downPackNum = in.readLong();
            this.upPayLoad = in.readLong();
            this.downPayLoad = in.readLong();
        }
        public KpiWritable() {
        }
        public KpiWritable(long upPackNum, long downPackNum, long upPayLoad,
                long downPayLoad) {
            super();
            set(upPackNum, downPackNum, upPayLoad, downPayLoad);
        }
        public void set(long upPackNum, long downPackNum, long upPayLoad,
                long downPayLoad) {
            this.upPackNum = upPackNum;
            this.downPackNum = downPackNum;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
        }
        @Override
        public String toString() {
            return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
        }
    }
    Using a custom Partitioner

    4: Using a custom Sort

      1. Sorting in the map and reduce phases compares k2; v2 does not take part. To make v2 participate in the sort, k2 and v2 have to be combined into a new class that is then used as k2 (a descending-order variant is sketched after the code below).

      2. Grouping is also done by comparing k2.

    /**
     * Custom sorting.
     * By default, sorting is done on k2; v2 does not participate.
     * To make the second column participate in the sort, it has to become part of k2, because only
     * k2 is compared. That is why a custom serializable key type is used here.
     */
    public class SortApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/data";// input path
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce output is a directory
        public static void main(String[] args) {
            Configuration conf = new Configuration();// configuration object
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, SortApp.class.getSimpleName());// jobName: the job name
                job.setJarByClass(SortApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);// specify the input
                job.setMapperClass(MyMapper.class);// custom map class
                job.setMapOutputKeyClass(NewK2.class);// map output key type
                job.setMapOutputValueClass(LongWritable.class);// map output value type
                job.setReducerClass(MyReducer.class);// custom Reduce class
                job.setOutputKeyClass(LongWritable.class);// reduce output key type
                job.setOutputValueClass(LongWritable.class);// reduce output value type
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// where the final output is written
                job.waitForCompletion(true);// submit to the JobTracker and wait for completion
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class MyMapper extends
                Mapper<LongWritable, Text, NewK2, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                context.write(new NewK2(Long.parseLong(splited[0]),Long.parseLong(splited[1])), new LongWritable());// the composite key carries both columns; the value is not used
            }
        }
        public static class MyReducer extends
                Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
            @Override
            protected void reduce(NewK2 key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(key.first), new LongWritable(key.second));
            }
        }
        
        public static class NewK2 implements WritableComparable<NewK2>{
    
            long first ;
            long second ;
            public NewK2(long first, long second) {
                super();
                this.first = first;
                this.second = second;
            }
            //a no-arg constructor is required: Hadoop instantiates the key reflectively during deserialization
            public NewK2() {
            }
            @Override
            public void write(DataOutput out) throws IOException {
                out.writeLong(this.first);
                out.writeLong(this.second);
            }
    
            @Override
            public void readFields(DataInput in) throws IOException {
                this.first = in.readLong() ;
                this.second = in.readLong() ;
            }
    
            @Override
            public int compareTo(NewK2 o) {
                // ascending by the first column, then by the second column;
                // Long.compare avoids the overflow that (int)(a - b) can cause
                int cmp = Long.compare(this.first, o.first);
                if(cmp != 0){
                    return cmp;
                }
                return Long.compare(this.second, o.second);
            }
        }
        
    }
    Using a custom Sort
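
      If a different ordering is needed, only compareTo has to change. A hypothetical variant of NewK2.compareTo (same fields as above) that keeps the first column ascending but sorts the second column in descending order; combined with the grouping comparator in the next section, the first value seen in each group would then be the maximum rather than the minimum:

    // Hypothetical variant of NewK2.compareTo: first column ascending,
    // second column descending (largest second value first within each first value).
    @Override
    public int compareTo(NewK2 o) {
        int cmp = Long.compare(this.first, o.first);   // ascending by the first column
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(o.second, this.second);    // arguments reversed: descending by the second column
    }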

     5: Using a custom Group (grouping comparator)

    /**
     * Custom grouping.
     * When the first column is the same, we want a single value of the second column for each
     * group (the reducer below keeps the minimum).
     * With the default grouping the sorted data falls into 6 groups, because the second column also
     * takes part in the comparison. To get 3 groups, one per distinct value of the first column,
     * the records have to be grouped on the first column only, and the reducer then picks the value
     * it needs from the second column.
     * 
        3    3
        3    2
        3    1
        2    2
        2    1
        1    1
     */
    public class GroupApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/data";
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, GroupApp.class.getSimpleName());
                job.setJarByClass(GroupApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(NewK2.class);
                job.setMapOutputValueClass(LongWritable.class);
                
                job.setGroupingComparatorClass(MyGroupComparator.class);// comparator used only when grouping keys for the reducer
                
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(LongWritable.class);
                job.setOutputValueClass(LongWritable.class);
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
                job.waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class MyMapper extends
                Mapper<LongWritable, Text, NewK2, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                context.write(new NewK2(Long.parseLong(splited[0]),Long.parseLong(splited[1])), new LongWritable(Long.parseLong(splited[1])));// the composite key carries both columns; the value is the second column
            }
        }
        public static class MyReducer extends
                Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
            @Override
            protected void reduce(NewK2 key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                // grouping is by the first column only, so these values are all the second-column
                // values that share this first-column value; keep the smallest one
                long min = Long.MAX_VALUE ;
                for (LongWritable longWritable : values) {
                    if(longWritable.get() < min){
                        min = longWritable.get() ;
                    }
                }
                context.write(new LongWritable(key.first), new LongWritable(min));
            }
        }
        
        public static class NewK2 implements WritableComparable<NewK2>{
    
            long first ;
            long second ;
            public NewK2(long first, long second) {
                super();
                this.first = first;
                this.second = second;
            }
            //a no-arg constructor is required: Hadoop instantiates the key reflectively during deserialization
            public NewK2() {
            }
            @Override
            public void write(DataOutput out) throws IOException {
                out.writeLong(this.first);
                out.writeLong(this.second);
            }
    
            @Override
            public void readFields(DataInput in) throws IOException {
                this.first = in.readLong() ;
                this.second = in.readLong() ;
            }
    
            @Override
            public int compareTo(NewK2 o) {
                // ascending by the first column, then by the second column;
                // Long.compare avoids the overflow that (int)(a - b) can cause
                int cmp = Long.compare(this.first, o.first);
                if(cmp != 0){
                    return cmp;
                }
                return Long.compare(this.second, o.second);
            }
        }
        
        public static class MyGroupComparator implements RawComparator<NewK2>{
    
            //the object-level compare is not used during grouping; the raw byte compare below does the work
            @Override
            public int compare(NewK2 o1, NewK2 o2) {
                return 0;
            }
    
            //grouping uses only this method
            /**
             * b1: the serialized bytes of the first key (the "this" side)
             * b2: the serialized bytes of the key being compared against
             * s1 and s2 are the offsets within the byte arrays where each key starts
             * l1 and l2 are the lengths of the serialized keys
             */
            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                //compare only the first column; a long occupies 8 bytes
                return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
            }
            
        }
        
    }
    Using a custom Group
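
      An alternative to implementing RawComparator directly is to extend WritableComparator and let the framework deserialize the keys, so the comparator only has to look at the first column. A sketch under that assumption (hypothetical class name, registered with job.setGroupingComparatorClass just like MyGroupComparator above):

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Grouping comparator that extends WritableComparator. Passing true to the superclass
    // constructor makes it create NewK2 instances and deserialize the raw bytes, so this
    // compare works on objects and only inspects the first column.
    public class FirstColumnGroupComparator extends WritableComparator {
        protected FirstColumnGroupComparator() {
            super(GroupApp.NewK2.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            GroupApp.NewK2 k1 = (GroupApp.NewK2) a;
            GroupApp.NewK2 k2 = (GroupApp.NewK2) b;
            return Long.compare(k1.first, k2.first);   // group by the first column only
        }
    }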

    Original post: https://www.cnblogs.com/xiaolong1032/p/4543098.html