zoukankan      html  css  js  c++  java
  • Partitioner编程——根据运营商分组统计用户上网流量

    1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

    2. HashPartitioner是mapreduce的默认partitioner。计算方法是
      which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

    3. (例子以jar形式运行)

    排序和分组

    1. 在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。
    2. 分组时也是按照k2进行比较的。

    partition的数量由谁来决定?—-reducer!!

    有多少reducer就有多少partitioner

    public class DataCount {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(DataCount.class);
    
            job.setMapperClass(DCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DataInfo.class);
    
            job.setReducerClass(DCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataInfo.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //设置partitioner的执行类
            job.setPartitionerClass(DCPartitioner.class);
    
            job.setNumReduceTasks(Integer.parseInt(args[2]));
    
    
            job.waitForCompletion(true);
    
        }
        //Map阶段 k1:行号 v1:一行数据 k2:手机号 v2:有用字段组成的javaBean
        public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo>{
    
            private Text k = new Text();
    
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, DataInfo>.Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split("	");
                String tel = fields[1];
                long up = Long.parseLong(fields[8]);
                long down = Long.parseLong(fields[9]);
                DataInfo dataInfo = new DataInfo(tel,up,down);
                k.set(tel);
                context.write(k, dataInfo);
    
            }
    
        }
        //Partition阶段  
        /**
          * partition的数量由谁来决定?----reducer !!
          *  有多少个reducer就有多少个partitioner
          */
        public static class DCPartitioner extends  Partitioner<Text, DataInfo>{
            //定义一个map用于存放运营商的对应分组号
            //static是自上往下执行的
            private static Map<String,Integer> provider = new HashMap<String,Integer>();
    
            static{
                provider.put("138", 1);
                provider.put("139", 1);
                provider.put("152", 2);
                provider.put("153", 2);
                provider.put("182", 3);
                provider.put("183", 3);
            }
            /**
              *返回值:int 分组号,一个组对应一个map
              */
            @Override
            public int getPartition(Text key, DataInfo value, int numPartitions) {
                //向数据库或配置信息 读写
                String tel_sub = key.toString().substring(0,3);
                //获取手机号前三位,对运营商进行分组识别
                Integer count = provider.get(tel_sub);
                if(count == null){
                    count = 0;
                }
                //返回组号
                return count;
            }
    
        }
    
        //Reduce阶段 k2:手机号 v2:dataInfo迭代器 k3:手机号 v3:dataInfo
        public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo>{
    
            @Override
            protected void reduce(Text key, Iterable<DataInfo> values,Reducer<Text, DataInfo, Text, DataInfo>.Context context)
                    throws IOException, InterruptedException {
                long up_sum = 0;
                long down_sum = 0;
                for(DataInfo d : values){
                    up_sum += d.getUpPayLoad();
                    down_sum += d.getDownPayLoad();
                }
                DataInfo dataInfo = new DataInfo("",up_sum,down_sum);
    
                context.write(key, dataInfo);
            }
    
        }
    
    
    }
  • 相关阅读:
    圆圈中最后剩下的数字 【微软面试100题 第十八题】
    第一个只出现一次的字符 【微软面试100题 第十七题】
    从上往下打印二叉树 【微软面试100题 第十六题】
    二叉树的镜像 【微软面试100题 第十五题】
    和为s的两个数字 【微软面试100题 第十四题】
    链表中倒数第k个结点 【微软面试100题 第十三题】
    求1+2+...+n 【微软面试100题 第十二题】
    求二叉树中结点的最大距离 【微软面试100题 第十一题】
    翻转句子中单词的顺序 【微软面试100题 第十题】
    二叉搜索树的后序遍历序列 【微软面试100题 第九题】
  • 原文地址:https://www.cnblogs.com/shiguangmanbu2016/p/5932884.html
Copyright © 2011-2022 走看看