zoukankan      html  css  js  c++  java
  • Pig、Hive、MapReduce 解决分组 Top K 问题(转)

    问题:

    有如下数据文件 city.txt (id, city, value)

    cat city.txt 
    1 wh 500
    2 bj 600
    3 wh 100
    4 sh 400
    5 wh 200
    6 bj 100
    7 sh 200
    8 bj 300
    9 sh 900
    需要按 city 分组聚合,然后从每组数据中取出前两条value最大的记录。

    1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:

    1 a = load '/data/city.txt'  using PigStorage(' 'as (id:chararray, city:chararray, value:int);
    2 b = group by city;
    3 c = foreach b {c1=order by value desc; c2=limit c1 2; generate group,c2.value;};
    4 d = stream c through `sed 's/[(){}]//g'`;
    5 dump d;
    结果:
    1 (bj,600,300)
    2 (sh,900,400)
    3 (wh,500,200)
    这几行代码其实也实现了mysql中的 group_concat 函数的功能:
    1 a = load '/data/city.txt'  using PigStorage(' 'as (id:chararray, city:chararray, value:int);
    2 b = group by city;
    3 c = foreach b {c1=order by value desc;  generate group,c1.value;};
    4 d = stream c through `sed 's/[(){}]//g'`;
    5 dump d;
    结果:
    1 (bj,600,300,100)
    2 (sh,900,400,200)
    3 (wh,500,200,100)

    2、下面我们再来看看hive如何处理group topk的问题:

    本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,

    比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?

    1 select from city a where
    2 2>(select count(1) from city where cname=a.cname and value>a.value)
    3 distribute by a.cname sort by a.cname,a.value desc;
    http://my.oschina.net/leejun2005/blog/78904

    但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:

    排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。

    好了,上代码:

    (1)定义UDF:

    01 package com.example.hive.udf;
    02 import org.apache.hadoop.hive.ql.exec.UDF;
    03       
    04 public final class Rank extends UDF{
    05     private int  counter;
    06     private String last_key;
    07     public int evaluate(final String key){
    08       if ( !key.equalsIgnoreCase(this.last_key) ) {
    09          this.counter = 0;
    10          this.last_key = key;
    11       }
    12       return this.counter++;
    13     }
    14 }
    (2)注册jar、建表、导数据,查询:
    1 add jar Rank.jar;
    2 create temporary function rank as 'com.example.hive.udf.Rank';
    3 create table city(id int,cname string,value int) row format delimited fields terminated by ' ';
    4 LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;
    5 select cname, value from (
    6     select cname,rank(cname) csum,value from (
    7         select id, cname, value from city distribute by cname sort by cname,value desc
    8     )a
    9 )b where csum < 2;

    (3)结果:

     

    1 bj  600
    2 bj  300
    3 sh  900
    4 sh  400
    5 wh  500
    6 wh  200
    可以看到,hive相比pig来说,处理起来稍微复杂了点,但随着hive的日渐完善,以后比pig更简洁也说不定。

    REF:hive中分组取前N个值的实现

    http://baiyunl.iteye.com/blog/1466343

     

    3、最后我们来看一下原生态的MR:

     

    01 import java.io.IOException;
    02 import java.util.TreeSet;
    03  
    04 import org.apache.hadoop.conf.Configuration;
    05 import org.apache.hadoop.fs.Path;
    06 import org.apache.hadoop.io.IntWritable;
    07 import org.apache.hadoop.io.LongWritable;
    08 import org.apache.hadoop.io.Text;
    09 import org.apache.hadoop.mapreduce.Job;
    10 import org.apache.hadoop.mapreduce.Mapper;
    11 import org.apache.hadoop.mapreduce.Reducer;
    12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    14 import org.apache.hadoop.util.GenericOptionsParser;
    15  
    16 public class GroupTopK {
    17     // 这个 MR 将会取得每组年龄中 id 最大的前 3 个
    18     // 测试数据由脚本生成:http://my.oschina.net/leejun2005/blog/76631
    19     public static class GroupTopKMapper extends
    20             Mapper<LongWritable, Text, IntWritable, LongWritable> {
    21         IntWritable outKey = new IntWritable();
    22         LongWritable outValue = new LongWritable();
    23         String[] valArr = null;
    24  
    25         public void map(LongWritable key, Text value, Context context)
    26                 throws IOException, InterruptedException {
    27             valArr = value.toString().split(" ");
    28             outKey.set(Integer.parseInt(valArr[2]));// age int
    29             outValue.set(Long.parseLong(valArr[0]));// id long
    30             context.write(outKey, outValue);
    31         }
    32     }
    33  
    34     public static class GroupTopKReducer extends
    35             Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
    36  
    37         LongWritable outValue = new LongWritable();
    38  
    39         public void reduce(IntWritable key, Iterable<LongWritable> values,
    40                 Context context) throws IOException, InterruptedException {
    41             TreeSet<Long> idTreeSet = new TreeSet<Long>();
    42             for (LongWritable val : values) {
    43                 idTreeSet.add(val.get());
    44                 if (idTreeSet.size() > 3) {
    45                     idTreeSet.remove(idTreeSet.first());
    46                 }
    47             }
    48             for (Long id : idTreeSet) {
    49                 outValue.set(id);
    50                 context.write(key, outValue);
    51             }
    52         }
    53     }
    54  
    55     public static void main(String[] args) throws Exception {
    56         Configuration conf = new Configuration();
    57         String[] otherArgs = new GenericOptionsParser(conf, args)
    58                 .getRemainingArgs();
    59  
    60         System.out.println(otherArgs.length);
    61         System.out.println(otherArgs[0]);
    62         System.out.println(otherArgs[1]);
    63  
    64         if (otherArgs.length != 3) {
    65             System.err.println("Usage: GroupTopK <in> <out>");
    66             System.exit(2);
    67         }
    68         Job job = new Job(conf, "GroupTopK");
    69         job.setJarByClass(GroupTopK.class);
    70         job.setMapperClass(GroupTopKMapper.class);
    71         job.setReducerClass(GroupTopKReducer.class);
    72         job.setNumReduceTasks(1);
    73         job.setOutputKeyClass(IntWritable.class);
    74         job.setOutputValueClass(LongWritable.class);
    75         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
    76         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    77         System.exit(job.waitForCompletion(true) ? 0 1);
    78     }
    79 }

    hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

    结果:

     

    hadoop fs -cat /tmp/1/part-r-00000
    0       12869695
    0       12869971
    0       12869976
    1       12869813
    1       12869870
    1       12869951

    ......

    数据验证:

    awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
    12869976
    12869971
    12869695

    可以看到结果没有问题。 

    注:测试数据由以下脚本生成:

    http://my.oschina.net/leejun2005/blog/76631

     

    PS:

    如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。

    pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。

    附几个HIVE UDAF链接,有兴趣的同学自己看下:

    Hive UDAF和UDTF实现group by后获取top值 http://blog.csdn.net/liuzhoulong/article/details/7789183
    hive中自定义函数(UDAF)实现多行字符串拼接为一行 http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
    编写Hive UDAF http://www.fuzhijie.me/?p=118
    Hive UDAF开发 http://richiehu.blog.51cto.com/2093113/386113

  • 相关阅读:
    浦东新区2019年下半年部分街镇社区工作者和部分单位编外人员公开招聘考试大纲
    苏州 山西
    第几行记录
    命令 检查Linux服务器性能
    SQLRecoverableException: I/O Exception: Connection reset
    Oracle单表备份三种方案
    vim 清空
    常看 Shell: 文本文件操作
    bash date format
    Linux Shell 截取字符串
  • 原文地址:https://www.cnblogs.com/stubborn412/p/3930262.html
Copyright © 2011-2022 走看看