zoukankan      html  css  js  c++  java
  • 玩转大数据系列之Apache Pig高级技能之函数编程(六)

    原创不易,转载请务必注明,原创地址,谢谢配合!
    http://qindongliang.iteye.com/


    Pig系列的学习文档,希望对大家有用,感谢关注散仙!

    Apache Pig的前世今生

    Apache Pig如何自定义UDF函数?

    Apache Pig5行代码怎么实现Hadoop的WordCount?

    Apache Pig入门学习文档(一)

    Apache Pig学习笔记(二)

    Apache Pig学习笔记之内置函数(三)

    玩转大数据系列之Apache Pig如何与Apache Lucene集成(一)

    玩转大数据系列之Apache Pig如何与Apache Solr集成(二)

    玩转大数据系列之Apache Pig如何与MySQL集成(三)

    玩转大数据系列之如何给Apache Pig自定义存储形式(四)

    玩转大数据系列之Apache Pig如何通过自定义UDF查询数据库(五)

    如何使用Pig集成分词器来统计新闻词频?




    在Hadoop的生态系统中,如果我们要离线的分析海量的数据,大多数人都会选择Apache Hive或Apache Pig,在国内总体来说,Hive使用的人群占比比较高, 而Pig使用的人相对来说,则少的多,这并不是因为Pig不成熟,不稳定,而是因为Hive提供了类数据库SQL的查询语句,使得大多人上手Hive非常容易,相反而Pig则提供了类Linux shell的脚本语法,这使得大多数人不喜欢使用。

    如果在编程界,统计一下会SQL和会shell,那个人数占的比重大,散仙觉得,毫无疑问肯定是SQL语句了。因为有相当一部分编程人员是不使用Linux的,而是微软的的一套从C#,到ASP.NET,SQL Server再到Windows的专用服务器 。





    OK,扯远了,赶紧回来,使用shell的攻城师们,我觉得都会爱上它的,因为在linux系统中,没有比shell更简洁易用了,如果再配上awk和sed更是如虎添翼了。

    我们都知道shell是支持函数调用的,这一点和JavaScript是非常类似的,通过定义函数我们可以重复使用某个功能,而不用再次大量编码,其中,把变的东西,分离成参数,不变的东西定义成语句,这样以来,就能够降低编码的冗余和复杂性,试想一下,如果Java里,没有方法,那将会是多么不可思议的一件事。

    Pig作为类shell的语言,也支持了函数的方式,封装某个功能,以便于我们重用,这一点相比Hive来说,是一个很好的优势。

    下面先看下定义Pig函数(也叫宏命令)定义的语法:

    DEFINE (macros) :
    支持的参数:
    alias  pig的标量引用
    整形(integer)
    浮点型(float)
    字符串(String)

    下面看几个例子,让我们迅速对它熟悉并掌握,先看下我们的测试数据:

    Java代码 复制代码 收藏代码
    1. 1,张三,男,23,中国   
    2. 2,张三,女,32,法国   
    3. 3,小花,男,20,英国   
    4. 4,小红,男,16,中国   
    5. 5,小红,女,25,洛阳   
    6. 6,李静,女,25,中国河南安阳   
    7. 7,王强,男,11,英国   
    8. 8,张飞,男,20,美国  
    1,张三,男,23,中国
    2,张三,女,32,法国
    3,小花,男,20,英国
    4,小红,男,16,中国
    5,小红,女,25,洛阳
    6,李静,女,25,中国河南安阳
    7,王强,男,11,英国
    8,张飞,男,20,美国



    再看下pig脚本:

    Java代码 复制代码 收藏代码
    1. --定义pig函数1 支持分组统计数量   
    2. DEFINE group_and_count (A,group_key,number_reduces) RETURNS B {   
    3.     
    4.  d = group $A by $group_key parallel $number_reduces;   
    5.     
    6.  $B = foreach d generate group, COUNT($1);   
    7.   
    8. };   
    9.   
    10.   
    11. --定义pig函数2 支持排序   
    12. --A 关系引用标量   
    13. --order_field 排序的字段   
    14. --order_type 排序方式 desc ? asc ?   
    15. --storedir 存储的HDFS路径   
    16. --空返回值   
    17. define my_order(A,order_field,order_type,storedir) returns void {   
    18.     
    19.   d = order $A by $order_field $order_type ;   
    20.   store  d into '$storedir' ;     
    21.     
    22.   
    23. };    
    24.   
    25.   
    26. --定义pig函数3,支持filter过滤,以及宏命令里面调用   
    27.   
    28. --定义过滤操作   
    29. define  myfilter (A,field,count) returns B{   
    30.   
    31.    b= filter $A by $field > $count ;   
    32.   
    33.    $B = group_and_count(b,'sex',1);   
    34.   
    35. };   
    36.   
    37.   
    38. a = load  '/tmp/dongliang/318/person' using PigStorage(',') AS (id:int,name:chararray,sex:chararray,age:int,address:chararray) ;   
    39.   
    40.   
    41. --------pig函数1测试-----------------   
    42.   
    43. --定义按名字分组   
    44. --bb = group_and_count(a,name,1);   
    45. --定义按性别分组   
    46. --cc = group_and_count(a,sex,1);   
    47. --dump bb;   
    48. --dump cc;   
    49.   
    50. -------pig函数2测试------------------   
    51.   
    52. --按年龄降序   
    53. --my_order(a,age,'desc','/tmp/dongliang/318/z');   
    54.   
    55.   
    56.   
    57. --dump a;   
    58.   
    59.   
    60. -------pig函数3测试------------------   
    61.   
    62.  --过滤年龄大于20的,并按性别,分组统计数量   
    63.  r =  myfilter(a,'age',20);   
    64.   
    65.   
    66. dump r;  
    --定义pig函数1 支持分组统计数量
    DEFINE group_and_count (A,group_key,number_reduces) RETURNS B {
     
     d = group $A by $group_key parallel $number_reduces;
     
     $B = foreach d generate group, COUNT($1);
    
    };
    
    
    --定义pig函数2 支持排序
    --A 关系引用标量
    --order_field 排序的字段
    --order_type 排序方式 desc ? asc ?
    --storedir 存储的HDFS路径
    --空返回值
    define my_order(A,order_field,order_type,storedir) returns void {
     
      d = order $A by $order_field $order_type ;
      store  d into '$storedir' ;  
     
    
    }; 
    
    
    --定义pig函数3,支持filter过滤,以及宏命令里面调用
    
    --定义过滤操作
    define  myfilter (A,field,count) returns B{
    
       b= filter $A by $field > $count ;
    
       $B = group_and_count(b,'sex',1);
    
    };
    
    
    a = load  '/tmp/dongliang/318/person' using PigStorage(',') AS (id:int,name:chararray,sex:chararray,age:int,address:chararray) ;
    
    
    --------pig函数1测试-----------------
    
    --定义按名字分组
    --bb = group_and_count(a,name,1);
    --定义按性别分组
    --cc = group_and_count(a,sex,1);
    --dump bb;
    --dump cc;
    
    -------pig函数2测试------------------
    
    --按年龄降序
    --my_order(a,age,'desc','/tmp/dongliang/318/z');
    
    
    
    --dump a;
    
    
    -------pig函数3测试------------------
    
     --过滤年龄大于20的,并按性别,分组统计数量
     r =  myfilter(a,'age',20);
    
    
    dump r;



    在上面的脚本中,散仙定义了三个函数,
    (1)分组统计数量
    (2)自定义输出存储
    (3)自定义过滤并结合(1)统计数量

    通过这3个例子,让大家对pig函数有一个初步的认识,上面的函数和代码都在一个脚本中,这样看起来不太友好,而且重用性,还没有得到最大发挥,实际上函数和主体脚本是可以分离的,再用的时候,我们只需要导入函数脚本,即可拥有所有的函数功能,这样一来,函数脚本被分离到主脚本外面,就大大增加了函数脚本的重用性,我们也可以再其他脚本中引用,而且函数脚本中也可以再次引用其他的函数脚本,但前提是不能够,递归引用,这样Pig语法在执行时,是会报错的,下面看下分离后的脚本文件:

    一:函数脚本文件

    Java代码 复制代码 收藏代码
    1. --定义pig函数1 支持分组统计数量   
    2. --A 关系引用标量   
    3. --group_key 分组字段   
    4. --使用reduce的个数   
    5. --返回最终的引用结果   
    6. DEFINE group_and_count (A,group_key,number_reduces) RETURNS B {   
    7.     
    8.  d = group $A by $group_key parallel $number_reduces;   
    9.     
    10.  $B = foreach d generate group, COUNT($1);   
    11.   
    12. };   
    13.   
    14.   
    15. --定义pig函数2 支持排序   
    16. --A 关系引用标量   
    17. --order_field 排序的字段   
    18. --order_type 排序方式 desc ? asc ?   
    19. --storedir 存储的HDFS路径   
    20. --空返回值   
    21. define my_order(A,order_field,order_type,storedir) returns void {   
    22.     
    23.   d = order $A by $order_field $order_type ;   
    24.   store  d into '$storedir' ;     
    25.     
    26.   
    27. };    
    28.   
    29.   
    30. --定义pig函数3,支持filter过滤,以及宏命令里面调用   
    31. --A 关系引用标量   
    32. --field 过滤的字段   
    33. --count 阈值   
    34. --返回最终的引用结果   
    35.   
    36. define  myfilter (A,field,count) returns B{   
    37.   
    38.    b= filter $A by $field > $count ;   
    39.   
    40.    $B = group_and_count(b,'sex',1);   
    41.   
    42. };   
    43.   
    44.   
    45.   
    46. [search@dnode1 pigmacros]$   
    --定义pig函数1 支持分组统计数量
    --A 关系引用标量
    --group_key 分组字段
    --使用reduce的个数
    --返回最终的引用结果
    DEFINE group_and_count (A,group_key,number_reduces) RETURNS B {
     
     d = group $A by $group_key parallel $number_reduces;
     
     $B = foreach d generate group, COUNT($1);
    
    };
    
    
    --定义pig函数2 支持排序
    --A 关系引用标量
    --order_field 排序的字段
    --order_type 排序方式 desc ? asc ?
    --storedir 存储的HDFS路径
    --空返回值
    define my_order(A,order_field,order_type,storedir) returns void {
     
      d = order $A by $order_field $order_type ;
      store  d into '$storedir' ;  
     
    
    }; 
    
    
    --定义pig函数3,支持filter过滤,以及宏命令里面调用
    --A 关系引用标量
    --field 过滤的字段
    --count 阈值
    --返回最终的引用结果
    
    define  myfilter (A,field,count) returns B{
    
       b= filter $A by $field > $count ;
    
       $B = group_and_count(b,'sex',1);
    
    };
    
    
    
    [search@dnode1 pigmacros]$ 



    二,主体脚本文件

    Java代码 复制代码 收藏代码
    1. --导入pig公用的函数库   
    2.   
    3. import 'function.pig' ;   
    4.   
    5. a = load  '/tmp/dongliang/318/person' using PigStorage(',') AS (id:int,name:chararray,sex:chararray,age:int,address:chararray) ;   
    6.   
    7.   
    8. --------pig函数1测试-----------------   
    9.   
    10. --定义按名字分组   
    11. --bb = group_and_count(a,name,1);   
    12. --定义按性别分组   
    13. --cc = group_and_count(a,sex,1);   
    14. --dump bb;   
    15. --dump cc;   
    16.   
    17.   
    18. -------pig函数2测试------------------   
    19.   
    20. --按年龄降序   
    21. --my_order(a,age,'desc','/tmp/dongliang/318/z');   
    22. --dump a;   
    23.   
    24.   
    25. -------pig函数3测试------------------   
    26.   
    27.  --过滤年龄大于20的,并按性别,分组统计数量   
    28.  r =  myfilter(a,'age',20);   
    29.  dump r;  
    --导入pig公用的函数库
    
    import 'function.pig' ;
    
    a = load  '/tmp/dongliang/318/person' using PigStorage(',') AS (id:int,name:chararray,sex:chararray,age:int,address:chararray) ;
    
    
    --------pig函数1测试-----------------
    
    --定义按名字分组
    --bb = group_and_count(a,name,1);
    --定义按性别分组
    --cc = group_and_count(a,sex,1);
    --dump bb;
    --dump cc;
    
    
    -------pig函数2测试------------------
    
    --按年龄降序
    --my_order(a,age,'desc','/tmp/dongliang/318/z');
    --dump a;
    
    
    -------pig函数3测试------------------
    
     --过滤年龄大于20的,并按性别,分组统计数量
     r =  myfilter(a,'age',20);
     dump r;


    需要注意的是,导入的函数文件,需要用单引号引起来,这样我们就完成了pig函数的重用,是不是非常类似shell的语法呢?有兴趣的同学们,赶紧体验一把吧!  

  • 相关阅读:
    重温CLR(七 ) 属性和事件
    重温CLR(六)方法和参数
    KMP算法
    Webstorm 2019最新激活码
    bash: cd: too many arguments 报错
    mongoDB线上数据库连接报错记录
    常见的 eslint 基本报错信息
    git 查看项目代码统计命令
    npm 删除指定的某个包以及再次安装
    vue.config.js常用配置
  • 原文地址:https://www.cnblogs.com/qindongliang/p/4349137.html
Copyright © 2011-2022 走看看